This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f5b729d0687 Pipe: incorporate batch data into 
TsFileInsertionEventScanParser memory control & Subscription: close data 
container in tsfile event & bind memory block for tablet response & generate 
empty tablet as initial response & offer subsequent tablet response before ack 
& expose prefetch and backdoor configs & best-effort disorder control (#14752) 
(#14849)
f5b729d0687 is described below

commit f5b729d06875187bdf0eab03e90d6d086afa689f
Author: VGalaxies <[email protected]>
AuthorDate: Fri Feb 14 17:50:42 2025 +0800

    Pipe: incorporate batch data into TsFileInsertionEventScanParser memory 
control & Subscription: close data container in tsfile event & bind memory 
block for tablet response & generate empty tablet as initial response & offer 
subsequent tablet response before ack & expose prefetch and backdoor configs & 
best-effort disorder control (#14752) (#14849)
---
 .../iotdb/rpc/subscription/config/TopicConfig.java |  24 +++
 .../subscription/payload/poll/TabletsPayload.java  |  11 +-
 .../scan/TsFileInsertionScanDataContainer.java     |  19 ++
 .../pipe/resource/memory/PipeMemoryWeightUtil.java |  48 +++++
 .../broker/SubscriptionPrefetchingQueue.java       |  47 +++--
 .../broker/SubscriptionPrefetchingQueueStates.java |  96 +++++++---
 .../broker/SubscriptionPrefetchingTabletQueue.java |   2 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |   2 +-
 .../event/SubscriptionCommitContextSupplier.java   |  28 ---
 .../db/subscription/event/SubscriptionEvent.java   |  80 +++++---
 .../event/batch/SubscriptionPipeEventBatch.java    |  30 ---
 .../batch/SubscriptionPipeTabletEventBatch.java    |  66 +++----
 .../batch/SubscriptionPipeTsFileEventBatch.java    |  15 --
 .../cache/CachedSubscriptionPollResponse.java      |   5 +
 .../event/pipe/SubscriptionPipeEmptyEvent.java     |   4 +-
 .../pipe/SubscriptionPipeTabletBatchEvents.java    |  46 ++++-
 .../pipe/SubscriptionPipeTsFileBatchEvents.java    |   8 +-
 .../pipe/SubscriptionPipeTsFilePlainEvent.java     |   8 +-
 .../SubscriptionEventExtendableResponse.java       |   1 +
 .../event/response/SubscriptionEventResponse.java  |  10 +-
 .../response/SubscriptionEventSingleResponse.java  |   1 +
 .../response/SubscriptionEventTabletResponse.java  | 201 ++++++++++++++++-----
 .../response/SubscriptionEventTsFileResponse.java  |  32 +++-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  76 ++++++--
 .../iotdb/commons/conf/CommonDescriptor.java       |  37 +++-
 .../subscription/config/SubscriptionConfig.java    |  47 ++++-
 .../commons/subscription/meta/topic/TopicMeta.java |   4 +
 27 files changed, 671 insertions(+), 277 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index 46dc7601e0f..55c84e94037 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -124,6 +124,19 @@ public class TopicConfig extends PipeParameters {
         .collect(Collectors.toMap(key -> key, key -> looseRangeValue));
   }
 
+  public Map<String, String> getAttributesWithSourcePrefix() {
+    final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
+    attributes.forEach(
+        (key, value) -> {
+          if (key.toLowerCase().startsWith("source")) {
+            attributesWithProcessorPrefix.put(key, value);
+          }
+        });
+    return attributesWithProcessorPrefix;
+  }
+
+  /////////////////////////////// processor attributes mapping ///////////////////////////////
+
   public Map<String, String> getAttributesWithProcessorPrefix() {
     final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
     attributes.forEach(
@@ -138,4 +151,15 @@ public class TopicConfig extends PipeParameters {
   public Map<String, String> getAttributesWithSinkFormat() {
     return Collections.emptyMap(); // default to hybrid
   }
+
+  public Map<String, String> getAttributesWithSinkPrefix() {
+    final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
+    attributes.forEach(
+        (key, value) -> {
+          if (key.toLowerCase().startsWith("sink")) {
+            attributesWithProcessorPrefix.put(key, value);
+          }
+        });
+    return attributesWithProcessorPrefix;
+  }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
index 44f8d642327..23b9e066329 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
@@ -35,9 +35,14 @@ public class TabletsPayload implements 
SubscriptionPollPayload {
   private transient List<Tablet> tablets = new ArrayList<>();
 
   /**
-   * The field to be filled in the next {@link PollTabletsPayload} request. If negative, it
-   * indicates all tablets have been fetched, and -nextOffset represents the total number of
-   * tablets.
+   * The field to be filled in the next {@link PollTabletsPayload} request.
+   *
+   * <ul>
+   *   <li>If nextOffset is 1, it indicates that the current payload is the first payload (its
+   *       tablets are empty) and the fetching should continue.
+   *   <li>If nextOffset is negative, it indicates all tablets have been fetched, and -nextOffset
+   *       represents the total number of tablets.
+   * </ul>
    */
   private transient int nextOffset;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index e73cca1cd9a..318e70b5605 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -71,6 +72,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
   private IChunkReader chunkReader;
   private BatchData data;
+  private final PipeMemoryBlock allocatedMemoryBlockForBatchData;
 
   private boolean currentIsMultiPage;
   private String currentDevice;
@@ -101,6 +103,10 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     this.endTime = endTime;
     filter = Objects.nonNull(timeFilterExpression) ? 
timeFilterExpression.getFilter() : null;
 
+    // Allocate empty memory block, will be resized later.
+    this.allocatedMemoryBlockForBatchData =
+        PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
     try {
       tsFileSequenceReader = new 
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
       tsFileSequenceReader.position((long) 
TSFileConfig.MAGIC_STRING.getBytes().length + 1);
@@ -264,6 +270,10 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
       do {
         data = chunkReader.nextPageData();
+        PipeDataNodeResourceManager.memory()
+            .forceResize(
+                allocatedMemoryBlockForBatchData,
+                PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data));
       } while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
     } while (!data.hasCurrent());
   }
@@ -497,4 +507,13 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     }
     return false;
   }
+
+  @Override
+  public void close() {
+    super.close();
+
+    if (allocatedMemoryBlockForBatchData != null) {
+      allocatedMemoryBlockForBatchData.close();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index cfe012fb40a..78baf1d80ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -246,4 +246,52 @@ public class PipeMemoryWeightUtil {
 
     return totalSizeInBytes;
   }
+
+  public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
+    int totalSizeInBytes = 0;
+
+    // timestamp
+    totalSizeInBytes += 8;
+
+    // values
+    final TSDataType type = batchData.getDataType();
+    if (type != null) {
+      if (type == TSDataType.VECTOR && batchData.getVector() != null) {
+        for (int i = 0; i < batchData.getVector().length; ++i) {
+          final TsPrimitiveType primitiveType = batchData.getVector()[i];
+          if (primitiveType == null || primitiveType.getDataType() == null) {
+            continue;
+          }
+          // consider variable references (plus 8) and memory alignment (round up to 8)
+          totalSizeInBytes += roundUpToMultiple(primitiveType.getSize() + 8, 8);
+        }
+      } else {
+        if (type.isBinary()) {
+          final Binary binary = batchData.getBinary();
+          // refer to org.apache.tsfile.utils.TsPrimitiveType.TsBinary.getSize
+          totalSizeInBytes +=
+              roundUpToMultiple((binary == null ? 8 : binary.getLength() + 8) + 8, 8);
+        } else {
+          totalSizeInBytes += roundUpToMultiple(TsPrimitiveType.getByType(type).getSize() + 8, 8);
+        }
+      }
+    }
+
+    return batchData.length() * totalSizeInBytes;
+  }
+
+  /**
+   * Rounds up the given integer num to the nearest multiple of n.
+   *
+   * @param num The integer to be rounded up.
+   * @param n The specified multiple.
+   * @return The nearest multiple of n greater than or equal to num.
+   */
+  private static int roundUpToMultiple(int num, int n) {
+    if (n == 0) {
+      throw new IllegalArgumentException("The multiple n must be greater than 0");
+    }
+    // Calculate the rounded up value to the nearest multiple of n
+    return ((num + n - 1) / n) * n;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 5afbf212829..cf78aff6d4a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -47,7 +47,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -68,7 +68,7 @@ public abstract class SubscriptionPrefetchingQueue {
   private final AtomicLong commitIdGenerator;
 
   /** A queue containing a series of prefetched pollable {@link 
SubscriptionEvent}. */
-  protected final LinkedBlockingQueue<SubscriptionEvent> prefetchingQueue;
+  protected final PriorityBlockingQueue<SubscriptionEvent> prefetchingQueue;
 
   /**
    * A map that tracks in-flight {@link SubscriptionEvent}, keyed by consumer 
id and commit context.
@@ -112,7 +112,7 @@ public abstract class SubscriptionPrefetchingQueue {
     this.inputPendingQueue = inputPendingQueue;
     this.commitIdGenerator = commitIdGenerator;
 
-    this.prefetchingQueue = new LinkedBlockingQueue<>();
+    this.prefetchingQueue = new PriorityBlockingQueue<>();
     this.inFlightEvents = new ConcurrentHashMap<>();
     this.batches = new SubscriptionPipeEventBatches(this, maxDelayInMs, 
maxBatchSizeInBytes);
 
@@ -259,9 +259,11 @@ public abstract class SubscriptionPrefetchingQueue {
       if (isClosed()) {
         return false;
       }
+      // TODO: more refined behavior (prefetch/serialize/...) control
       if (states.shouldPrefetch()) {
         tryPrefetch();
-        remapInFlightEventsSnapshot(committedCleaner, pollableNacker, responsePrefetcher);
+        remapInFlightEventsSnapshot(
+            committedCleaner, pollableNacker, responsePrefetcher, responseSerializer);
         return true;
       } else {
         remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
@@ -286,9 +288,18 @@ public abstract class SubscriptionPrefetchingQueue {
     }
   }
 
-  protected void enqueueEventToPrefetchingQueue(final SubscriptionEvent event) {
-    event.trySerializeCurrentResponse();
-    prefetchingQueue.add(event);
+  public void prefetchEvent(@NonNull final SubscriptionEvent thisEvent) {
+    final SubscriptionEvent thatEvent = prefetchingQueue.peek();
+    if (Objects.nonNull(thatEvent)) {
+      if (thisEvent.compareTo(thatEvent) < 0) {
+        // disorder causes:
+        // 1. prefetch nacked event
+        // 2. late cross-event of dataset payload
+        states.markDisorderCause();
+      }
+    }
+
+    prefetchingQueue.add(thisEvent);
   }
 
   /**
@@ -376,14 +387,14 @@ public abstract class SubscriptionPrefetchingQueue {
    * @return {@code true} if there are subscription events prefetched.
    */
   protected boolean onEvent(final TabletInsertionEvent event) {
-    return batches.onEvent((EnrichedEvent) event, 
this::enqueueEventToPrefetchingQueue);
+    return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
   }
 
   /**
    * @return {@code true} if there are subscription events prefetched.
    */
   protected boolean onEvent() {
-    return batches.onEvent(this::enqueueEventToPrefetchingQueue);
+    return batches.onEvent(this::prefetchEvent);
   }
 
   /////////////////////////////// commit ///////////////////////////////
@@ -449,7 +460,7 @@ public abstract class SubscriptionPrefetchingQueue {
                 this);
           }
 
-          ev.ack(this::enqueueEventToPrefetchingQueue);
+          ev.ack();
           ev.recordCommittedTimestamp(); // now committed
           acked.set(true);
 
@@ -655,12 +666,12 @@ public abstract class SubscriptionPrefetchingQueue {
       (ev) -> {
         if (ev.eagerlyPollable()) {
           ev.nack(); // now pollable (the nack operation here is actually unnecessary)
-          enqueueEventToPrefetchingQueue(ev);
+          prefetchEvent(ev);
           // no need to log warn for eagerly pollable event
           return null; // remove this entry
         } else if (ev.pollable()) {
           ev.nack(); // now pollable
-          enqueueEventToPrefetchingQueue(ev);
+          prefetchEvent(ev);
           LOGGER.warn(
               "Subscription: SubscriptionPrefetchingQueue {} recycle event {} from in flight events, nack and enqueue it to prefetching queue",
               this,
@@ -672,9 +683,19 @@ public abstract class SubscriptionPrefetchingQueue {
 
   private final RemappingFunction<SubscriptionEvent> responsePrefetcher =
       (ev) -> {
-        // prefetch and serialize the remaining responses
+        // prefetch the remaining responses
         try {
           ev.prefetchRemainingResponses();
+        } catch (final Exception ignored) {
+        }
+        return ev;
+      };
+
+  private final RemappingFunction<SubscriptionEvent> responseSerializer =
+      (ev) -> {
+        // serialize the responses
+        try {
+          ev.trySerializeCurrentResponse();
           ev.trySerializeRemainingResponses();
         } catch (final Exception ignored) {
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
index a699672c7b3..8b0eb256f42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
@@ -19,15 +19,19 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.metrics.core.utils.IoTDBMovingAverage;
 
 import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Meter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
+
 /**
  * The {@link SubscriptionPrefetchingQueueStates} manages the state of a {@link
  * SubscriptionPrefetchingQueue}. It determines whether prefetching should 
occur based on memory
@@ -40,16 +44,21 @@ public class SubscriptionPrefetchingQueueStates {
 
   private static final double EPSILON = 1e-6;
 
-  // TODO: config
-  private static final double PREFETCH_MEMORY_THRESHOLD = 0.6;
-  private static final double MISSING_RATE_THRESHOLD = 0.9;
-  private static final int PREFETCHED_EVENT_COUNT_CONTROL_PARAMETER = 100;
+  private static final double PREFETCH_MEMORY_THRESHOLD =
+      SubscriptionConfig.getInstance().getSubscriptionPrefetchMemoryThreshold();
+  private static final double PREFETCH_MISSING_RATE_THRESHOLD =
+      SubscriptionConfig.getInstance().getSubscriptionPrefetchMissingRateThreshold();
+  private static final int PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD =
+      SubscriptionConfig.getInstance().getSubscriptionPrefetchEventLocalCountThreshold();
+  private static final int PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD =
+      SubscriptionConfig.getInstance().getSubscriptionPrefetchEventGlobalCountThreshold();
 
   private final SubscriptionPrefetchingQueue prefetchingQueue;
 
   private volatile long lastPollRequestTimestamp;
   private final Meter pollRequestMeter;
   private final Meter missingPrefechMeter;
+  private final Counter disorderCauseCounter; // TODO: use meter
 
   public SubscriptionPrefetchingQueueStates(final SubscriptionPrefetchingQueue 
prefetchingQueue) {
     this.prefetchingQueue = prefetchingQueue;
@@ -57,6 +66,7 @@ public class SubscriptionPrefetchingQueueStates {
     this.lastPollRequestTimestamp = -1;
     this.pollRequestMeter = new Meter(new IoTDBMovingAverage(), 
Clock.defaultClock());
     this.missingPrefechMeter = new Meter(new IoTDBMovingAverage(), 
Clock.defaultClock());
+    this.disorderCauseCounter = new Counter();
   }
 
   public void markPollRequest() {
@@ -68,19 +78,55 @@ public class SubscriptionPrefetchingQueueStates {
     missingPrefechMeter.mark();
   }
 
+  public void markDisorderCause() {
+    disorderCauseCounter.inc();
+  }
+
+  private double pollRate() {
+    return pollRequestMeter.getOneMinuteRate();
+  }
+
+  private double missingRate() {
+    if (isApproximatelyZero(pollRate())) {
+      return 0.0;
+    }
+    return missingPrefechMeter.getOneMinuteRate() / pollRate();
+  }
+
   public boolean shouldPrefetch() {
+    // 1. reject conditions
+    // 1.1. option
+    if (!SubscriptionConfig.getInstance().getSubscriptionPrefetchEnabled()) {
+      return false;
+    }
+
+    // 1.2. memory usage
     if (!isMemoryEnough()) {
       return false;
     }
 
-    if (missingRate() > MISSING_RATE_THRESHOLD) {
-      return true;
+    // 1.3. local event count
+    if (hasTooManyPrefetchedLocalEvent()) {
+      return false;
     }
 
-    if (hasTooManyPrefetchedEvents()) {
+    // 1.4. global event count
+    if (hasTooManyPrefetchedGlobalEvent()) {
       return false;
     }
 
+    // 1.5. disorder history
+    if (hasDisorderCause()) {
+      return false;
+    }
+
+    // 2. permissive conditions
+    // 2.1. missing rate
+    if (isMissingRateTooHigh()) {
+      return true;
+    }
+
+    // 2.2. prefetch statistics
     // the delta between the prefetch timestamp and the timestamp of the last poll request > poll
     // request frequency
     return (System.currentTimeMillis() - lastPollRequestTimestamp) * pollRate() > 1000;
@@ -92,23 +138,24 @@ public class SubscriptionPrefetchingQueueStates {
         > PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes();
   }
 
-  private double pollRate() {
-    return pollRequestMeter.getOneMinuteRate();
+  private boolean hasTooManyPrefetchedLocalEvent() {
+    return prefetchingQueue.getPrefetchedEventCount() > 
PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD;
   }
 
-  private double missingRate() {
-    if (isApproximatelyZero(pollRate())) {
-      return 0.0;
-    }
-    return missingPrefechMeter.getOneMinuteRate() / pollRate();
-  }
-
-  private boolean hasTooManyPrefetchedEvents() {
+  private boolean hasTooManyPrefetchedGlobalEvent() {
     // The number of prefetched events in the current prefetching queue > 
floor(t / number of
     // prefetching queues), where t is an adjustable parameter.
     return prefetchingQueue.getPrefetchedEventCount()
             * SubscriptionAgent.broker().getPrefetchingQueueCount()
-        > PREFETCHED_EVENT_COUNT_CONTROL_PARAMETER;
+        > PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD;
+  }
+
+  private boolean isMissingRateTooHigh() {
+    return missingRate() > PREFETCH_MISSING_RATE_THRESHOLD;
+  }
+
+  private boolean hasDisorderCause() {
+    return disorderCauseCounter.getCount() > 0;
   }
 
   private static boolean isApproximatelyZero(final double value) {
@@ -117,12 +164,11 @@ public class SubscriptionPrefetchingQueueStates {
 
   @Override
   public String toString() {
-    return "PollPrefetchStates{lastPollRequestTimestamp="
-        + lastPollRequestTimestamp
-        + ", pollRate="
-        + pollRate()
-        + ", missingRate="
-        + missingRate()
-        + "}";
+    return toStringHelper(this)
+        .add("lastPollRequestTimestamp", lastPollRequestTimestamp)
+        .add("pollRate", pollRate())
+        .add("missingRate", missingRate())
+        .add("disorderCause", disorderCauseCounter.getCount())
+        .toString();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index e729fc74095..d590df17c2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -173,7 +173,7 @@ public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQ
 
   @Override
   protected boolean onEvent(final TsFileInsertionEvent event) {
-    return batches.onEvent((EnrichedEvent) event, 
this::enqueueEventToPrefetchingQueue);
+    return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
   }
 
   /////////////////////////////// stringify ///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 5a7b2f7f31c..adef86518b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -231,7 +231,7 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
             new SubscriptionPipeTsFilePlainEvent((PipeTsFileInsertionEvent) 
event),
             ((PipeTsFileInsertionEvent) event).getTsFile(),
             commitContext);
-    super.enqueueEventToPrefetchingQueue(ev);
+    super.prefetchEvent(ev);
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
deleted file mode 100644
index cc73384d42c..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.subscription.event;
-
-import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-
-@FunctionalInterface
-public interface SubscriptionCommitContextSupplier {
-
-  SubscriptionCommitContext get();
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index 2db5bc7a4b0..acf47ca8a51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -41,13 +41,14 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
 import static 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
 
-public class SubscriptionEvent {
+public class SubscriptionEvent implements Comparable<SubscriptionEvent> {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionEvent.class);
 
@@ -63,7 +64,10 @@ public class SubscriptionEvent {
   private final AtomicLong committedTimestamp = new 
AtomicLong(INVALID_TIMESTAMP);
 
   // record file name for file payload
-  private String fileName;
+  private volatile String fileName;
+
+  // record for cross-event dataset payload
+  private volatile SubscriptionCommitContext rootCommitContext;
 
   private static final long NACK_COUNT_REPORT_THRESHOLD = 3;
   private final AtomicLong nackCount = new AtomicLong();
@@ -86,18 +90,41 @@ public class SubscriptionEvent {
     this(response.getResponseType(), response.getPayload(), 
response.getCommitContext());
   }
 
+  /**
+   * Constructs a {@link SubscriptionEvent} with the response type of {@link
+   * SubscriptionEventTabletResponse}.
+   */
+  public SubscriptionEvent(
+      final SubscriptionPipeTabletEventBatch batch, final 
SubscriptionPrefetchingQueue queue) {
+    final SubscriptionPipeTabletBatchEvents events = new 
SubscriptionPipeTabletBatchEvents(batch);
+    final SubscriptionCommitContext commitContext = 
queue.generateSubscriptionCommitContext();
+
+    this.pipeEvents = events;
+    this.response =
+        new SubscriptionEventTabletResponse(batch, queue, events, 
commitContext, commitContext);
+    this.commitContext = commitContext;
+
+    // root event for cross-event dataset payload
+    this.rootCommitContext = commitContext;
+  }
+
   /**
    * Constructs a {@link SubscriptionEvent} with the response type of {@link
    * SubscriptionEventTabletResponse}.
    */
   public SubscriptionEvent(
       final SubscriptionPipeTabletEventBatch batch,
-      final SubscriptionCommitContextSupplier commitContextSupplier) {
-    this.pipeEvents = new SubscriptionPipeTabletBatchEvents(batch);
-    final SubscriptionCommitContext commitContext = 
commitContextSupplier.get();
+      final SubscriptionPrefetchingQueue queue,
+      final SubscriptionCommitContext rootCommitContext) {
+    final SubscriptionPipeTabletBatchEvents events = new 
SubscriptionPipeTabletBatchEvents(batch);
+    final SubscriptionCommitContext commitContext = 
queue.generateSubscriptionCommitContext();
+
+    this.pipeEvents = events;
     this.response =
-        new SubscriptionEventTabletResponse(batch, commitContext, 
commitContextSupplier);
+        new SubscriptionEventTabletResponse(batch, queue, events, 
commitContext, rootCommitContext);
     this.commitContext = commitContext;
+
+    this.rootCommitContext = rootCommitContext;
   }
 
   /**
@@ -145,12 +172,8 @@ public class SubscriptionEvent {
     return response.isCommittable();
   }
 
-  public void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
-    // NOTE: we should ack pipe events before ack response since multiple events may reuse the same
-    // batch (as pipe events)
-    // TODO: consider more elegant design for this method
+  public void ack() {
     pipeEvents.ack();
-    response.ack(onCommittedHook);
   }
 
   /**
@@ -233,7 +256,7 @@ public class SubscriptionEvent {
 
   //////////////////////////// prefetch & fetch ////////////////////////////
 
-  public void prefetchRemainingResponses() throws Exception {
+  public void prefetchRemainingResponses() {
     response.prefetchRemainingResponses();
   }
 
@@ -281,16 +304,25 @@ public class SubscriptionEvent {
 
   @Override
   public String toString() {
-    return "SubscriptionEvent{response="
-        + response
-        + ", lastPolledConsumerId="
-        + lastPolledConsumerId
-        + ", lastPolledTimestamp="
-        + lastPolledTimestamp
-        + ", committedTimestamp="
-        + committedTimestamp
-        + ", pipeEvents="
-        + pipeEvents
-        + "}";
+    return toStringHelper(this)
+        .add("commitContext", commitContext)
+        .add("response", response)
+        .add("lastPolledConsumerId", lastPolledConsumerId)
+        .add("lastPolledTimestamp", lastPolledTimestamp)
+        .add("committedTimestamp", committedTimestamp)
+        .add("pipeEvents", pipeEvents)
+        .add(
+            "rootCommitContext",
+            Objects.nonNull(rootCommitContext) ? rootCommitContext : "<unknown>")
+        .toString();
+  }
+
+  @Override
+  public int compareTo(final SubscriptionEvent that) {
+    final SubscriptionCommitContext thisCommitContext =
+        Objects.nonNull(this.rootCommitContext) ? this.rootCommitContext : this.commitContext;
+    final SubscriptionCommitContext thatCommitContext =
+        Objects.nonNull(that.rootCommitContext) ? that.rootCommitContext : that.commitContext;
+    return thisCommitContext.compareTo(thatCommitContext);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index 58aac3714c1..5f2b0e069a6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -30,12 +30,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 public abstract class SubscriptionPipeEventBatch {
 
@@ -116,33 +113,6 @@ public abstract class SubscriptionPipeEventBatch {
 
   protected abstract List<SubscriptionEvent> generateSubscriptionEvents() 
throws Exception;
 
-  /////////////////////////////// stringify ///////////////////////////////
-
-  protected Map<String, String> coreReportMessage() {
-    final Map<String, String> result = new HashMap<>();
-    result.put("regionId", String.valueOf(regionId));
-    result.put("prefetchingQueue", 
prefetchingQueue.coreReportMessage().toString());
-    result.put("maxDelayInMs", String.valueOf(maxDelayInMs));
-    result.put("maxBatchSizeInBytes", String.valueOf(maxBatchSizeInBytes));
-    // omit subscription events here
-    result.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4));
-    return result;
-  }
-
-  private static String formatEnrichedEvents(
-      final List<EnrichedEvent> enrichedEvents, final int threshold) {
-    final List<String> eventMessageList =
-        enrichedEvents.stream()
-            .limit(threshold)
-            .map(EnrichedEvent::coreReportMessage)
-            .collect(Collectors.toList());
-    if (eventMessageList.size() > threshold) {
-      eventMessageList.add(
-          String.format("omit the remaining %s event(s)...", 
eventMessageList.size() - threshold));
-    }
-    return eventMessageList.toString();
-  }
-
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   public int getPipeEventCount() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index d1d56e6c5a7..00114d85df4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -41,8 +41,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
@@ -61,7 +61,8 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   private final Meter insertNodeTabletInsertionEventSizeEstimator;
   private final Meter rawTabletInsertionEventSizeEstimator;
 
-  private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();
+  private volatile List<EnrichedEvent> iteratedEnrichedEvents;
+  private final AtomicInteger referenceCount = new AtomicInteger();
 
   private static final long ITERATED_COUNT_REPORT_FREQ =
       30000; // based on the full parse of a 128MB tsfile estimate
@@ -78,35 +79,36 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
         new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
     this.rawTabletInsertionEventSizeEstimator =
         new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
+
+    resetForIteration();
   }
 
   /////////////////////////////// ack & clean ///////////////////////////////
 
   @Override
   public synchronized void ack() {
-    // only decrease the reference count of iterated events
-    for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
-      enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
-    }
-    iteratedEnrichedEvents.clear();
+    referenceCount.decrementAndGet();
+    // do nothing for iterated enriched events, see 
SubscriptionPipeTabletBatchEvents
   }
 
   @Override
   public synchronized void cleanUp() {
-    // do nothing if it has next
-    if (hasNext()) {
+    // do nothing if it has next or still referenced by unacked response
+    if (hasNext() || referenceCount.get() != 0) {
       return;
     }
 
     // clear the reference count of events
     for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+      if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+        // close data container in tsfile event
+        ((PipeTsFileInsertionEvent) enrichedEvent).close();
+      }
       enrichedEvent.clearReferenceCount(this.getClass().getName());
     }
     enrichedEvents.clear();
 
-    currentEnrichedEventsIterator = null;
-    currentTabletInsertionEventsIterator = null;
-    currentTsFileInsertionEvent = null;
+    resetForIteration();
   }
 
   /////////////////////////////// utility ///////////////////////////////
@@ -144,9 +146,8 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
 
   @Override
   protected List<SubscriptionEvent> generateSubscriptionEvents() {
-    resetIterator();
-    return Collections.singletonList(
-        new SubscriptionEvent(this, 
prefetchingQueue::generateSubscriptionCommitContext));
+    resetForIteration();
+    return Collections.singletonList(new SubscriptionEvent(this, 
prefetchingQueue));
   }
 
   @Override
@@ -199,12 +200,22 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
 
   /////////////////////////////// iterator ///////////////////////////////
 
-  public void resetIterator() {
+  public List<EnrichedEvent> sendIterationSnapshot() {
+    final List<EnrichedEvent> result = 
Collections.unmodifiableList(iteratedEnrichedEvents);
+    iteratedEnrichedEvents = new ArrayList<>();
+    referenceCount.incrementAndGet();
+    return result;
+  }
+
+  public void resetForIteration() {
     currentEnrichedEventsIterator = enrichedEvents.iterator();
     currentTabletInsertionEventsIterator = null;
     currentTsFileInsertionEvent = null;
 
-    iteratedEnrichedEvents.clear();
+    iteratedEnrichedEvents = new ArrayList<>();
+    referenceCount.set(0);
+
+    iteratedCount.set(0);
   }
 
   @Override
@@ -300,25 +311,4 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
       return null;
     }
   }
-
-  /////////////////////////////// stringify ///////////////////////////////
-
-  @Override
-  public String toString() {
-    return "SubscriptionPipeTabletEventBatch" + this.coreReportMessage();
-  }
-
-  @Override
-  protected Map<String, String> coreReportMessage() {
-    final Map<String, String> coreReportMessage = super.coreReportMessage();
-    coreReportMessage.put("firstEventProcessingTime", 
String.valueOf(firstEventProcessingTime));
-    coreReportMessage.put("totalBufferSize", String.valueOf(totalBufferSize));
-    coreReportMessage.put(
-        "estimatedInsertNodeTabletInsertionEventSize",
-        String.valueOf(getEstimatedInsertNodeTabletInsertionEventSize()));
-    coreReportMessage.put(
-        "estimatedRawTabletInsertionEventSize",
-        String.valueOf(getEstimatedRawTabletInsertionEventSize()));
-    return coreReportMessage;
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 514795db23a..6148cb1a060 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch {
@@ -111,18 +110,4 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
   protected boolean shouldEmit() {
     return batch.shouldEmit();
   }
-
-  /////////////////////////////// stringify ///////////////////////////////
-
-  @Override
-  public String toString() {
-    return "SubscriptionPipeTsFileEventBatch" + this.coreReportMessage();
-  }
-
-  @Override
-  protected Map<String, String> coreReportMessage() {
-    final Map<String, String> coreReportMessage = super.coreReportMessage();
-    coreReportMessage.put("batch", batch.toString());
-    return coreReportMessage;
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
index d4eba2adffb..1a0be808c62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
@@ -57,6 +57,9 @@ public class CachedSubscriptionPollResponse extends 
SubscriptionPollResponse {
   public void invalidateByteBuffer() {
     // maybe friendly for gc
     byteBuffer = null;
+  }
+
+  public void closeMemoryBlock() {
     if (Objects.nonNull(memoryBlock)) {
       memoryBlock.close();
     }
@@ -88,6 +91,8 @@ public class CachedSubscriptionPollResponse extends 
SubscriptionPollResponse {
         Objects.isNull(byteBuffer)
             ? "<unknown>"
             : String.valueOf(byteBuffer.limit() - byteBuffer.position()));
+    coreReportMessage.put(
+        "memoryBlock", Objects.isNull(memoryBlock) ? "<unknown>" : 
memoryBlock.toString());
     return coreReportMessage;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
index ef6cc39a60c..a2b9b3c3db9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.subscription.event.pipe;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
+
 public class SubscriptionPipeEmptyEvent implements SubscriptionPipeEvents {
 
   @Override
@@ -31,7 +33,7 @@ public class SubscriptionPipeEmptyEvent implements 
SubscriptionPipeEvents {
 
   @Override
   public String toString() {
-    return "SubscriptionEmptyPipeEvent";
+    return toStringHelper(this).toString();
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index f518c7b0194..87a69fb2be2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -19,19 +19,41 @@
 
 package org.apache.iotdb.db.subscription.event.pipe;
 
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
 
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
 public class SubscriptionPipeTabletBatchEvents implements 
SubscriptionPipeEvents {
 
   private final SubscriptionPipeTabletEventBatch batch;
+  private volatile List<EnrichedEvent> iteratedEnrichedEvents;
 
   public SubscriptionPipeTabletBatchEvents(final 
SubscriptionPipeTabletEventBatch batch) {
     this.batch = batch;
   }
 
+  public void receiveIterationSnapshot(final List<EnrichedEvent> 
iteratedEnrichedEvents) {
+    this.iteratedEnrichedEvents = iteratedEnrichedEvents;
+  }
+
   @Override
   public void ack() {
     batch.ack();
+
+    // only decrease the reference count of iterated events
+    for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
+      if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+        // close data container in tsfile event
+        ((PipeTsFileInsertionEvent) enrichedEvent).close();
+      }
+      enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+    }
   }
 
   @Override
@@ -43,13 +65,33 @@ public class SubscriptionPipeTabletBatchEvents implements 
SubscriptionPipeEvents
 
   @Override
   public String toString() {
-    return "SubscriptionPipeTabletBatchEvents{batch=" + batch + "}";
+    return toStringHelper(this)
+        .add("batch", batch)
+        .add("events", formatEnrichedEvents(iteratedEnrichedEvents, 4))
+        .toString();
+  }
+
+  private static String formatEnrichedEvents(
+      final List<EnrichedEvent> enrichedEvents, final int threshold) {
+    if (Objects.isNull(enrichedEvents)) {
+      return "[]";
+    }
+    final List<String> eventMessageList =
+        enrichedEvents.stream()
+            .limit(threshold)
+            .map(EnrichedEvent::coreReportMessage)
+            .collect(Collectors.toList());
+    if (enrichedEvents.size() > threshold) {
+      eventMessageList.add(
+          String.format("omit the remaining %s event(s)...", 
enrichedEvents.size() - threshold));
+    }
+    return eventMessageList.toString();
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   @Override
   public int getPipeEventCount() {
-    return batch.getPipeEventCount();
+    return iteratedEnrichedEvents.size();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 16151a16a50..8b98f1b758f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -23,6 +23,8 @@ import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventB
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
+
 public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents {
 
   private final SubscriptionPipeTsFileEventBatch batch;
@@ -54,11 +56,7 @@ public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents
 
   @Override
   public String toString() {
-    return "SubscriptionPipeTsFileBatchEvents{batch="
-        + batch
-        + ", referenceCount="
-        + referenceCount
-        + "}";
+    return toStringHelper(this).add("batch", batch).add("count", 
count).toString();
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
index 8effb654d73..453e44be965 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.subscription.event.pipe;
 
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
+
 public class SubscriptionPipeTsFilePlainEvent implements 
SubscriptionPipeEvents {
 
   private final PipeTsFileInsertionEvent tsFileInsertionEvent;
@@ -36,6 +38,8 @@ public class SubscriptionPipeTsFilePlainEvent implements 
SubscriptionPipeEvents
 
   @Override
   public void cleanUp() {
+    // close data container in tsfile event
+    tsFileInsertionEvent.close();
     // clear the reference count of event
     tsFileInsertionEvent.clearReferenceCount(this.getClass().getName());
   }
@@ -44,9 +48,7 @@ public class SubscriptionPipeTsFilePlainEvent implements 
SubscriptionPipeEvents
 
   @Override
   public String toString() {
-    return "SubscriptionPipeTsFilePlainEvent{tsFileInsertionEvent="
-        + tsFileInsertionEvent.coreReportMessage()
-        + "}";
+    return toStringHelper(this).add("event", tsFileInsertionEvent).toString();
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
index dba2f48b466..772de6bf36c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
@@ -79,6 +79,7 @@ public abstract class SubscriptionEventExtendableResponse
   public void cleanUp() {
     CachedSubscriptionPollResponse response;
     while (Objects.nonNull(response = poll())) {
+      response.closeMemoryBlock();
       SubscriptionPollResponseCache.getInstance().invalidate(response);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
index ed6ad138276..c1702876e17 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
@@ -19,11 +19,8 @@
 
 package org.apache.iotdb.db.subscription.event.response;
 
-import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.function.Consumer;
 
 public interface SubscriptionEventResponse<E> {
 
@@ -31,7 +28,8 @@ public interface SubscriptionEventResponse<E> {
 
   E getCurrentResponse();
 
-  void prefetchRemainingResponses() throws Exception;
+  @Deprecated // TBD.
+  void prefetchRemainingResponses();
 
   void fetchNextResponse(final long offset) throws Exception;
 
@@ -47,10 +45,6 @@ public interface SubscriptionEventResponse<E> {
 
   /////////////////////////////// lifecycle ///////////////////////////////
 
-  default void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
-    // do nothing
-  }
-
   void nack();
 
   void cleanUp();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
index 7c72e65b6d6..aca492125fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
@@ -96,6 +96,7 @@ public class SubscriptionEventSingleResponse
 
   @Override
   public void cleanUp() {
+    response.closeMemoryBlock();
     invalidateCurrentResponseByteBuffer();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
index 35591ed405e..cd93cd350ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -19,12 +19,18 @@
 
 package org.apache.iotdb.db.subscription.event.response;
 
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
-import 
org.apache.iotdb.db.subscription.event.SubscriptionCommitContextSupplier;
+import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
+import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
 import 
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
 import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
@@ -36,9 +42,9 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 
 /**
  * The {@code SubscriptionEventTabletResponse} class extends {@link
@@ -58,8 +64,11 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
       
SubscriptionConfig.getInstance().getSubscriptionPrefetchTabletBatchMaxSizeInBytes();
 
   private final SubscriptionPipeTabletEventBatch batch;
+  private final SubscriptionPrefetchingQueue queue;
+  private final SubscriptionPipeTabletBatchEvents events;
+
   private final SubscriptionCommitContext commitContext;
-  private final SubscriptionCommitContextSupplier commitContextSupplier;
+  private final SubscriptionCommitContext rootCommitContext;
 
   private volatile int totalTablets;
   private final AtomicInteger nextOffset = new AtomicInteger(0);
@@ -70,11 +79,16 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
 
   public SubscriptionEventTabletResponse(
       final SubscriptionPipeTabletEventBatch batch,
+      final SubscriptionPrefetchingQueue queue,
+      final SubscriptionPipeTabletBatchEvents events,
       final SubscriptionCommitContext commitContext,
-      final SubscriptionCommitContextSupplier commitContextSupplier) {
+      final SubscriptionCommitContext rootCommitContext) {
     this.batch = batch;
+    this.queue = queue;
+    this.events = events;
+
     this.commitContext = commitContext;
-    this.commitContextSupplier = commitContextSupplier;
+    this.rootCommitContext = rootCommitContext;
 
     init();
   }
@@ -85,20 +99,18 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
   }
 
   @Override
-  public void fetchNextResponse(final long offset /* unused */) {
+  public void fetchNextResponse(final long offset /* unused */) throws 
Exception {
+    // generate and offer next response
     offer(generateNextTabletResponse());
-    if (Objects.isNull(poll())) {
+
+    // poll and clean previous response
+    final CachedSubscriptionPollResponse previousResponse;
+    if (Objects.isNull(previousResponse = poll())) {
       LOGGER.warn(
           "SubscriptionEventTabletResponse {} is empty when fetching next 
response (broken invariant)",
           this);
-    }
-  }
-
-  @Override
-  public synchronized void ack(final Consumer<SubscriptionEvent> 
onCommittedHook) {
-    if (availableForNext) {
-      // generate next subscription event with the same batch
-      onCommittedHook.accept(new SubscriptionEvent(batch, 
commitContextSupplier));
+    } else {
+      previousResponse.closeMemoryBlock();
     }
   }
 
@@ -113,7 +125,7 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
 
     // should not reset the iterator of batch when init
     // TODO: avoid completely rewinding the iterator
-    batch.resetIterator();
+    batch.resetForIteration();
     init();
   }
 
@@ -143,20 +155,37 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
       return;
     }
 
-    offer(generateNextTabletResponse());
+    offer(generateEmptyTabletResponse());
   }
 
-  private synchronized CachedSubscriptionPollResponse 
generateNextTabletResponse() {
+  private synchronized CachedSubscriptionPollResponse 
generateEmptyTabletResponse() {
+    return new CachedSubscriptionPollResponse(
+        SubscriptionPollResponseType.TABLETS.getType(),
+        new TabletsPayload(Collections.emptyList(), 
nextOffset.incrementAndGet()),
+        commitContext);
+  }
+
+  private synchronized CachedSubscriptionPollResponse 
generateNextTabletResponse()
+      throws InterruptedException, PipeRuntimeOutOfMemoryCriticalException {
     if (availableForNext) {
+      // generate next subscription event with the same batch
+      queue.prefetchEvent(new SubscriptionEvent(batch, queue, 
rootCommitContext));
+      // frozen iterated enriched events
+      transportIterationSnapshot();
+      // return last response of this subscription event
       return new CachedSubscriptionPollResponse(
           SubscriptionPollResponseType.TABLETS.getType(),
           new TabletsPayload(Collections.emptyList(), -totalTablets),
           commitContext);
     }
 
+    CachedSubscriptionPollResponse response = null;
     final List<Tablet> currentTablets = new ArrayList<>();
     long currentBufferSize = 0;
 
+    // TODO: TBD.
+    // 
waitForResourceEnough4Parsing(SubscriptionAgent.receiver().remainingMs());
+
     while (batch.hasNext()) {
       final List<Tablet> tablets = batch.next();
       if (Objects.isNull(tablets)) {
@@ -171,49 +200,133 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
               .orElse(0L);
       totalTablets += tablets.size();
       totalBufferSize += bufferSize;
+      currentBufferSize += bufferSize;
 
       if (bufferSize > READ_TABLET_BUFFER_SIZE) {
         // TODO: split tablets
-        LOGGER.warn("Detect large tablets with {} byte(s).", bufferSize);
-        return new CachedSubscriptionPollResponse(
-            SubscriptionPollResponseType.TABLETS.getType(),
-            new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
-            commitContext);
+        LOGGER.warn(
+            "Detect large tablets with {} byte(s), current tablets size {} 
byte(s)",
+            bufferSize,
+            currentTablets);
+        response =
+            new CachedSubscriptionPollResponse(
+                SubscriptionPollResponseType.TABLETS.getType(),
+                new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
+                commitContext);
+        break;
       }
 
-      if (currentBufferSize + bufferSize > READ_TABLET_BUFFER_SIZE) {
+      if (currentBufferSize > READ_TABLET_BUFFER_SIZE) {
         // TODO: split tablets
-        return new CachedSubscriptionPollResponse(
-            SubscriptionPollResponseType.TABLETS.getType(),
-            new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
-            commitContext);
+        response =
+            new CachedSubscriptionPollResponse(
+                SubscriptionPollResponseType.TABLETS.getType(),
+                new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
+                commitContext);
+        break;
       }
 
-      currentBufferSize += bufferSize;
-
       // limit control for large message
       if (totalBufferSize > PREFETCH_TABLET_BUFFER_SIZE && batch.hasNext()) {
+        // we generate seal signal at next round
         availableForNext = true;
         break;
       }
     }
 
-    final CachedSubscriptionPollResponse response;
-    if (currentTablets.isEmpty()) {
-      response =
-          new CachedSubscriptionPollResponse(
-              SubscriptionPollResponseType.TABLETS.getType(),
-              new TabletsPayload(Collections.emptyList(), -totalTablets),
-              commitContext);
-      hasNoMore = true;
-    } else {
-      response =
-          new CachedSubscriptionPollResponse(
-              SubscriptionPollResponseType.TABLETS.getType(),
-              new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
-              commitContext);
+    if (Objects.isNull(response)) {
+      if (currentTablets.isEmpty()) {
+        // frozen iterated enriched events
+        transportIterationSnapshot();
+        // return last response of this subscription event
+        response =
+            new CachedSubscriptionPollResponse(
+                SubscriptionPollResponseType.TABLETS.getType(),
+                new TabletsPayload(Collections.emptyList(), -totalTablets),
+                commitContext);
+        hasNoMore = true;
+      } else {
+        response =
+            new CachedSubscriptionPollResponse(
+                SubscriptionPollResponseType.TABLETS.getType(),
+                new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
+                commitContext);
+      }
+    }
+
+    // set fixed memory block for response
+    final List<Tablet> tablets = ((TabletsPayload) 
response.getPayload()).getTablets();
+    if (Objects.nonNull(tablets) && !tablets.isEmpty()) {
+      final PipeTabletMemoryBlock memoryBlock =
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(currentBufferSize);
+      response.setMemoryBlock(memoryBlock);
     }
 
     return response;
   }
+
+  private void waitForResourceEnough4Parsing(final long timeoutMs) throws 
InterruptedException {
+    final PipeMemoryManager memoryManager = 
PipeDataNodeResourceManager.memory();
+    if (memoryManager.isEnough4TabletParsing()) {
+      return;
+    }
+
+    final long startTime = System.currentTimeMillis();
+    long lastRecordTime = startTime;
+
+    final long memoryCheckIntervalMs =
+        
SubscriptionConfig.getInstance().getSubscriptionCheckMemoryEnoughIntervalMs();
+    while (!memoryManager.isEnough4TabletParsing()) {
+      Thread.sleep(memoryCheckIntervalMs);
+
+      final long currentTime = System.currentTimeMillis();
+      final double elapsedRecordTimeSeconds = (currentTime - lastRecordTime) / 
1000.0;
+      final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
+      if (elapsedRecordTimeSeconds > 10.0) {
+        LOGGER.info(
+            "SubscriptionEventTabletResponse {} wait for resource enough for 
parsing tablets {} seconds.",
+            commitContext,
+            waitTimeSeconds);
+        lastRecordTime = currentTime;
+      } else if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "SubscriptionEventTabletResponse {} wait for resource enough for 
parsing tablets {} seconds.",
+            commitContext,
+            waitTimeSeconds);
+      }
+
+      if (waitTimeSeconds * 1000 > timeoutMs) {
+        // should contain 'TimeoutException' in exception message
+        throw new PipeException(
+            String.format("TimeoutException: Waited %s seconds", 
waitTimeSeconds));
+      }
+    }
+
+    final long currentTime = System.currentTimeMillis();
+    final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
+    LOGGER.info(
+        "SubscriptionEventTabletResponse {} wait for resource enough for 
parsing tablets {} seconds.",
+        commitContext,
+        waitTimeSeconds);
+  }
+
+  private void transportIterationSnapshot() {
+    events.receiveIterationSnapshot(batch.sendIterationSnapshot());
+  }
+
+  /////////////////////////////// stringify ///////////////////////////////
+
+  @Override
+  public String toString() {
+    return "SubscriptionEventTabletResponse" + coreReportMessage();
+  }
+
+  protected Map<String, String> coreReportMessage() {
+    final Map<String, String> result = super.coreReportMessage();
+    result.put("totalTablets", String.valueOf(totalTablets));
+    result.put("nextOffset", String.valueOf(nextOffset));
+    result.put("totalBufferSize", String.valueOf(totalBufferSize));
+    result.put("availableForNext", String.valueOf(availableForNext));
+    return result;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
index 2397ec51c60..0788a89c661 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.subscription.event.response;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -56,6 +57,9 @@ public class SubscriptionEventTsFileResponse extends 
SubscriptionEventExtendable
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionEventTsFileResponse.class);
 
+  private static final long READ_FILE_BUFFER_SIZE =
+      SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
+
   private final File tsFile;
   private final SubscriptionCommitContext commitContext;
 
@@ -151,7 +155,8 @@ public class SubscriptionEventTsFileResponse extends 
SubscriptionEventExtendable
   }
 
   private @NonNull CachedSubscriptionPollResponse 
generateResponseWithPieceOrSealPayload(
-      final long writingOffset) throws SubscriptionException, IOException, 
InterruptedException {
+      final long writingOffset)
+      throws IOException, InterruptedException, 
PipeRuntimeOutOfMemoryCriticalException {
     final long tsFileLength = tsFile.length();
     if (writingOffset >= tsFileLength) {
       // generate subscription poll response with seal payload
@@ -162,15 +167,13 @@ public class SubscriptionEventTsFileResponse extends 
SubscriptionEventExtendable
           commitContext);
     }
 
-    final long readFileBufferSize =
-        SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
     final long bufferSize;
-    if (writingOffset + readFileBufferSize >= tsFileLength) {
+    if (writingOffset + READ_FILE_BUFFER_SIZE >= tsFileLength) {
       // last piece
       bufferSize = tsFileLength - writingOffset;
     } else {
       // not last piece
-      bufferSize = readFileBufferSize;
+      bufferSize = READ_FILE_BUFFER_SIZE;
     }
 
     waitForResourceEnough4Slicing(SubscriptionAgent.receiver().remainingMs());
@@ -196,6 +199,8 @@ public class SubscriptionEventTsFileResponse extends 
SubscriptionEventExtendable
               SubscriptionPollResponseType.FILE_PIECE.getType(),
               new FilePiecePayload(tsFile.getName(), writingOffset + 
readLength, readBuffer),
               commitContext);
+
+      // set fixed memory block for response
       response.setMemoryBlock(memoryBlock);
       return response;
     }
@@ -211,7 +216,7 @@ public class SubscriptionEventTsFileResponse extends 
SubscriptionEventExtendable
     long lastRecordTime = startTime;
 
     final long memoryCheckIntervalMs =
-        
PipeConfig.getInstance().getPipeTsFileParserCheckMemoryEnoughIntervalMs();
+        
SubscriptionConfig.getInstance().getSubscriptionCheckMemoryEnoughIntervalMs();
     while (!memoryManager.isEnough4TsFileSlicing()) {
       Thread.sleep(memoryCheckIntervalMs);
 
@@ -244,4 +249,17 @@ public class SubscriptionEventTsFileResponse extends 
SubscriptionEventExtendable
     LOGGER.info(
         "Wait for resource enough for slicing tsfile {} for {} seconds.", 
tsFile, waitTimeSeconds);
   }
+
+  /////////////////////////////// stringify ///////////////////////////////
+
+  @Override
+  public String toString() {
+    return "SubscriptionEventTsFileResponse" + coreReportMessage();
+  }
+
+  protected Map<String, String> coreReportMessage() {
+    final Map<String, String> result = super.coreReportMessage();
+    result.put("tsFile", String.valueOf(tsFile));
+    return result;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 50db0de3021..b2eeaf25f2c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -287,13 +287,12 @@ public class CommonConfig {
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; 
// 3 minutes
   private long twoStageAggregateSenderEndPointsCacheInMs = 3 * 60 * 1000L; // 
3 minutes
 
-  private float subscriptionCacheMemoryUsagePercentage = 0.2F;
-
   private boolean pipeEventReferenceTrackingEnabled = true;
   private long pipeEventReferenceEliminateIntervalSeconds = 10;
 
-  private int subscriptionSubtaskExecutorMaxThreadNum =
-      Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+  private float subscriptionCacheMemoryUsagePercentage = 0.2F;
+  private int subscriptionSubtaskExecutorMaxThreadNum = 2;
+
   private int subscriptionPrefetchTabletBatchMaxDelayInMs = 1000; // 1s
   private long subscriptionPrefetchTabletBatchMaxSizeInBytes = 16 * MB;
   private int subscriptionPrefetchTsFileBatchMaxDelayInMs = 5000; // 5s
@@ -305,7 +304,13 @@ public class CommonConfig {
   private long subscriptionReadFileBufferSize = 8 * MB;
   private long subscriptionReadTabletBufferSize = 8 * MB;
   private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
-  private volatile long subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs = 
10L;
+  private volatile long subscriptionCheckMemoryEnoughIntervalMs = 10L;
+
+  private boolean subscriptionPrefetchEnabled = true;
+  private float subscriptionPrefetchMemoryThreshold = 0.5F;
+  private float subscriptionPrefetchMissingRateThreshold = 0.9F;
+  private int subscriptionPrefetchEventLocalCountThreshold = 10;
+  private int subscriptionPrefetchEventGlobalCountThreshold = 100;
 
   private long subscriptionMetaSyncerInitialSyncDelayMinutes = 3;
   private long subscriptionMetaSyncerSyncIntervalMinutes = 3;
@@ -1309,10 +1314,7 @@ public class CommonConfig {
 
   public void setSubscriptionSubtaskExecutorMaxThreadNum(
       int subscriptionSubtaskExecutorMaxThreadNum) {
-    this.subscriptionSubtaskExecutorMaxThreadNum =
-        Math.min(
-            subscriptionSubtaskExecutorMaxThreadNum,
-            Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+    this.subscriptionSubtaskExecutorMaxThreadNum = 
subscriptionSubtaskExecutorMaxThreadNum;
   }
 
   public int getSubscriptionPrefetchTabletBatchMaxDelayInMs() {
@@ -1413,14 +1415,58 @@ public class CommonConfig {
         subscriptionTsFileDeduplicationWindowSeconds;
   }
 
-  public long getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs() {
-    return subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs;
+  public long getSubscriptionCheckMemoryEnoughIntervalMs() {
+    return subscriptionCheckMemoryEnoughIntervalMs;
+  }
+
+  public void setSubscriptionCheckMemoryEnoughIntervalMs(
+      long subscriptionCheckMemoryEnoughIntervalMs) {
+    this.subscriptionCheckMemoryEnoughIntervalMs = 
subscriptionCheckMemoryEnoughIntervalMs;
+  }
+
+  public boolean getSubscriptionPrefetchEnabled() {
+    return subscriptionPrefetchEnabled;
+  }
+
+  public void setSubscriptionPrefetchEnabled(boolean 
subscriptionPrefetchEnabled) {
+    this.subscriptionPrefetchEnabled = subscriptionPrefetchEnabled;
+  }
+
+  public float getSubscriptionPrefetchMemoryThreshold() {
+    return subscriptionPrefetchMemoryThreshold;
+  }
+
+  public void setSubscriptionPrefetchMemoryThreshold(float 
subscriptionPrefetchMemoryThreshold) {
+    this.subscriptionPrefetchMemoryThreshold = 
subscriptionPrefetchMemoryThreshold;
+  }
+
+  public float getSubscriptionPrefetchMissingRateThreshold() {
+    return subscriptionPrefetchMissingRateThreshold;
+  }
+
+  public void setSubscriptionPrefetchMissingRateThreshold(
+      float subscriptionPrefetchMissingRateThreshold) {
+    this.subscriptionPrefetchMissingRateThreshold = 
subscriptionPrefetchMissingRateThreshold;
+  }
+
+  public int getSubscriptionPrefetchEventLocalCountThreshold() {
+    return subscriptionPrefetchEventLocalCountThreshold;
+  }
+
+  public void setSubscriptionPrefetchEventLocalCountThreshold(
+      int subscriptionPrefetchEventLocalCountThreshold) {
+    this.subscriptionPrefetchEventLocalCountThreshold =
+        subscriptionPrefetchEventLocalCountThreshold;
+  }
+
+  public int getSubscriptionPrefetchEventGlobalCountThreshold() {
+    return subscriptionPrefetchEventGlobalCountThreshold;
   }
 
-  public void setSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs(
-      long subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs) {
-    this.subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs =
-        subscriptionTsFileSlicerCheckMemoryEnoughIntervalMs;
+  public void setSubscriptionPrefetchEventGlobalCountThreshold(
+      int subscriptionPrefetchEventGlobalCountThreshold) {
+    this.subscriptionPrefetchEventGlobalCountThreshold =
+        subscriptionPrefetchEventGlobalCountThreshold;
   }
 
   public long getSubscriptionMetaSyncerInitialSyncDelayMinutes() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c40429567c2..529d1053c42 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -655,15 +655,12 @@ public class CommonDescriptor {
             properties.getProperty(
                 "subscription_cache_memory_usage_percentage",
                 
String.valueOf(config.getSubscriptionCacheMemoryUsagePercentage()))));
-
     config.setSubscriptionSubtaskExecutorMaxThreadNum(
         Integer.parseInt(
             properties.getProperty(
                 "subscription_subtask_executor_max_thread_num",
                 
Integer.toString(config.getSubscriptionSubtaskExecutorMaxThreadNum()))));
-    if (config.getSubscriptionSubtaskExecutorMaxThreadNum() <= 0) {
-      config.setSubscriptionSubtaskExecutorMaxThreadNum(5);
-    }
+
     config.setSubscriptionPrefetchTabletBatchMaxDelayInMs(
         Integer.parseInt(
             properties.getProperty(
@@ -719,11 +716,37 @@ public class CommonDescriptor {
             properties.getProperty(
                 "subscription_ts_file_deduplication_window_seconds",
                 
String.valueOf(config.getSubscriptionTsFileDeduplicationWindowSeconds()))));
-    config.setSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs(
+    config.setSubscriptionCheckMemoryEnoughIntervalMs(
         Long.parseLong(
             properties.getProperty(
-                "subscription_ts_file_slicer_check_memory_enough_interval_ms",
-                
String.valueOf(config.getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs()))));
+                "subscription_check_memory_enough_interval_ms",
+                
String.valueOf(config.getSubscriptionCheckMemoryEnoughIntervalMs()))));
+
+    config.setSubscriptionPrefetchEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "subscription_prefetch_enabled",
+                String.valueOf(config.getSubscriptionPrefetchEnabled()))));
+    config.setSubscriptionPrefetchMemoryThreshold(
+        Float.parseFloat(
+            properties.getProperty(
+                "subscription_prefetch_memory_threshold",
+                
String.valueOf(config.getSubscriptionPrefetchMemoryThreshold()))));
+    config.setSubscriptionPrefetchMissingRateThreshold(
+        Float.parseFloat(
+            properties.getProperty(
+                "subscription_prefetch_missing_rate_threshold",
+                
String.valueOf(config.getSubscriptionPrefetchMissingRateThreshold()))));
+    config.setSubscriptionPrefetchEventLocalCountThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "subscription_prefetch_event_local_count_threshold",
+                
String.valueOf(config.getSubscriptionPrefetchEventLocalCountThreshold()))));
+    config.setSubscriptionPrefetchEventGlobalCountThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "subscription_prefetch_event_global_count_threshold",
+                
String.valueOf(config.getSubscriptionPrefetchEventGlobalCountThreshold()))));
 
     config.setSubscriptionMetaSyncerInitialSyncDelayMinutes(
         Long.parseLong(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index fb1f37eae8d..76e29d77ee0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -81,6 +81,30 @@ public class SubscriptionConfig {
     return COMMON_CONFIG.getSubscriptionTsFileDeduplicationWindowSeconds();
   }
 
+  public long getSubscriptionCheckMemoryEnoughIntervalMs() {
+    return COMMON_CONFIG.getSubscriptionCheckMemoryEnoughIntervalMs();
+  }
+
+  public boolean getSubscriptionPrefetchEnabled() {
+    return COMMON_CONFIG.getSubscriptionPrefetchEnabled();
+  }
+
+  public float getSubscriptionPrefetchMemoryThreshold() {
+    return COMMON_CONFIG.getSubscriptionPrefetchMemoryThreshold();
+  }
+
+  public float getSubscriptionPrefetchMissingRateThreshold() {
+    return COMMON_CONFIG.getSubscriptionPrefetchMissingRateThreshold();
+  }
+
+  public int getSubscriptionPrefetchEventLocalCountThreshold() {
+    return COMMON_CONFIG.getSubscriptionPrefetchEventLocalCountThreshold();
+  }
+
+  public int getSubscriptionPrefetchEventGlobalCountThreshold() {
+    return COMMON_CONFIG.getSubscriptionPrefetchEventGlobalCountThreshold();
+  }
+
   public long getSubscriptionMetaSyncerInitialSyncDelayMinutes() {
     return COMMON_CONFIG.getSubscriptionMetaSyncerInitialSyncDelayMinutes();
   }
@@ -89,10 +113,6 @@ public class SubscriptionConfig {
     return COMMON_CONFIG.getSubscriptionMetaSyncerSyncIntervalMinutes();
   }
 
-  public long getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs() {
-    return 
COMMON_CONFIG.getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs();
-  }
-
   /////////////////////////////// Utils ///////////////////////////////
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionConfig.class);
@@ -100,10 +120,10 @@ public class SubscriptionConfig {
   public void printAllConfigs() {
     LOGGER.info(
         "SubscriptionCacheMemoryUsagePercentage: {}", 
getSubscriptionCacheMemoryUsagePercentage());
-
     LOGGER.info(
         "SubscriptionSubtaskExecutorMaxThreadNum: {}",
         getSubscriptionSubtaskExecutorMaxThreadNum());
+
     LOGGER.info(
         "SubscriptionPrefetchTabletBatchMaxDelayInMs: {}",
         getSubscriptionPrefetchTabletBatchMaxDelayInMs());
@@ -128,8 +148,21 @@ public class SubscriptionConfig {
         "SubscriptionTsFileDeduplicationWindowSeconds: {}",
         getSubscriptionTsFileDeduplicationWindowSeconds());
     LOGGER.info(
-        "SubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs: {}",
-        getSubscriptionTsFileSlicerCheckMemoryEnoughIntervalMs());
+        "SubscriptionCheckMemoryEnoughIntervalMs: {}",
+        getSubscriptionCheckMemoryEnoughIntervalMs());
+
+    LOGGER.info("SubscriptionPrefetchEnabled: {}", 
getSubscriptionPrefetchEnabled());
+    LOGGER.info(
+        "SubscriptionPrefetchMemoryThreshold: {}", 
getSubscriptionPrefetchMemoryThreshold());
+    LOGGER.info(
+        "SubscriptionPrefetchMissingRateThreshold: {}",
+        getSubscriptionPrefetchMissingRateThreshold());
+    LOGGER.info(
+        "SubscriptionPrefetchEventLocalCountThreshold: {}",
+        getSubscriptionPrefetchEventLocalCountThreshold());
+    LOGGER.info(
+        "SubscriptionPrefetchEventGlobalCountThreshold: {}",
+        getSubscriptionPrefetchEventGlobalCountThreshold());
 
     LOGGER.info(
         "SubscriptionMetaSyncerInitialSyncDelayMinutes: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 3e409a9160e..c73f08766f5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -196,6 +196,8 @@ public class TopicMeta {
     extractorAttributes.putAll(config.getAttributesWithSourceMode());
     // loose range
     extractorAttributes.putAll(config.getAttributesWithSourceLooseRange());
+    // backdoor configs
+    extractorAttributes.putAll(config.getAttributesWithSourcePrefix());
     return extractorAttributes;
   }
 
@@ -209,6 +211,8 @@ public class TopicMeta {
     connectorAttributes.put(PipeConnectorConstant.SINK_TOPIC_KEY, topicName);
     connectorAttributes.put(PipeConnectorConstant.SINK_CONSUMER_GROUP_KEY, 
consumerGroupId);
     connectorAttributes.putAll(config.getAttributesWithSinkFormat());
+    // backdoor configs
+    connectorAttributes.putAll(config.getAttributesWithSinkPrefix());
     return connectorAttributes;
   }
 

Reply via email to