This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f5b729d0687 Pipe: incorporate batch data into
TsFileInsertionEventScanParser memory control & Subscription: close data
container in tsfile event & bind memory block for tablet response & generate
empty tablet as initial response & offer subsequent tablet response before ack
& expose prefetch and backdoor configs & best-effort disorder control (#14752)
(#14849)
f5b729d0687 is described below
commit f5b729d06875187bdf0eab03e90d6d086afa689f
Author: VGalaxies <[email protected]>
AuthorDate: Fri Feb 14 17:50:42 2025 +0800
Pipe: incorporate batch data into TsFileInsertionEventScanParser memory
control & Subscription: close data container in tsfile event & bind memory
block for tablet response & generate empty tablet as initial response & offer
subsequent tablet response before ack & expose prefetch and backdoor configs &
best-effort disorder control (#14752) (#14849)
---
.../iotdb/rpc/subscription/config/TopicConfig.java | 24 +++
.../subscription/payload/poll/TabletsPayload.java | 11 +-
.../scan/TsFileInsertionScanDataContainer.java | 19 ++
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 48 +++++
.../broker/SubscriptionPrefetchingQueue.java | 47 +++--
.../broker/SubscriptionPrefetchingQueueStates.java | 96 +++++++---
.../broker/SubscriptionPrefetchingTabletQueue.java | 2 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 2 +-
.../event/SubscriptionCommitContextSupplier.java | 28 ---
.../db/subscription/event/SubscriptionEvent.java | 80 +++++---
.../event/batch/SubscriptionPipeEventBatch.java | 30 ---
.../batch/SubscriptionPipeTabletEventBatch.java | 66 +++----
.../batch/SubscriptionPipeTsFileEventBatch.java | 15 --
.../cache/CachedSubscriptionPollResponse.java | 5 +
.../event/pipe/SubscriptionPipeEmptyEvent.java | 4 +-
.../pipe/SubscriptionPipeTabletBatchEvents.java | 46 ++++-
.../pipe/SubscriptionPipeTsFileBatchEvents.java | 8 +-
.../pipe/SubscriptionPipeTsFilePlainEvent.java | 8 +-
.../SubscriptionEventExtendableResponse.java | 1 +
.../event/response/SubscriptionEventResponse.java | 10 +-
.../response/SubscriptionEventSingleResponse.java | 1 +
.../response/SubscriptionEventTabletResponse.java | 201 ++++++++++++++++-----
.../response/SubscriptionEventTsFileResponse.java | 32 +++-
.../apache/iotdb/commons/conf/CommonConfig.java | 76 ++++++--
.../iotdb/commons/conf/CommonDescriptor.java | 37 +++-
.../subscription/config/SubscriptionConfig.java | 47 ++++-
.../commons/subscription/meta/topic/TopicMeta.java | 4 +
27 files changed, 671 insertions(+), 277 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index 46dc7601e0f..55c84e94037 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -124,6 +124,19 @@ public class TopicConfig extends PipeParameters {
.collect(Collectors.toMap(key -> key, key -> looseRangeValue));
}
+ /**
+  * Collects every attribute whose key starts with {@code "source"}, compared case-insensitively.
+  *
+  * @return a new mutable map holding the matching entries; keys keep their original case
+  */
+ public Map<String, String> getAttributesWithSourcePrefix() {
+   final Map<String, String> attributesWithSourcePrefix = new HashMap<>();
+   attributes.forEach(
+       (key, value) -> {
+         // Lower-case with a fixed locale so the prefix check never depends on the JVM default.
+         if (key.toLowerCase(java.util.Locale.ROOT).startsWith("source")) {
+           attributesWithSourcePrefix.put(key, value);
+         }
+       });
+   return attributesWithSourcePrefix;
+ }
+
+ /////////////////////////////// processor attributes mapping
///////////////////////////////
+
public Map<String, String> getAttributesWithProcessorPrefix() {
final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
attributes.forEach(
@@ -138,4 +151,15 @@ public class TopicConfig extends PipeParameters {
public Map<String, String> getAttributesWithSinkFormat() {
return Collections.emptyMap(); // default to hybrid
}
+
+ /**
+  * Collects every attribute whose key starts with {@code "sink"}, compared case-insensitively.
+  *
+  * @return a new mutable map holding the matching entries; keys keep their original case
+  */
+ public Map<String, String> getAttributesWithSinkPrefix() {
+   final Map<String, String> attributesWithSinkPrefix = new HashMap<>();
+   attributes.forEach(
+       (key, value) -> {
+         // A default-locale toLowerCase() maps 'I' to dotless 'ı' under tr/az locales, so a key
+         // such as "SINK.x" would silently fail the prefix check; Locale.ROOT avoids that.
+         if (key.toLowerCase(java.util.Locale.ROOT).startsWith("sink")) {
+           attributesWithSinkPrefix.put(key, value);
+         }
+       });
+   return attributesWithSinkPrefix;
+ }
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
index 44f8d642327..23b9e066329 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
@@ -35,9 +35,14 @@ public class TabletsPayload implements
SubscriptionPollPayload {
private transient List<Tablet> tablets = new ArrayList<>();
/**
- * The field to be filled in the next {@link PollTabletsPayload} request. If
negative, it
- * indicates all tablets have been fetched, and -nextOffset represents the
total number of
- * tablets.
+ * The field to be filled in the next {@link PollTabletsPayload} request.
+ *
+ * <ul>
+ * <li>If nextOffset is 1, it indicates that the current payload is the
first payload (its
+ * tablets are empty) and the fetching should continue.
+ * <li>If nextOffset is negative, it indicates all tablets have been
fetched, and -nextOffset
+ * represents the total number of tablets.
+ * </ul>
*/
private transient int nextOffset;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index e73cca1cd9a..318e70b5605 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -71,6 +72,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
private IChunkReader chunkReader;
private BatchData data;
+ private final PipeMemoryBlock allocatedMemoryBlockForBatchData;
private boolean currentIsMultiPage;
private String currentDevice;
@@ -101,6 +103,10 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
this.endTime = endTime;
filter = Objects.nonNull(timeFilterExpression) ?
timeFilterExpression.getFilter() : null;
+ // Allocate empty memory block, will be resized later.
+ this.allocatedMemoryBlockForBatchData =
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
try {
tsFileSequenceReader = new
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
tsFileSequenceReader.position((long)
TSFileConfig.MAGIC_STRING.getBytes().length + 1);
@@ -264,6 +270,10 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
do {
data = chunkReader.nextPageData();
+ PipeDataNodeResourceManager.memory()
+ .forceResize(
+ allocatedMemoryBlockForBatchData,
+ PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data));
} while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
} while (!data.hasCurrent());
}
@@ -497,4 +507,13 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
return false;
}
+
+ /** Closes the parent container, then releases the memory block tracked for page batch data. */
+ @Override
+ public void close() {
+   try {
+     super.close();
+   } finally {
+     // Release the block even if the parent close fails; otherwise the reservation would leak.
+     if (allocatedMemoryBlockForBatchData != null) {
+       allocatedMemoryBlockForBatchData.close();
+     }
+   }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index cfe012fb40a..78baf1d80ed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -246,4 +246,52 @@ public class PipeMemoryWeightUtil {
return totalSizeInBytes;
}
+
+ /**
+  * Estimates the memory footprint of a {@link BatchData} as (per-row size) * (row count).
+  *
+  * <p>The per-row size is 8 bytes for the timestamp plus the value size. Each value is counted
+  * with 8 extra bytes for its reference and rounded up to a multiple of 8. For binary data the
+  * single value returned by {@code getBinary()} is used as the per-row sample.
+  *
+  * @param batchData the batch to measure
+  * @return the estimated size in bytes, saturated at {@link Integer#MAX_VALUE}
+  */
+ public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
+   // timestamp
+   int rowSizeInBytes = 8;
+
+   // values
+   final TSDataType type = batchData.getDataType();
+   if (type != null) {
+     if (type == TSDataType.VECTOR && batchData.getVector() != null) {
+       for (int i = 0; i < batchData.getVector().length; ++i) {
+         final TsPrimitiveType primitiveType = batchData.getVector()[i];
+         if (primitiveType == null || primitiveType.getDataType() == null) {
+           continue;
+         }
+         // consider variable references (plus 8) and memory alignment (round up to 8)
+         rowSizeInBytes += roundUpToMultiple(primitiveType.getSize() + 8, 8);
+       }
+     } else {
+       if (type.isBinary()) {
+         final Binary binary = batchData.getBinary();
+         // refer to org.apache.tsfile.utils.TsPrimitiveType.TsBinary.getSize
+         rowSizeInBytes +=
+             roundUpToMultiple((binary == null ? 8 : binary.getLength() + 8) + 8, 8);
+       } else {
+         rowSizeInBytes += roundUpToMultiple(TsPrimitiveType.getByType(type).getSize() + 8, 8);
+       }
+     }
+   }
+
+   // Multiply in long and saturate: rows * rowSize can exceed the int range for large batches,
+   // and a wrapped (negative) result must never be handed to the memory manager.
+   return (int) Math.min(Integer.MAX_VALUE, (long) batchData.length() * rowSizeInBytes);
+ }
+
+ /**
+  * Rounds up the given integer num to the nearest multiple of n.
+  *
+  * @param num The integer to be rounded up.
+  * @param n The specified multiple; must be greater than 0.
+  * @return The nearest multiple of n greater than or equal to num.
+  * @throws IllegalArgumentException if n is not greater than 0.
+  */
+ private static int roundUpToMultiple(int num, int n) {
+   // Reject negative multiples as well: the message promises "greater than 0", and the
+   // formula below only rounds up when n is positive.
+   if (n <= 0) {
+     throw new IllegalArgumentException("The multiple n must be greater than 0");
+   }
+   // Calculate the rounded up value to the nearest multiple of n
+   return ((num + n - 1) / n) * n;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 5afbf212829..cf78aff6d4a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -47,7 +47,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -68,7 +68,7 @@ public abstract class SubscriptionPrefetchingQueue {
private final AtomicLong commitIdGenerator;
/** A queue containing a series of prefetched pollable {@link
SubscriptionEvent}. */
- protected final LinkedBlockingQueue<SubscriptionEvent> prefetchingQueue;
+ protected final PriorityBlockingQueue<SubscriptionEvent> prefetchingQueue;
/**
* A map that tracks in-flight {@link SubscriptionEvent}, keyed by consumer
id and commit context.
@@ -112,7 +112,7 @@ public abstract class SubscriptionPrefetchingQueue {
this.inputPendingQueue = inputPendingQueue;
this.commitIdGenerator = commitIdGenerator;
- this.prefetchingQueue = new LinkedBlockingQueue<>();
+ this.prefetchingQueue = new PriorityBlockingQueue<>();
this.inFlightEvents = new ConcurrentHashMap<>();
this.batches = new SubscriptionPipeEventBatches(this, maxDelayInMs,
maxBatchSizeInBytes);
@@ -259,9 +259,11 @@ public abstract class SubscriptionPrefetchingQueue {
if (isClosed()) {
return false;
}
+ // TODO: more refined behavior (prefetch/serialize/...) control
if (states.shouldPrefetch()) {
tryPrefetch();
- remapInFlightEventsSnapshot(committedCleaner, pollableNacker,
responsePrefetcher);
+ remapInFlightEventsSnapshot(
+ committedCleaner, pollableNacker, responsePrefetcher,
responseSerializer);
return true;
} else {
remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
@@ -286,9 +288,18 @@ public abstract class SubscriptionPrefetchingQueue {
}
}
- protected void enqueueEventToPrefetchingQueue(final SubscriptionEvent event)
{
- event.trySerializeCurrentResponse();
- prefetchingQueue.add(event);
+ /**
+  * Adds the given event to the prefetching queue. If the event sorts before the queue's current
+  * head, a disorder cause is recorded on the queue states first.
+  *
+  * @param thisEvent the event to enqueue
+  */
+ public void prefetchEvent(@NonNull final SubscriptionEvent thisEvent) {
+   final SubscriptionEvent currentHead = prefetchingQueue.peek();
+   final boolean sortsBeforeHead =
+       Objects.nonNull(currentHead) && thisEvent.compareTo(currentHead) < 0;
+   if (sortsBeforeHead) {
+     // disorder causes:
+     // 1. prefetch nacked event
+     // 2. late cross-event of dataset payload
+     states.markDisorderCause();
+   }
+
+   prefetchingQueue.add(thisEvent);
}
/**
@@ -376,14 +387,14 @@ public abstract class SubscriptionPrefetchingQueue {
* @return {@code true} if there are subscription events prefetched.
*/
protected boolean onEvent(final TabletInsertionEvent event) {
- return batches.onEvent((EnrichedEvent) event,
this::enqueueEventToPrefetchingQueue);
+ return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
}
/**
* @return {@code true} if there are subscription events prefetched.
*/
protected boolean onEvent() {
- return batches.onEvent(this::enqueueEventToPrefetchingQueue);
+ return batches.onEvent(this::prefetchEvent);
}
/////////////////////////////// commit ///////////////////////////////
@@ -449,7 +460,7 @@ public abstract class SubscriptionPrefetchingQueue {
this);
}
- ev.ack(this::enqueueEventToPrefetchingQueue);
+ ev.ack();
ev.recordCommittedTimestamp(); // now committed
acked.set(true);
@@ -655,12 +666,12 @@ public abstract class SubscriptionPrefetchingQueue {
(ev) -> {
if (ev.eagerlyPollable()) {
ev.nack(); // now pollable (the nack operation here is actually
unnecessary)
- enqueueEventToPrefetchingQueue(ev);
+ prefetchEvent(ev);
// no need to log warn for eagerly pollable event
return null; // remove this entry
} else if (ev.pollable()) {
ev.nack(); // now pollable
- enqueueEventToPrefetchingQueue(ev);
+ prefetchEvent(ev);
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} recycle event {}
from in flight events, nack and enqueue it to prefetching queue",
this,
@@ -672,9 +683,19 @@ public abstract class SubscriptionPrefetchingQueue {
private final RemappingFunction<SubscriptionEvent> responsePrefetcher =
(ev) -> {
- // prefetch and serialize the remaining responses
+ // prefetch the remaining responses
try {
ev.prefetchRemainingResponses();
+ } catch (final Exception ignored) {
+ }
+ return ev;
+ };
+
+ private final RemappingFunction<SubscriptionEvent> responseSerializer =
+ (ev) -> {
+ // serialize the responses
+ try {
+ ev.trySerializeCurrentResponse();
ev.trySerializeRemainingResponses();
} catch (final Exception ignored) {
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
index a699672c7b3..8b0eb256f42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
@@ -19,15 +19,19 @@
package org.apache.iotdb.db.subscription.broker;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.metrics.core.utils.IoTDBMovingAverage;
import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.MoreObjects.toStringHelper;
+
/**
* The {@link SubscriptionPrefetchingQueueStates} manages the state of a {@link
* SubscriptionPrefetchingQueue}. It determines whether prefetching should
occur based on memory
@@ -40,16 +44,21 @@ public class SubscriptionPrefetchingQueueStates {
private static final double EPSILON = 1e-6;
- // TODO: config
- private static final double PREFETCH_MEMORY_THRESHOLD = 0.6;
- private static final double MISSING_RATE_THRESHOLD = 0.9;
- private static final int PREFETCHED_EVENT_COUNT_CONTROL_PARAMETER = 100;
+ private static final double PREFETCH_MEMORY_THRESHOLD =
+
SubscriptionConfig.getInstance().getSubscriptionPrefetchMemoryThreshold();
+ private static final double PREFETCH_MISSING_RATE_THRESHOLD =
+
SubscriptionConfig.getInstance().getSubscriptionPrefetchMissingRateThreshold();
+ private static final int PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD =
+
SubscriptionConfig.getInstance().getSubscriptionPrefetchEventLocalCountThreshold();
+ private static final int PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD =
+
SubscriptionConfig.getInstance().getSubscriptionPrefetchEventGlobalCountThreshold();
private final SubscriptionPrefetchingQueue prefetchingQueue;
private volatile long lastPollRequestTimestamp;
private final Meter pollRequestMeter;
private final Meter missingPrefechMeter;
+ private final Counter disorderCauseCounter; // TODO: use meter
public SubscriptionPrefetchingQueueStates(final SubscriptionPrefetchingQueue
prefetchingQueue) {
this.prefetchingQueue = prefetchingQueue;
@@ -57,6 +66,7 @@ public class SubscriptionPrefetchingQueueStates {
this.lastPollRequestTimestamp = -1;
this.pollRequestMeter = new Meter(new IoTDBMovingAverage(),
Clock.defaultClock());
this.missingPrefechMeter = new Meter(new IoTDBMovingAverage(),
Clock.defaultClock());
+ this.disorderCauseCounter = new Counter();
}
public void markPollRequest() {
@@ -68,19 +78,55 @@ public class SubscriptionPrefetchingQueueStates {
missingPrefechMeter.mark();
}
+ public void markDisorderCause() {
+ disorderCauseCounter.inc(); // one more recorded disorder; a positive count makes hasDisorderCause() true
+ }
+
+ private double pollRate() {
+ return pollRequestMeter.getOneMinuteRate(); // one-minute rate reported by the poll-request meter
+ }
+
+ /**
+  * Ratio of the missing-prefetch rate to the poll rate, or 0.0 when the poll rate is
+  * approximately zero.
+  */
+ private double missingRate() {
+   // Sample the poll rate once so the zero check and the division use the same value; the meter
+   // can change between two separate reads.
+   final double currentPollRate = pollRate();
+   if (isApproximatelyZero(currentPollRate)) {
+     return 0.0;
+   }
+   return missingPrefechMeter.getOneMinuteRate() / currentPollRate;
+ }
+
public boolean shouldPrefetch() {
+ // 1. reject conditions
+ // 1.1. option
+ if (!SubscriptionConfig.getInstance().getSubscriptionPrefetchEnabled()) {
+ return false;
+ }
+
+ // 1.2. memory usage
if (!isMemoryEnough()) {
return false;
}
- if (missingRate() > MISSING_RATE_THRESHOLD) {
- return true;
+ // 1.3. local event count
+ if (hasTooManyPrefetchedLocalEvent()) {
+ return false;
}
- if (hasTooManyPrefetchedEvents()) {
+ // 1.4. global event count
+ if (hasTooManyPrefetchedGlobalEvent()) {
return false;
}
+ // 1.5. disorder history
+ if (hasDisorderCause()) {
+ return false;
+ }
+
+ // 2. permissive conditions
+ // 2.1. missing rate
+ if (isMissingRateTooHigh()) {
+ return true;
+ }
+
+ // 2.2. prefetch statistics
// the delta between the prefetch timestamp and the timestamp of the last
poll request > poll
// request frequency
return (System.currentTimeMillis() - lastPollRequestTimestamp) *
pollRate() > 1000;
@@ -92,23 +138,24 @@ public class SubscriptionPrefetchingQueueStates {
> PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes();
}
- private double pollRate() {
- return pollRequestMeter.getOneMinuteRate();
+ private boolean hasTooManyPrefetchedLocalEvent() {
+ return prefetchingQueue.getPrefetchedEventCount() >
PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD;
}
- private double missingRate() {
- if (isApproximatelyZero(pollRate())) {
- return 0.0;
- }
- return missingPrefechMeter.getOneMinuteRate() / pollRate();
- }
-
- private boolean hasTooManyPrefetchedEvents() {
+ private boolean hasTooManyPrefetchedGlobalEvent() {
// The number of prefetched events in the current prefetching queue >
floor(t / number of
// prefetching queues), where t is an adjustable parameter.
return prefetchingQueue.getPrefetchedEventCount()
* SubscriptionAgent.broker().getPrefetchingQueueCount()
- > PREFETCHED_EVENT_COUNT_CONTROL_PARAMETER;
+ > PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD;
+ }
+
+ private boolean isMissingRateTooHigh() {
+ return missingRate() > PREFETCH_MISSING_RATE_THRESHOLD; // threshold is read from SubscriptionConfig
+ }
+
+ private boolean hasDisorderCause() {
+ return disorderCauseCounter.getCount() > 0; // true once any disorder was recorded; no reset is visible in this diff
}
private static boolean isApproximatelyZero(final double value) {
@@ -117,12 +164,11 @@ public class SubscriptionPrefetchingQueueStates {
@Override
public String toString() {
- return "PollPrefetchStates{lastPollRequestTimestamp="
- + lastPollRequestTimestamp
- + ", pollRate="
- + pollRate()
- + ", missingRate="
- + missingRate()
- + "}";
+ return toStringHelper(this)
+ .add("lastPollRequestTimestamp", lastPollRequestTimestamp)
+ .add("pollRate", pollRate())
+ .add("missingRate", missingRate())
+ .add("disorderCause", disorderCauseCounter.getCount())
+ .toString();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index e729fc74095..d590df17c2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -173,7 +173,7 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
@Override
protected boolean onEvent(final TsFileInsertionEvent event) {
- return batches.onEvent((EnrichedEvent) event,
this::enqueueEventToPrefetchingQueue);
+ return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
}
/////////////////////////////// stringify ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 5a7b2f7f31c..adef86518b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -231,7 +231,7 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
new SubscriptionPipeTsFilePlainEvent((PipeTsFileInsertionEvent)
event),
((PipeTsFileInsertionEvent) event).getTsFile(),
commitContext);
- super.enqueueEventToPrefetchingQueue(ev);
+ super.prefetchEvent(ev);
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
deleted file mode 100644
index cc73384d42c..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.subscription.event;
-
-import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-
-@FunctionalInterface
-public interface SubscriptionCommitContextSupplier {
-
- SubscriptionCommitContext get();
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index 2db5bc7a4b0..acf47ca8a51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -41,13 +41,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
+import static com.google.common.base.MoreObjects.toStringHelper;
import static
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
-public class SubscriptionEvent {
+public class SubscriptionEvent implements Comparable<SubscriptionEvent> {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionEvent.class);
@@ -63,7 +64,10 @@ public class SubscriptionEvent {
private final AtomicLong committedTimestamp = new
AtomicLong(INVALID_TIMESTAMP);
// record file name for file payload
- private String fileName;
+ private volatile String fileName;
+
+ // record for cross-event dataset payload
+ private volatile SubscriptionCommitContext rootCommitContext;
private static final long NACK_COUNT_REPORT_THRESHOLD = 3;
private final AtomicLong nackCount = new AtomicLong();
@@ -86,18 +90,41 @@ public class SubscriptionEvent {
this(response.getResponseType(), response.getPayload(),
response.getCommitContext());
}
+ /**
+  * Builds a root {@link SubscriptionEvent} backed by a {@link SubscriptionEventTabletResponse}:
+  * a fresh commit context is generated from the queue and used both as this event's commit
+  * context and as its root commit context.
+  */
+ public SubscriptionEvent(
+     final SubscriptionPipeTabletEventBatch batch, final SubscriptionPrefetchingQueue queue) {
+   final SubscriptionPipeTabletBatchEvents batchEvents =
+       new SubscriptionPipeTabletBatchEvents(batch);
+   final SubscriptionCommitContext ownCommitContext = queue.generateSubscriptionCommitContext();
+
+   this.commitContext = ownCommitContext;
+   // root event for cross-event dataset payload
+   this.rootCommitContext = ownCommitContext;
+
+   this.pipeEvents = batchEvents;
+   this.response =
+       new SubscriptionEventTabletResponse(
+           batch, queue, batchEvents, ownCommitContext, ownCommitContext);
+ }
+
/**
* Constructs a {@link SubscriptionEvent} with the response type of {@link
* SubscriptionEventTabletResponse}.
*/
public SubscriptionEvent(
final SubscriptionPipeTabletEventBatch batch,
- final SubscriptionCommitContextSupplier commitContextSupplier) {
- this.pipeEvents = new SubscriptionPipeTabletBatchEvents(batch);
- final SubscriptionCommitContext commitContext =
commitContextSupplier.get();
+ final SubscriptionPrefetchingQueue queue,
+ final SubscriptionCommitContext rootCommitContext) {
+ final SubscriptionPipeTabletBatchEvents events = new
SubscriptionPipeTabletBatchEvents(batch);
+ final SubscriptionCommitContext commitContext =
queue.generateSubscriptionCommitContext();
+
+ this.pipeEvents = events;
this.response =
- new SubscriptionEventTabletResponse(batch, commitContext,
commitContextSupplier);
+ new SubscriptionEventTabletResponse(batch, queue, events,
commitContext, rootCommitContext);
this.commitContext = commitContext;
+
+ this.rootCommitContext = rootCommitContext;
}
/**
@@ -145,12 +172,8 @@ public class SubscriptionEvent {
return response.isCommittable();
}
- public void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
- // NOTE: we should ack pipe events before ack response since multiple
events may reuse the same
- // batch (as pipe events)
- // TODO: consider more elegant design for this method
+ public void ack() {
pipeEvents.ack();
- response.ack(onCommittedHook);
}
/**
@@ -233,7 +256,7 @@ public class SubscriptionEvent {
//////////////////////////// prefetch & fetch ////////////////////////////
- public void prefetchRemainingResponses() throws Exception {
+ public void prefetchRemainingResponses() {
response.prefetchRemainingResponses();
}
@@ -281,16 +304,25 @@ public class SubscriptionEvent {
@Override
public String toString() {
- return "SubscriptionEvent{response="
- + response
- + ", lastPolledConsumerId="
- + lastPolledConsumerId
- + ", lastPolledTimestamp="
- + lastPolledTimestamp
- + ", committedTimestamp="
- + committedTimestamp
- + ", pipeEvents="
- + pipeEvents
- + "}";
+ return toStringHelper(this)
+ .add("commitContext", commitContext)
+ .add("response", response)
+ .add("lastPolledConsumerId", lastPolledConsumerId)
+ .add("lastPolledTimestamp", lastPolledTimestamp)
+ .add("committedTimestamp", committedTimestamp)
+ .add("pipeEvents", pipeEvents)
+ .add(
+ "rootCommitContext",
+ Objects.nonNull(rootCommitContext) ? rootCommitContext :
"<unknown>")
+ .toString();
+ }
+
+ /**
+  * Orders events by commit context, using the root commit context of each side when it is
+  * present and the event's own commit context otherwise.
+  */
+ @Override
+ public int compareTo(final SubscriptionEvent that) {
+   SubscriptionCommitContext left = this.commitContext;
+   if (Objects.nonNull(this.rootCommitContext)) {
+     left = this.rootCommitContext;
+   }
+
+   SubscriptionCommitContext right = that.commitContext;
+   if (Objects.nonNull(that.rootCommitContext)) {
+     right = that.rootCommitContext;
+   }
+
+   return left.compareTo(right);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index 58aac3714c1..5f2b0e069a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -30,12 +30,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
public abstract class SubscriptionPipeEventBatch {
@@ -116,33 +113,6 @@ public abstract class SubscriptionPipeEventBatch {
protected abstract List<SubscriptionEvent> generateSubscriptionEvents()
throws Exception;
- /////////////////////////////// stringify ///////////////////////////////
-
- protected Map<String, String> coreReportMessage() {
- final Map<String, String> result = new HashMap<>();
- result.put("regionId", String.valueOf(regionId));
- result.put("prefetchingQueue",
prefetchingQueue.coreReportMessage().toString());
- result.put("maxDelayInMs", String.valueOf(maxDelayInMs));
- result.put("maxBatchSizeInBytes", String.valueOf(maxBatchSizeInBytes));
- // omit subscription events here
- result.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4));
- return result;
- }
-
- private static String formatEnrichedEvents(
- final List<EnrichedEvent> enrichedEvents, final int threshold) {
- final List<String> eventMessageList =
- enrichedEvents.stream()
- .limit(threshold)
- .map(EnrichedEvent::coreReportMessage)
- .collect(Collectors.toList());
- if (eventMessageList.size() > threshold) {
- eventMessageList.add(
- String.format("omit the remaining %s event(s)...",
eventMessageList.size() - threshold));
- }
- return eventMessageList.toString();
- }
-
//////////////////////////// APIs provided for metric framework
////////////////////////////
public int getPipeEventCount() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index d1d56e6c5a7..00114d85df4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -41,8 +41,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
@@ -61,7 +61,8 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
private final Meter insertNodeTabletInsertionEventSizeEstimator;
private final Meter rawTabletInsertionEventSizeEstimator;
- private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();
+ private volatile List<EnrichedEvent> iteratedEnrichedEvents;
+ private final AtomicInteger referenceCount = new AtomicInteger();
private static final long ITERATED_COUNT_REPORT_FREQ =
30000; // based on the full parse of a 128MB tsfile estimate
@@ -78,35 +79,36 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
this.rawTabletInsertionEventSizeEstimator =
new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
+
+ resetForIteration();
}
/////////////////////////////// ack & clean ///////////////////////////////
@Override
public synchronized void ack() {
- // only decrease the reference count of iterated events
- for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
- enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
- }
- iteratedEnrichedEvents.clear();
+ referenceCount.decrementAndGet();
+ // do nothing for iterated enriched events, see
SubscriptionPipeTabletBatchEvents
}
@Override
public synchronized void cleanUp() {
- // do nothing if it has next
- if (hasNext()) {
+ // do nothing if it has next or still referenced by unacked response
+ if (hasNext() || referenceCount.get() != 0) {
return;
}
// clear the reference count of events
for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+ if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+ // close data container in tsfile event
+ ((PipeTsFileInsertionEvent) enrichedEvent).close();
+ }
enrichedEvent.clearReferenceCount(this.getClass().getName());
}
enrichedEvents.clear();
- currentEnrichedEventsIterator = null;
- currentTabletInsertionEventsIterator = null;
- currentTsFileInsertionEvent = null;
+ resetForIteration();
}
/////////////////////////////// utility ///////////////////////////////
@@ -144,9 +146,8 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
@Override
protected List<SubscriptionEvent> generateSubscriptionEvents() {
- resetIterator();
- return Collections.singletonList(
- new SubscriptionEvent(this,
prefetchingQueue::generateSubscriptionCommitContext));
+ resetForIteration();
+ return Collections.singletonList(new SubscriptionEvent(this,
prefetchingQueue));
}
@Override
@@ -199,12 +200,22 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
/////////////////////////////// iterator ///////////////////////////////
- public void resetIterator() {
+ public List<EnrichedEvent> sendIterationSnapshot() {
+ final List<EnrichedEvent> result =
Collections.unmodifiableList(iteratedEnrichedEvents);
+ iteratedEnrichedEvents = new ArrayList<>();
+ referenceCount.incrementAndGet();
+ return result;
+ }
+
+ public void resetForIteration() {
currentEnrichedEventsIterator = enrichedEvents.iterator();
currentTabletInsertionEventsIterator = null;
currentTsFileInsertionEvent = null;
- iteratedEnrichedEvents.clear();
+ iteratedEnrichedEvents = new ArrayList<>();
+ referenceCount.set(0);
+
+ iteratedCount.set(0);
}
@Override
@@ -300,25 +311,4 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
return null;
}
}
-
- /////////////////////////////// stringify ///////////////////////////////
-
- @Override
- public String toString() {
- return "SubscriptionPipeTabletEventBatch" + this.coreReportMessage();
- }
-
- @Override
- protected Map<String, String> coreReportMessage() {
- final Map<String, String> coreReportMessage = super.coreReportMessage();
- coreReportMessage.put("firstEventProcessingTime",
String.valueOf(firstEventProcessingTime));
- coreReportMessage.put("totalBufferSize", String.valueOf(totalBufferSize));
- coreReportMessage.put(
- "estimatedInsertNodeTabletInsertionEventSize",
- String.valueOf(getEstimatedInsertNodeTabletInsertionEventSize()));
- coreReportMessage.put(
- "estimatedRawTabletInsertionEventSize",
- String.valueOf(getEstimatedRawTabletInsertionEventSize()));
- return coreReportMessage;
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 514795db23a..6148cb1a060 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch {
@@ -111,18 +110,4 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
protected boolean shouldEmit() {
return batch.shouldEmit();
}
-
- /////////////////////////////// stringify ///////////////////////////////
-
- @Override
- public String toString() {
- return "SubscriptionPipeTsFileEventBatch" + this.coreReportMessage();
- }
-
- @Override
- protected Map<String, String> coreReportMessage() {
- final Map<String, String> coreReportMessage = super.coreReportMessage();
- coreReportMessage.put("batch", batch.toString());
- return coreReportMessage;
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
index d4eba2adffb..1a0be808c62 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
@@ -57,6 +57,9 @@ public class CachedSubscriptionPollResponse extends
SubscriptionPollResponse {
public void invalidateByteBuffer() {
// maybe friendly for gc
byteBuffer = null;
+ }
+
+ public void closeMemoryBlock() {
if (Objects.nonNull(memoryBlock)) {
memoryBlock.close();
}
@@ -88,6 +91,8 @@ public class CachedSubscriptionPollResponse extends
SubscriptionPollResponse {
Objects.isNull(byteBuffer)
? "<unknown>"
: String.valueOf(byteBuffer.limit() - byteBuffer.position()));
+ coreReportMessage.put(
+ "memoryBlock", Objects.isNull(memoryBlock) ? "<unknown>" :
memoryBlock.toString());
return coreReportMessage;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
index ef6cc39a60c..a2b9b3c3db9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.subscription.event.pipe;
+import static com.google.common.base.MoreObjects.toStringHelper;
+
public class SubscriptionPipeEmptyEvent implements SubscriptionPipeEvents {
@Override
@@ -31,7 +33,7 @@ public class SubscriptionPipeEmptyEvent implements
SubscriptionPipeEvents {
@Override
public String toString() {
- return "SubscriptionEmptyPipeEvent";
+ return toStringHelper(this).toString();
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index f518c7b0194..87a69fb2be2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -19,19 +19,41 @@
package org.apache.iotdb.db.subscription.event.pipe;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
public class SubscriptionPipeTabletBatchEvents implements
SubscriptionPipeEvents {
private final SubscriptionPipeTabletEventBatch batch;
+ private volatile List<EnrichedEvent> iteratedEnrichedEvents;
public SubscriptionPipeTabletBatchEvents(final
SubscriptionPipeTabletEventBatch batch) {
this.batch = batch;
}
+ public void receiveIterationSnapshot(final List<EnrichedEvent>
iteratedEnrichedEvents) {
+ this.iteratedEnrichedEvents = iteratedEnrichedEvents;
+ }
+
@Override
public void ack() {
batch.ack();
+
+ // Read the volatile field once. The snapshot is only installed through
+ // receiveIterationSnapshot, so it is still null when this event is acked
+ // before that call; in that case there are no iterated events to release.
+ final List<EnrichedEvent> snapshot = iteratedEnrichedEvents;
+ if (Objects.isNull(snapshot)) {
+ return;
+ }
+
+ // only decrease the reference count of iterated events
+ for (final EnrichedEvent enrichedEvent : snapshot) {
+ if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+ // close data container in tsfile event
+ ((PipeTsFileInsertionEvent) enrichedEvent).close();
+ }
+ enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+ }
}
@Override
@@ -43,13 +65,33 @@ public class SubscriptionPipeTabletBatchEvents implements
SubscriptionPipeEvents
@Override
public String toString() {
- return "SubscriptionPipeTabletBatchEvents{batch=" + batch + "}";
+ return toStringHelper(this)
+ .add("batch", batch)
+ .add("events", formatEnrichedEvents(iteratedEnrichedEvents, 4))
+ .toString();
+ }
+
+ private static String formatEnrichedEvents(
+ final List<EnrichedEvent> enrichedEvents, final int threshold) {
+ if (Objects.isNull(enrichedEvents)) {
+ return "[]";
+ }
+ final List<String> eventMessageList =
+ enrichedEvents.stream()
+ .limit(threshold)
+ .map(EnrichedEvent::coreReportMessage)
+ .collect(Collectors.toList());
+ if (enrichedEvents.size() > threshold) {
+ eventMessageList.add(
+ String.format("omit the remaining %s event(s)...",
enrichedEvents.size() - threshold));
+ }
+ return eventMessageList.toString();
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
@Override
public int getPipeEventCount() {
- return batch.getPipeEventCount();
+ // The snapshot is null until receiveIterationSnapshot has been called;
+ // report zero events in that case instead of dereferencing null.
+ final List<EnrichedEvent> snapshot = iteratedEnrichedEvents;
+ return Objects.isNull(snapshot) ? 0 : snapshot.size();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 16151a16a50..8b98f1b758f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -23,6 +23,8 @@ import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventB
import java.util.concurrent.atomic.AtomicInteger;
+import static com.google.common.base.MoreObjects.toStringHelper;
+
public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents {
private final SubscriptionPipeTsFileEventBatch batch;
@@ -54,11 +56,7 @@ public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents
@Override
public String toString() {
- return "SubscriptionPipeTsFileBatchEvents{batch="
- + batch
- + ", referenceCount="
- + referenceCount
- + "}";
+ return toStringHelper(this).add("batch", batch).add("count",
count).toString();
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
index 8effb654d73..453e44be965 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.subscription.event.pipe;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import static com.google.common.base.MoreObjects.toStringHelper;
+
public class SubscriptionPipeTsFilePlainEvent implements
SubscriptionPipeEvents {
private final PipeTsFileInsertionEvent tsFileInsertionEvent;
@@ -36,6 +38,8 @@ public class SubscriptionPipeTsFilePlainEvent implements
SubscriptionPipeEvents
@Override
public void cleanUp() {
+ // close data container in tsfile event
+ tsFileInsertionEvent.close();
// clear the reference count of event
tsFileInsertionEvent.clearReferenceCount(this.getClass().getName());
}
@@ -44,9 +48,7 @@ public class SubscriptionPipeTsFilePlainEvent implements
SubscriptionPipeEvents
@Override
public String toString() {
- return "SubscriptionPipeTsFilePlainEvent{tsFileInsertionEvent="
- + tsFileInsertionEvent.coreReportMessage()
- + "}";
+ return toStringHelper(this).add("event", tsFileInsertionEvent).toString();
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
index dba2f48b466..772de6bf36c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
@@ -79,6 +79,7 @@ public abstract class SubscriptionEventExtendableResponse
public void cleanUp() {
CachedSubscriptionPollResponse response;
while (Objects.nonNull(response = poll())) {
+ response.closeMemoryBlock();
SubscriptionPollResponseCache.getInstance().invalidate(response);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
index ed6ad138276..c1702876e17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
@@ -19,11 +19,8 @@
package org.apache.iotdb.db.subscription.event.response;
-import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.function.Consumer;
public interface SubscriptionEventResponse<E> {
@@ -31,7 +28,8 @@ public interface SubscriptionEventResponse<E> {
E getCurrentResponse();
- void prefetchRemainingResponses() throws Exception;
+ @Deprecated // TBD.
+ void prefetchRemainingResponses();
void fetchNextResponse(final long offset) throws Exception;
@@ -47,10 +45,6 @@ public interface SubscriptionEventResponse<E> {
/////////////////////////////// lifecycle ///////////////////////////////
- default void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
- // do nothing
- }
-
void nack();
void cleanUp();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
index 7c72e65b6d6..aca492125fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
@@ -96,6 +96,7 @@ public class SubscriptionEventSingleResponse
@Override
public void cleanUp() {
+ response.closeMemoryBlock();
invalidateCurrentResponseByteBuffer();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
index 35591ed405e..cd93cd350ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -19,12 +19,18 @@
package org.apache.iotdb.db.subscription.event.response;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
-import
org.apache.iotdb.db.subscription.event.SubscriptionCommitContextSupplier;
+import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
+import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
import
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
@@ -36,9 +42,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
/**
* The {@code SubscriptionEventTabletResponse} class extends {@link
@@ -58,8 +64,11 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
SubscriptionConfig.getInstance().getSubscriptionPrefetchTabletBatchMaxSizeInBytes();
private final SubscriptionPipeTabletEventBatch batch;
+ private final SubscriptionPrefetchingQueue queue;
+ private final SubscriptionPipeTabletBatchEvents events;
+
private final SubscriptionCommitContext commitContext;
- private final SubscriptionCommitContextSupplier commitContextSupplier;
+ private final SubscriptionCommitContext rootCommitContext;
private volatile int totalTablets;
private final AtomicInteger nextOffset = new AtomicInteger(0);
@@ -70,11 +79,16 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
public SubscriptionEventTabletResponse(
final SubscriptionPipeTabletEventBatch batch,
+ final SubscriptionPrefetchingQueue queue,
+ final SubscriptionPipeTabletBatchEvents events,
final SubscriptionCommitContext commitContext,
- final SubscriptionCommitContextSupplier commitContextSupplier) {
+ final SubscriptionCommitContext rootCommitContext) {
this.batch = batch;
+ this.queue = queue;
+ this.events = events;
+
this.commitContext = commitContext;
- this.commitContextSupplier = commitContextSupplier;
+ this.rootCommitContext = rootCommitContext;
init();
}
@@ -85,20 +99,18 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
}
@Override
- public void fetchNextResponse(final long offset /* unused */) {
+ public void fetchNextResponse(final long offset /* unused */) throws
Exception {
+ // generate and offer next response
offer(generateNextTabletResponse());
- if (Objects.isNull(poll())) {
+
+ // poll and clean previous response
+ final CachedSubscriptionPollResponse previousResponse;
+ if (Objects.isNull(previousResponse = poll())) {
LOGGER.warn(
"SubscriptionEventTabletResponse {} is empty when fetching next
response (broken invariant)",
this);
- }
- }
-
- @Override
- public synchronized void ack(final Consumer<SubscriptionEvent>
onCommittedHook) {
- if (availableForNext) {
- // generate next subscription event with the same batch
- onCommittedHook.accept(new SubscriptionEvent(batch,
commitContextSupplier));
+ } else {
+ previousResponse.closeMemoryBlock();
}
}
@@ -113,7 +125,7 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
// should not reset the iterator of batch when init
// TODO: avoid completely rewinding the iterator
- batch.resetIterator();
+ batch.resetForIteration();
init();
}
@@ -143,20 +155,37 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
return;
}
- offer(generateNextTabletResponse());
+ offer(generateEmptyTabletResponse());
}
- private synchronized CachedSubscriptionPollResponse
generateNextTabletResponse() {
+ private synchronized CachedSubscriptionPollResponse
generateEmptyTabletResponse() {
+ return new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(Collections.emptyList(),
nextOffset.incrementAndGet()),
+ commitContext);
+ }
+
+ private synchronized CachedSubscriptionPollResponse
generateNextTabletResponse()
+ throws InterruptedException, PipeRuntimeOutOfMemoryCriticalException {
if (availableForNext) {
+ // generate next subscription event with the same batch
+ queue.prefetchEvent(new SubscriptionEvent(batch, queue,
rootCommitContext));
+ // freeze the iterated enriched events
+ transportIterationSnapshot();
+ // return last response of this subscription event
return new CachedSubscriptionPollResponse(
SubscriptionPollResponseType.TABLETS.getType(),
new TabletsPayload(Collections.emptyList(), -totalTablets),
commitContext);
}
+ CachedSubscriptionPollResponse response = null;
final List<Tablet> currentTablets = new ArrayList<>();
long currentBufferSize = 0;
+ // TODO: TBD.
+ //
waitForResourceEnough4Parsing(SubscriptionAgent.receiver().remainingMs());
+
while (batch.hasNext()) {
final List<Tablet> tablets = batch.next();
if (Objects.isNull(tablets)) {
@@ -171,49 +200,133 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
.orElse(0L);
totalTablets += tablets.size();
totalBufferSize += bufferSize;
+ currentBufferSize += bufferSize;
if (bufferSize > READ_TABLET_BUFFER_SIZE) {
// TODO: split tablets
- LOGGER.warn("Detect large tablets with {} byte(s).", bufferSize);
- return new CachedSubscriptionPollResponse(
- SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
- commitContext);
+ // second placeholder reports the accumulated byte count, not the tablet list
+ LOGGER.warn(
+ "Detect large tablets with {} byte(s), current tablets size {}
byte(s)",
+ bufferSize,
+ currentBufferSize);
+ response =
+ new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
+ commitContext);
+ break;
}
- if (currentBufferSize + bufferSize > READ_TABLET_BUFFER_SIZE) {
+ if (currentBufferSize > READ_TABLET_BUFFER_SIZE) {
// TODO: split tablets
- return new CachedSubscriptionPollResponse(
- SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
- commitContext);
+ response =
+ new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
+ commitContext);
+ break;
}
- currentBufferSize += bufferSize;
-
// limit control for large message
if (totalBufferSize > PREFETCH_TABLET_BUFFER_SIZE && batch.hasNext()) {
+ // we generate seal signal at next round
availableForNext = true;
break;
}
}
- final CachedSubscriptionPollResponse response;
- if (currentTablets.isEmpty()) {
- response =
- new CachedSubscriptionPollResponse(
- SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(Collections.emptyList(), -totalTablets),
- commitContext);
- hasNoMore = true;
- } else {
- response =
- new CachedSubscriptionPollResponse(
- SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
- commitContext);
+ if (Objects.isNull(response)) {
+ if (currentTablets.isEmpty()) {
+ // freeze the iterated enriched events
+ transportIterationSnapshot();
+ // return last response of this subscription event
+ response =
+ new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(Collections.emptyList(), -totalTablets),
+ commitContext);
+ hasNoMore = true;
+ } else {
+ response =
+ new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
+ commitContext);
+ }
+ }
+
+ // set fixed memory block for response
+ final List<Tablet> tablets = ((TabletsPayload)
response.getPayload()).getTablets();
+ if (Objects.nonNull(tablets) && !tablets.isEmpty()) {
+ final PipeTabletMemoryBlock memoryBlock =
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(currentBufferSize);
+ response.setMemoryBlock(memoryBlock);
}
return response;
}
+
+ private void waitForResourceEnough4Parsing(final long timeoutMs) throws
InterruptedException {
+ final PipeMemoryManager memoryManager =
PipeDataNodeResourceManager.memory();
+ if (memoryManager.isEnough4TabletParsing()) {
+ return;
+ }
+
+ final long startTime = System.currentTimeMillis();
+ long lastRecordTime = startTime;
+
+ final long memoryCheckIntervalMs =
+
SubscriptionConfig.getInstance().getSubscriptionCheckMemoryEnoughIntervalMs();
+ while (!memoryManager.isEnough4TabletParsing()) {
+ Thread.sleep(memoryCheckIntervalMs);
+
+ final long currentTime = System.currentTimeMillis();
+ final double elapsedRecordTimeSeconds = (currentTime - lastRecordTime) /
1000.0;
+ final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
+ if (elapsedRecordTimeSeconds > 10.0) {
+ LOGGER.info(
+ "SubscriptionEventTabletResponse {} wait for resource enough for
parsing tablets {} seconds.",
+ commitContext,
+ waitTimeSeconds);
+ lastRecordTime = currentTime;
+ } else if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "SubscriptionEventTabletResponse {} wait for resource enough for
parsing tablets {} seconds.",
+ commitContext,
+ waitTimeSeconds);
+ }
+
+ if (waitTimeSeconds * 1000 > timeoutMs) {
+ // should contain 'TimeoutException' in exception message
+ throw new PipeException(
+ String.format("TimeoutException: Waited %s seconds",
waitTimeSeconds));
+ }
+ }
+
+ final long currentTime = System.currentTimeMillis();
+ final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
+ LOGGER.info(
+ "SubscriptionEventTabletResponse {} wait for resource enough for
parsing tablets {} seconds.",
+ commitContext,
+ waitTimeSeconds);
+ }
+
+ private void transportIterationSnapshot() {
+ events.receiveIterationSnapshot(batch.sendIterationSnapshot());
+ }
+
+ /////////////////////////////// stringify ///////////////////////////////
+
+ @Override
+ public String toString() {
+ return "SubscriptionEventTabletResponse" + coreReportMessage();
+ }
+
+ protected Map<String, String> coreReportMessage() {
+ final Map<String, String> result = super.coreReportMessage();
+ result.put("totalTablets", String.valueOf(totalTablets));
+ result.put("nextOffset", String.valueOf(nextOffset));
+ result.put("totalBufferSize", String.valueOf(totalBufferSize));
+ result.put("availableForNext", String.valueOf(availableForNext));
+ return result;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
index 2397ec51c60..0788a89c661 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.subscription.event.response;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -56,6 +57,9 @@ public class SubscriptionEventTsFileResponse extends
SubscriptionEventExtendable
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionEventTsFileResponse.class);
+ private static final long READ_FILE_BUFFER_SIZE =
+ SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
+
private final File tsFile;
private final SubscriptionCommitContext commitContext;
@@ -151,7 +155,8 @@ public class SubscriptionEventTsFileResponse extends
SubscriptionEventExtendable
}
private @NonNull CachedSubscriptionPollResponse
generateResponseWithPieceOrSealPayload(
- final long writingOffset) throws SubscriptionException, IOException,
InterruptedException {
+ final long writingOffset)
+ throws IOException, InterruptedException,
PipeRuntimeOutOfMemoryCriticalException {
final long tsFileLength = tsFile.length();
if (writingOffset >= tsFileLength) {
// generate subscription poll response with seal payload
@@ -162,15 +167,13 @@ public class SubscriptionEventTsFileResponse extends
SubscriptionEventExtendable
commitContext);
}
- final long readFileBufferSize =
- SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
final long bufferSize;
- if (writingOffset + readFileBufferSize >= tsFileLength) {
+ if (writingOffset + READ_FILE_BUFFER_SIZE >= tsFileLength) {
// last piece
bufferSize = tsFileLength - writingOffset;
} else {
// not last piece
- bufferSize = readFileBufferSize;
+ bufferSize = READ_FILE_BUFFER_SIZE;
}
waitForResourceEnough4Slicing(SubscriptionAgent.receiver().remainingMs());
@@ -196,6 +199,8 @@ public class SubscriptionEventTsFileResponse extends
SubscriptionEventExtendable
SubscriptionPollResponseType.FILE_PIECE.getType(),
new FilePiecePayload(tsFile.getName(), writingOffset +
readLength, readBuffer),
commitContext);
+
+ // set fixed memory block for response
response.setMemoryBlock(memoryBlock);
return response;
}
@@ -211,7 +216,7 @@ public class SubscriptionEventTsFileResponse extends
SubscriptionEventExtendable
long lastRecordTime = startTime;
final long memoryCheckIntervalMs =
-
PipeConfig.getInstance().getPipeTsFileParserCheckMemoryEnoughIntervalMs();
+
SubscriptionConfig.getInstance().getSubscriptionCheckMemoryEnoughIntervalMs();
while (!memoryManager.isEnough4TsFileSlicing()) {
Thread.sleep(memoryCheckIntervalMs);
@@ -244,4 +249,17 @@ public class SubscriptionEventTsFileResponse extends
SubscriptionEventExtendable
LOGGER.info(
"Wait for resource enough for slicing tsfile {} for {} seconds.",
tsFile, waitTimeSeconds);
}
+
+ /////////////////////////////// stringify ///////////////////////////////
+
+ @Override
+ public String toString() {
+ return "SubscriptionEventTsFileResponse" + coreReportMessage();
+ }
+
+ protected Map<String, String> coreReportMessage() {
+ final Map<String, String> result = super.coreReportMessage();
+ result.put("tsFile", String.valueOf(tsFile));
+ return result;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 50db0de3021..b2eeaf25f2c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -287,13 +287,12 @@ public class CommonConfig {
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L;
// 3 minutes
private long twoStageAggregateSenderEndPointsCacheInMs = 3 * 60 * 1000L; //
3 minutes
- private float subscriptionCacheMemoryUsagePercentage = 0.2F;
-
private boolean pipeEventReferenceTrackingEnabled = true;
private long pipeEventReferenceEliminateIntervalSeconds = 10;
- private int subscriptionSubtaskExecutorMaxThreadNum =
- Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+ private float subscriptionCacheMemoryUsagePercentage = 0.2F;
+ private int subscriptionSubtaskExecutorMaxThreadNum = 2;
+
private int subscriptionPrefetchTabletBatchMaxDelayInMs = 1000; // 1s
private long subscriptionPrefetchTabletBatchMaxSizeInBytes = 16 * MB;
private int subscriptionPrefetchTsFileBatchMaxDelayInMs = 5000; // 5s
@@ -305,7 +304,13 @@ public class CommonConfig {
private long subscriptionReadFileBufferSize = 8 * MB;
private long subscriptionReadTabletBufferSize = 8 * MB;
private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
- private volatile long subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs =
10L;
+ private volatile long subscriptionCheckMemoryEnoughIntervalMs = 10L;
+
+ private boolean subscriptionPrefetchEnabled = true;
+ private float subscriptionPrefetchMemoryThreshold = 0.5F;
+ private float subscriptionPrefetchMissingRateThreshold = 0.9F;
+ private int subscriptionPrefetchEventLocalCountThreshold = 10;
+ private int subscriptionPrefetchEventGlobalCountThreshold = 100;
private long subscriptionMetaSyncerInitialSyncDelayMinutes = 3;
private long subscriptionMetaSyncerSyncIntervalMinutes = 3;
@@ -1309,10 +1314,7 @@ public class CommonConfig {
public void setSubscriptionSubtaskExecutorMaxThreadNum(
int subscriptionSubtaskExecutorMaxThreadNum) {
- this.subscriptionSubtaskExecutorMaxThreadNum =
- Math.min(
- subscriptionSubtaskExecutorMaxThreadNum,
- Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+ this.subscriptionSubtaskExecutorMaxThreadNum =
subscriptionSubtaskExecutorMaxThreadNum;
}
public int getSubscriptionPrefetchTabletBatchMaxDelayInMs() {
@@ -1413,14 +1415,58 @@ public class CommonConfig {
subscriptionTsFileDeduplicationWindowSeconds;
}
- public long getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs() {
- return subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs;
+ public long getSubscriptionCheckMemoryEnoughIntervalMs() {
+ return subscriptionCheckMemoryEnoughIntervalMs;
+ }
+
+ public void setSubscriptionCheckMemoryEnoughIntervalMs(
+ long subscriptionCheckMemoryEnoughIntervalMs) {
+ this.subscriptionCheckMemoryEnoughIntervalMs =
subscriptionCheckMemoryEnoughIntervalMs;
+ }
+
+ public boolean getSubscriptionPrefetchEnabled() {
+ return subscriptionPrefetchEnabled;
+ }
+
+ public void setSubscriptionPrefetchEnabled(boolean
subscriptionPrefetchEnabled) {
+ this.subscriptionPrefetchEnabled = subscriptionPrefetchEnabled;
+ }
+
+ public float getSubscriptionPrefetchMemoryThreshold() {
+ return subscriptionPrefetchMemoryThreshold;
+ }
+
+ public void setSubscriptionPrefetchMemoryThreshold(float
subscriptionPrefetchMemoryThreshold) {
+ this.subscriptionPrefetchMemoryThreshold =
subscriptionPrefetchMemoryThreshold;
+ }
+
+ public float getSubscriptionPrefetchMissingRateThreshold() {
+ return subscriptionPrefetchMissingRateThreshold;
+ }
+
+ public void setSubscriptionPrefetchMissingRateThreshold(
+ float subscriptionPrefetchMissingRateThreshold) {
+ this.subscriptionPrefetchMissingRateThreshold =
subscriptionPrefetchMissingRateThreshold;
+ }
+
+ public int getSubscriptionPrefetchEventLocalCountThreshold() {
+ return subscriptionPrefetchEventLocalCountThreshold;
+ }
+
+ public void setSubscriptionPrefetchEventLocalCountThreshold(
+ int subscriptionPrefetchEventLocalCountThreshold) {
+ this.subscriptionPrefetchEventLocalCountThreshold =
+ subscriptionPrefetchEventLocalCountThreshold;
+ }
+
+ public int getSubscriptionPrefetchEventGlobalCountThreshold() {
+ return subscriptionPrefetchEventGlobalCountThreshold;
}
- public void setSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs(
- long subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs) {
- this.subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs =
- subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs;
+ public void setSubscriptionPrefetchEventGlobalCountThreshold(
+ int subscriptionPrefetchEventGlobalCountThreshold) {
+ this.subscriptionPrefetchEventGlobalCountThreshold =
+ subscriptionPrefetchEventGlobalCountThreshold;
}
public long getSubscriptionMetaSyncerInitialSyncDelayMinutes() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c40429567c2..529d1053c42 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -655,15 +655,12 @@ public class CommonDescriptor {
properties.getProperty(
"subscription_cache_memory_usage_percentage",
String.valueOf(config.getSubscriptionCacheMemoryUsagePercentage()))));
-
config.setSubscriptionSubtaskExecutorMaxThreadNum(
Integer.parseInt(
properties.getProperty(
"subscription_subtask_executor_max_thread_num",
Integer.toString(config.getSubscriptionSubtaskExecutorMaxThreadNum()))));
- if (config.getSubscriptionSubtaskExecutorMaxThreadNum() <= 0) {
- config.setSubscriptionSubtaskExecutorMaxThreadNum(5);
- }
+
config.setSubscriptionPrefetchTabletBatchMaxDelayInMs(
Integer.parseInt(
properties.getProperty(
@@ -719,11 +716,37 @@ public class CommonDescriptor {
properties.getProperty(
"subscription_ts_file_deduplication_window_seconds",
String.valueOf(config.getSubscriptionTsFileDeduplicationWindowSeconds()))));
- config.setSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs(
+ config.setSubscriptionCheckMemoryEnoughIntervalMs(
Long.parseLong(
properties.getProperty(
- "subscription_ts_file_slicer_check_memory_enough_interval_ms",
-
String.valueOf(config.getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs()))));
+ "subscription_check_memory_enough_interval_ms",
+
String.valueOf(config.getSubscriptionCheckMemoryEnoughIntervalMs()))));
+
+ config.setSubscriptionPrefetchEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "subscription_prefetch_enabled",
+ String.valueOf(config.getSubscriptionPrefetchEnabled()))));
+ config.setSubscriptionPrefetchMemoryThreshold(
+ Float.parseFloat(
+ properties.getProperty(
+ "subscription_prefetch_memory_threshold",
+
String.valueOf(config.getSubscriptionPrefetchMemoryThreshold()))));
+ config.setSubscriptionPrefetchMissingRateThreshold(
+ Float.parseFloat(
+ properties.getProperty(
+ "subscription_prefetch_missing_rate_threshold",
+
String.valueOf(config.getSubscriptionPrefetchMissingRateThreshold()))));
+ config.setSubscriptionPrefetchEventLocalCountThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "subscription_prefetch_event_local_count_threshold",
+
String.valueOf(config.getSubscriptionPrefetchEventLocalCountThreshold()))));
+ config.setSubscriptionPrefetchEventGlobalCountThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "subscription_prefetch_event_global_count_threshold",
+
String.valueOf(config.getSubscriptionPrefetchEventGlobalCountThreshold()))));
config.setSubscriptionMetaSyncerInitialSyncDelayMinutes(
Long.parseLong(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index fb1f37eae8d..76e29d77ee0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -81,6 +81,30 @@ public class SubscriptionConfig {
return COMMON_CONFIG.getSubscriptionTsFileDeduplicationWindowSeconds();
}
+ public long getSubscriptionCheckMemoryEnoughIntervalMs() {
+ return COMMON_CONFIG.getSubscriptionCheckMemoryEnoughIntervalMs();
+ }
+
+ public boolean getSubscriptionPrefetchEnabled() {
+ return COMMON_CONFIG.getSubscriptionPrefetchEnabled();
+ }
+
+ public float getSubscriptionPrefetchMemoryThreshold() {
+ return COMMON_CONFIG.getSubscriptionPrefetchMemoryThreshold();
+ }
+
+ public float getSubscriptionPrefetchMissingRateThreshold() {
+ return COMMON_CONFIG.getSubscriptionPrefetchMissingRateThreshold();
+ }
+
+ public int getSubscriptionPrefetchEventLocalCountThreshold() {
+ return COMMON_CONFIG.getSubscriptionPrefetchEventLocalCountThreshold();
+ }
+
+ public int getSubscriptionPrefetchEventGlobalCountThreshold() {
+ return COMMON_CONFIG.getSubscriptionPrefetchEventGlobalCountThreshold();
+ }
+
public long getSubscriptionMetaSyncerInitialSyncDelayMinutes() {
return COMMON_CONFIG.getSubscriptionMetaSyncerInitialSyncDelayMinutes();
}
@@ -89,10 +113,6 @@ public class SubscriptionConfig {
return COMMON_CONFIG.getSubscriptionMetaSyncerSyncIntervalMinutes();
}
- public long getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs() {
- return
COMMON_CONFIG.getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs();
- }
-
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConfig.class);
@@ -100,10 +120,10 @@ public class SubscriptionConfig {
public void printAllConfigs() {
LOGGER.info(
"SubscriptionCacheMemoryUsagePercentage: {}",
getSubscriptionCacheMemoryUsagePercentage());
-
LOGGER.info(
"SubscriptionSubtaskExecutorMaxThreadNum: {}",
getSubscriptionSubtaskExecutorMaxThreadNum());
+
LOGGER.info(
"SubscriptionPrefetchTabletBatchMaxDelayInMs: {}",
getSubscriptionPrefetchTabletBatchMaxDelayInMs());
@@ -128,8 +148,21 @@ public class SubscriptionConfig {
"SubscriptionTsFileDeduplicationWindowSeconds: {}",
getSubscriptionTsFileDeduplicationWindowSeconds());
LOGGER.info(
- "SubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs: {}",
- getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs());
+ "SubscriptionCheckMemoryEnoughIntervalMs: {}",
+ getSubscriptionCheckMemoryEnoughIntervalMs());
+
+ LOGGER.info("SubscriptionPrefetchEnabled: {}",
getSubscriptionPrefetchEnabled());
+ LOGGER.info(
+ "SubscriptionPrefetchMemoryThreshold: {}",
getSubscriptionPrefetchMemoryThreshold());
+ LOGGER.info(
+ "SubscriptionPrefetchMissingRateThreshold: {}",
+ getSubscriptionPrefetchMissingRateThreshold());
+ LOGGER.info(
+ "SubscriptionPrefetchEventLocalCountThreshold: {}",
+ getSubscriptionPrefetchEventLocalCountThreshold());
+ LOGGER.info(
+ "SubscriptionPrefetchEventGlobalCountThreshold: {}",
+ getSubscriptionPrefetchEventGlobalCountThreshold());
LOGGER.info(
"SubscriptionMetaSyncerInitialSyncDelayMinutes: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 3e409a9160e..c73f08766f5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -196,6 +196,8 @@ public class TopicMeta {
extractorAttributes.putAll(config.getAttributesWithSourceMode());
// loose range
extractorAttributes.putAll(config.getAttributesWithSourceLooseRange());
+ // backdoor configs
+ extractorAttributes.putAll(config.getAttributesWithSourcePrefix());
return extractorAttributes;
}
@@ -209,6 +211,8 @@ public class TopicMeta {
connectorAttributes.put(PipeConnectorConstant.SINK_TOPIC_KEY, topicName);
connectorAttributes.put(PipeConnectorConstant.SINK_CONSUMER_GROUP_KEY,
consumerGroupId);
connectorAttributes.putAll(config.getAttributesWithSinkFormat());
+ // backdoor configs
+ connectorAttributes.putAll(config.getAttributesWithSinkPrefix());
return connectorAttributes;
}