This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new f8adc22119d Subscription: implement subscription event optimistic 
transmission strategy to reduce peak memory usage (#13763) (#13928)
f8adc22119d is described below

commit f8adc22119de41663011db584907b8662e65e94b
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Oct 29 00:06:45 2024 +0800

    Subscription: implement subscription event optimistic transmission strategy 
to reduce peak memory usage (#13763) (#13928)
---
 .../iotdb/rpc/subscription/config/TopicConfig.java |   7 +-
 .../payload/poll/SubscriptionPollResponse.java     |  20 +-
 .../db/subscription/broker/SubscriptionBroker.java |  20 +-
 .../broker/SubscriptionPrefetchingQueue.java       |  21 +-
 .../broker/SubscriptionPrefetchingTabletQueue.java |   7 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |   6 +-
 .../db/subscription/event/SubscriptionEvent.java   | 298 +++++----------------
 .../event/batch/SubscriptionPipeEventBatch.java    |  84 +++++-
 .../batch/SubscriptionPipeTabletEventBatch.java    | 157 ++++-------
 .../batch/SubscriptionPipeTsFileEventBatch.java    |  81 +++---
 .../cache/CachedSubscriptionPollResponse.java      |  83 ++++++
 .../SubscriptionPollResponseCache.java}            |  43 +--
 .../event/pipe/SubscriptionPipeEmptyEvent.java     |   7 -
 .../event/pipe/SubscriptionPipeEvents.java         |   7 -
 .../pipe/SubscriptionPipeTabletBatchEvents.java    |   7 -
 .../pipe/SubscriptionPipeTsFileBatchEvents.java    |  14 +-
 .../pipe/SubscriptionPipeTsFilePlainEvent.java     |   7 -
 .../SubscriptionEventExtendableResponse.java       | 151 +++++++++++
 .../SubscriptionEventResponse.java}                |  36 ++-
 .../response/SubscriptionEventSingleResponse.java  | 122 +++++++++
 .../response/SubscriptionEventTabletResponse.java  | 162 +++++++++++
 .../response/SubscriptionEventTsFileResponse.java  | 170 ++++++++++++
 .../receiver/SubscriptionReceiverV1.java           |   2 +-
 .../db/subscription/SubscriptionStatesTest.java    |   5 -
 24 files changed, 997 insertions(+), 520 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index 4bec8d762a2..46dc7601e0f 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -106,7 +106,7 @@ public class TopicConfig extends PipeParameters {
   }
 
   public Map<String, String> getAttributesWithRealtimeMode() {
-    return REALTIME_STREAM_MODE_CONFIG;
+    return REALTIME_STREAM_MODE_CONFIG; // default to stream (hybrid)
   }
 
   public Map<String, String> getAttributesWithSourceMode() {
@@ -136,9 +136,6 @@ public class TopicConfig extends PipeParameters {
   }
 
   public Map<String, String> getAttributesWithSinkFormat() {
-    return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(
-            attributes.getOrDefault(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_DEFAULT_VALUE))
-        ? Collections.emptyMap()
-        : SINK_TABLET_FORMAT_CONFIG;
+    return Collections.emptyMap(); // default to hybrid
   }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
index 01d173d2742..06baa30acee 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 public class SubscriptionPollResponse {
 
@@ -110,16 +112,18 @@ public class SubscriptionPollResponse {
     return new SubscriptionPollResponse(responseType, payload, commitContext);
   }
 
-  /////////////////////////////// object ///////////////////////////////
+  /////////////////////////////// stringify ///////////////////////////////
 
   @Override
   public String toString() {
-    return "SubscriptionPollResponse{responseType="
-        + SubscriptionPollResponseType.valueOf(responseType).toString()
-        + ", payload="
-        + payload
-        + ", commitContext="
-        + commitContext
-        + "}";
+    return "SubscriptionPollResponse" + coreReportMessage();
+  }
+
+  protected Map<String, String> coreReportMessage() {
+    final Map<String, String> result = new HashMap<>();
+    result.put("responseType", 
SubscriptionPollResponseType.valueOf(responseType).toString());
+    result.put("payload", payload.toString());
+    result.put("commitContext", commitContext.toString());
+    return result;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index afc8f2f2290..c4e47dec150 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -24,13 +24,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
 import 
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
 import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
 
@@ -103,16 +101,14 @@ public class SubscriptionBroker {
               brokerId);
           events.add(
               new SubscriptionEvent(
-                  new SubscriptionPipeEmptyEvent(),
-                  new SubscriptionPollResponse(
-                      SubscriptionPollResponseType.TERMINATION.getType(),
-                      new TerminationPayload(),
-                      new SubscriptionCommitContext(
-                          
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
-                          PipeDataNodeAgent.runtime().getRebootTimes(),
-                          topicName,
-                          brokerId,
-                          INVALID_COMMIT_ID))));
+                  SubscriptionPollResponseType.TERMINATION.getType(),
+                  new TerminationPayload(),
+                  new SubscriptionCommitContext(
+                      
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+                      PipeDataNodeAgent.runtime().getRebootTimes(),
+                      topicName,
+                      brokerId,
+                      INVALID_COMMIT_ID)));
           continue;
         }
         // There are two reasons for not printing logs here:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index e072b7a8d51..9aa9343083d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -28,13 +28,11 @@ import 
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
-import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
 
 import com.google.common.collect.ImmutableSet;
@@ -288,6 +286,7 @@ public abstract class SubscriptionPrefetchingQueue {
   }
 
   protected void enqueueEventToPrefetchingQueue(final SubscriptionEvent event) 
{
+    // TODO: consider memory usage
     event.trySerializeCurrentResponse();
     prefetchingQueue.add(event);
   }
@@ -532,16 +531,14 @@ public abstract class SubscriptionPrefetchingQueue {
   protected SubscriptionEvent generateSubscriptionPollErrorResponse(final 
String errorMessage) {
     // consider non-critical by default, meaning the client can retry
     return new SubscriptionEvent(
-        new SubscriptionPipeEmptyEvent(),
-        new SubscriptionPollResponse(
-            SubscriptionPollResponseType.ERROR.getType(),
-            new ErrorPayload(errorMessage, false),
-            new SubscriptionCommitContext(
-                IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
-                PipeDataNodeAgent.runtime().getRebootTimes(),
-                topicName,
-                brokerId,
-                INVALID_COMMIT_ID)));
+        SubscriptionPollResponseType.ERROR.getType(),
+        new ErrorPayload(errorMessage, false),
+        new SubscriptionCommitContext(
+            IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+            PipeDataNodeAgent.runtime().getRebootTimes(),
+            topicName,
+            brokerId,
+            INVALID_COMMIT_ID));
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index ea7a652e461..09f41cf23cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -162,11 +163,7 @@ public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQ
 
   @Override
   protected boolean onEvent(final TsFileInsertionEvent event) {
-    LOGGER.warn(
-        "Subscription: SubscriptionPrefetchingTabletQueue {} ignore 
TsFileInsertionEvent {} when prefetching.",
-        this,
-        event);
-    return false;
+    return batches.onEvent((EnrichedEvent) event, 
this::enqueueEventToPrefetchingQueue);
   }
 
   /////////////////////////////// stringify ///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 64daaca4b4c..5d12eb490cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -240,10 +240,8 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
     final SubscriptionEvent ev =
         new SubscriptionEvent(
             new SubscriptionPipeTsFilePlainEvent((PipeTsFileInsertionEvent) 
event),
-            new SubscriptionPollResponse(
-                SubscriptionPollResponseType.FILE_INIT.getType(),
-                new FileInitPayload(((PipeTsFileInsertionEvent) 
event).getTsFile().getName()),
-                commitContext));
+            ((PipeTsFileInsertionEvent) event).getTsFile(),
+            commitContext);
     super.enqueueEventToPrefetchingQueue(ev);
     return true;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index ad274023f6a..0e7fd04d009 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -20,34 +20,29 @@
 package org.apache.iotdb.db.subscription.event;
 
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
+import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
+import 
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
 import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEvents;
-import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
-import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
-import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
-import org.apache.iotdb.rpc.subscription.payload.poll.FileSealPayload;
+import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
+import 
org.apache.iotdb.db.subscription.event.response.SubscriptionEventResponse;
+import 
org.apache.iotdb.db.subscription.event.response.SubscriptionEventSingleResponse;
+import 
org.apache.iotdb.db.subscription.event.response.SubscriptionEventTabletResponse;
+import 
org.apache.iotdb.db.subscription.event.response.SubscriptionEventTsFileResponse;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
 import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
 import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
-import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
-import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
-import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
 
-import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
 
@@ -58,81 +53,63 @@ public class SubscriptionEvent {
   private static final long INVALID_TIMESTAMP = -1;
 
   private final SubscriptionPipeEvents pipeEvents;
-
-  private final SubscriptionPollResponse[] responses;
-  private int currentResponseIndex = 0;
-
-  private final ByteBuffer[] byteBuffers; // serialized responses
-  private final SubscriptionCommitContext
-      commitContext; // all responses have the same commit context
+  private final SubscriptionEventResponse<CachedSubscriptionPollResponse> 
response;
+  private final SubscriptionCommitContext commitContext;
 
   // lastPolledConsumerId is not used as a criterion for determining 
pollability
   private volatile String lastPolledConsumerId = null;
   private final AtomicLong lastPolledTimestamp = new 
AtomicLong(INVALID_TIMESTAMP);
   private final AtomicLong committedTimestamp = new 
AtomicLong(INVALID_TIMESTAMP);
 
+  // record file name for file payload
+  private String fileName;
+
   /**
-   * Constructs a {@link SubscriptionEvent} with an initial response.
-   *
-   * @param pipeEvents The underlying pipe events corresponding to this {@link 
SubscriptionEvent}.
-   * @param initialResponse The initial response which must be of type {@link 
FileInitPayload}. This
-   *     indicates that subsequent responses need to be fetched using {@link
-   *     SubscriptionEvent#prefetchRemainingResponses()}.
+   * Constructs a {@link SubscriptionEvent} with the response type of {@link
+   * SubscriptionEventSingleResponse}.
    */
   public SubscriptionEvent(
-      final SubscriptionPipeEvents pipeEvents, final SubscriptionPollResponse 
initialResponse) {
-    this.pipeEvents = pipeEvents;
+      final short responseType,
+      final SubscriptionPollPayload payload,
+      final SubscriptionCommitContext commitContext) {
+    this.pipeEvents = new SubscriptionPipeEmptyEvent();
+    this.response = new SubscriptionEventSingleResponse(responseType, payload, 
commitContext);
+    this.commitContext = commitContext;
+  }
 
-    final int responseLength = 
getResponseLength(initialResponse.getResponseType());
-    this.responses = new SubscriptionPollResponse[responseLength];
-    this.responses[0] = initialResponse;
+  @TestOnly
+  public SubscriptionEvent(final SubscriptionPollResponse response) {
+    this(response.getResponseType(), response.getPayload(), 
response.getCommitContext());
+  }
 
-    this.byteBuffers = new ByteBuffer[responseLength];
-    this.commitContext = initialResponse.getCommitContext();
+  /**
+   * Constructs a {@link SubscriptionEvent} with the response type of {@link
+   * SubscriptionEventTabletResponse}.
+   */
+  public SubscriptionEvent(
+      final SubscriptionPipeTabletEventBatch batch, final 
SubscriptionCommitContext commitContext) {
+    this.pipeEvents = new SubscriptionPipeTabletBatchEvents(batch);
+    this.response = new SubscriptionEventTabletResponse(batch, commitContext);
+    this.commitContext = commitContext;
   }
 
   /**
-   * Constructs a {@link SubscriptionEvent} with a list of responses.
-   *
-   * @param pipeEvents The underlying pipe events corresponding to this {@link 
SubscriptionEvent}.
-   * @param responses A list of responses that can be of types {@link 
TabletsPayload}, {@link
-   *     TerminationPayload}, or {@link ErrorPayload}. All responses are 
already generated at the
-   *     time of construction, so {@link 
SubscriptionEvent#prefetchRemainingResponses()} is not
-   *     required.
+   * Constructs a {@link SubscriptionEvent} with the response type of {@link
+   * SubscriptionEventTsFileResponse}.
    */
   public SubscriptionEvent(
-      final SubscriptionPipeEvents pipeEvents, final 
List<SubscriptionPollResponse> responses) {
+      final SubscriptionPipeEvents pipeEvents,
+      final File tsFile,
+      final SubscriptionCommitContext commitContext) {
     this.pipeEvents = pipeEvents;
+    this.response = new SubscriptionEventTsFileResponse(tsFile, commitContext);
+    this.commitContext = commitContext;
 
-    final int responseLength = responses.size();
-    this.responses = new SubscriptionPollResponse[responseLength];
-    for (int i = 0; i < responseLength; i++) {
-      this.responses[i] = responses.get(i);
-    }
-
-    this.byteBuffers = new ByteBuffer[responseLength];
-    this.commitContext = this.responses[0].getCommitContext();
-  }
-
-  private int getResponseLength(final short responseType) {
-    if (!Objects.equals(SubscriptionPollResponseType.FILE_INIT.getType(), 
responseType)) {
-      LOGGER.warn("unexpected response type: {}", responseType);
-      return 1;
-    }
-    final long fileLength = pipeEvents.getTsFile().length();
-    final long readFileBufferSize =
-        SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
-    final int length = (int) (fileLength / readFileBufferSize);
-    // add for init, last piece and seal
-    return (fileLength % readFileBufferSize != 0) ? length + 3 : length + 2;
+    this.fileName = tsFile.getName();
   }
 
   public SubscriptionPollResponse getCurrentResponse() {
-    return getResponse(currentResponseIndex);
-  }
-
-  private SubscriptionPollResponse getResponse(final int index) {
-    return responses[index];
+    return response.getCurrentResponse();
   }
 
   public SubscriptionCommitContext getCommitContext() {
@@ -158,7 +135,7 @@ public class SubscriptionEvent {
       // event with invalid commit id is uncommittable
       return false;
     }
-    return currentResponseIndex >= responses.length - 1;
+    return response.isCommittable();
   }
 
   public void ack() {
@@ -172,10 +149,12 @@ public class SubscriptionEvent {
    */
   public void cleanUp() {
     // reset serialized responses
-    resetResponseByteBuffer(true);
+    response.cleanUp();
 
     // clean up pipe events
     pipeEvents.cleanUp();
+
+    // TODO: clean more fields
   }
 
   //////////////////////////// pollable ////////////////////////////
@@ -224,8 +203,8 @@ public class SubscriptionEvent {
   }
 
   public void nack() {
-    // reset current response index
-    currentResponseIndex = 0;
+    // nack response
+    response.nack();
 
     // reset lastPolledTimestamp makes this event pollable
     lastPolledTimestamp.set(INVALID_TIMESTAMP);
@@ -241,131 +220,31 @@ public class SubscriptionEvent {
 
   //////////////////////////// prefetch & fetch ////////////////////////////
 
-  /**
-   * @param index the index of response to be prefetched
-   */
-  private void prefetchResponse(final int index) throws IOException {
-    if (index >= responses.length || index <= 0) {
-      return;
-    }
-
-    if (Objects.nonNull(responses[index])) {
-      return;
-    }
-
-    final SubscriptionPollResponse previousResponse = this.getResponse(index - 
1);
-    final short responseType = previousResponse.getResponseType();
-    final SubscriptionPollPayload payload = previousResponse.getPayload();
-    if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
-      LOGGER.warn("unexpected response type: {}", responseType);
-      return;
-    }
-
-    switch (SubscriptionPollResponseType.valueOf(responseType)) {
-      case FILE_INIT:
-        responses[index] = 
generateSubscriptionPollResponseWithPieceOrSealPayload(0);
-        break;
-      case FILE_PIECE:
-        responses[index] =
-            generateSubscriptionPollResponseWithPieceOrSealPayload(
-                ((FilePiecePayload) payload).getNextWritingOffset());
-        break;
-      case FILE_SEAL:
-        // not need to prefetch
-        return;
-      default:
-        LOGGER.warn("unexpected message type: {}", responseType);
-    }
-  }
-
   public void prefetchRemainingResponses() throws IOException {
-    for (int currentIndex = currentResponseIndex;
-        currentIndex < responses.length - 1;
-        currentIndex++) {
-      if (Objects.isNull(responses[currentIndex + 1])) {
-        prefetchResponse(currentIndex + 1);
-        return;
-      }
-    }
+    response.prefetchRemainingResponses();
   }
 
   public void fetchNextResponse() throws IOException {
-    if (currentResponseIndex >= responses.length - 1) {
-      LOGGER.warn("No more responses when fetching next response for {}, do 
nothing.", this);
-      return;
-    }
-    if (Objects.isNull(responses[currentResponseIndex + 1])) {
-      prefetchRemainingResponses();
-    }
-    currentResponseIndex++;
+    response.fetchNextResponse();
   }
 
   //////////////////////////// byte buffer ////////////////////////////
 
   public void trySerializeRemainingResponses() {
-    for (int currentIndex = currentResponseIndex;
-        currentIndex < responses.length - 1;
-        currentIndex++) {
-      if (Objects.nonNull(responses[currentIndex + 1]) && 
trySerializeResponse(currentIndex + 1)) {
-        break;
-      }
-    }
-  }
-
-  public boolean trySerializeCurrentResponse() {
-    return trySerializeResponse(currentResponseIndex);
+    // TODO: consider memory usage
+    response.trySerializeRemainingResponses();
   }
 
-  /**
-   * @param index the index of response to be serialized
-   * @return {@code true} if a serialization operation was actually performed
-   */
-  private boolean trySerializeResponse(final int index) {
-    if (index >= responses.length) {
-      return false;
-    }
-
-    if (Objects.isNull(responses[index])) {
-      return false;
-    }
-
-    if (Objects.nonNull(byteBuffers[index])) {
-      return false;
-    }
-
-    final Optional<ByteBuffer> optionalByteBuffer =
-        
SubscriptionEventBinaryCache.getInstance().trySerialize(responses[index]);
-    if (optionalByteBuffer.isPresent()) {
-      byteBuffers[index] = optionalByteBuffer.get();
-      return true;
-    }
-
-    return false;
+  public void trySerializeCurrentResponse() {
+    response.trySerializeCurrentResponse();
   }
 
   public ByteBuffer getCurrentResponseByteBuffer() throws IOException {
-    if (Objects.nonNull(byteBuffers[currentResponseIndex])) {
-      return byteBuffers[currentResponseIndex];
-    }
-
-    return byteBuffers[currentResponseIndex] =
-        
SubscriptionEventBinaryCache.getInstance().serialize(getCurrentResponse());
+    return response.getCurrentResponseByteBuffer();
   }
 
-  public void resetResponseByteBuffer(final boolean resetAll) {
-    if (resetAll) {
-      SubscriptionEventBinaryCache.getInstance()
-          .invalidateAll(
-              
Arrays.stream(responses).filter(Objects::nonNull).collect(Collectors.toList()));
-      // maybe friendly for gc
-      Arrays.fill(byteBuffers, null);
-    } else {
-      if (Objects.nonNull(responses[currentResponseIndex])) {
-        
SubscriptionEventBinaryCache.getInstance().invalidate(responses[currentResponseIndex]);
-      }
-      // maybe friendly for gc
-      byteBuffers[currentResponseIndex] = null;
-    }
+  public void invalidateCurrentResponseByteBuffer() {
+    response.invalidateCurrentResponseByteBuffer();
   }
 
   public int getCurrentResponseSize() throws IOException {
@@ -374,69 +253,24 @@ public class SubscriptionEvent {
     return byteBuffer.limit() - byteBuffer.position();
   }
 
-  /////////////////////////////// tsfile ///////////////////////////////
-
-  private @NonNull SubscriptionPollResponse 
generateSubscriptionPollResponseWithPieceOrSealPayload(
-      final long writingOffset) throws IOException {
-    final File tsFile = pipeEvents.getTsFile();
-
-    final long readFileBufferSize =
-        SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
-    final byte[] readBuffer = new byte[(int) readFileBufferSize];
-    try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
-      while (true) {
-        reader.seek(writingOffset);
-        final int readLength = reader.read(readBuffer);
-        if (readLength == -1) {
-          break;
-        }
-
-        final byte[] filePiece =
-            readLength == readFileBufferSize
-                ? readBuffer
-                : Arrays.copyOfRange(readBuffer, 0, readLength);
-
-        // generate subscription poll response with piece payload
-        return new SubscriptionPollResponse(
-            SubscriptionPollResponseType.FILE_PIECE.getType(),
-            new FilePiecePayload(tsFile.getName(), writingOffset + readLength, 
filePiece),
-            commitContext);
-      }
-
-      // generate subscription poll response with seal payload
-      return new SubscriptionPollResponse(
-          SubscriptionPollResponseType.FILE_SEAL.getType(),
-          new FileSealPayload(tsFile.getName(), tsFile.length()),
-          commitContext);
-    }
-  }
+  //////////////////////////// tsfile ////////////////////////////
 
   public String getFileName() {
-    return pipeEvents.getTsFile().getName();
+    return fileName;
   }
 
-  /////////////////////////////// APIs provided for metric framework 
///////////////////////////////
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   public int getPipeEventCount() {
     return pipeEvents.getPipeEventCount();
   }
 
-  /////////////////////////////// object ///////////////////////////////
+  //////////////////////////// object ////////////////////////////
 
   @Override
   public String toString() {
-    return "SubscriptionEvent{responses="
-        + Arrays.toString(responses)
-        + ", responses' byte buffer size="
-        + Arrays.stream(byteBuffers)
-            .map(
-                byteBuffer ->
-                    Objects.isNull(byteBuffer)
-                        ? "<unknown>"
-                        : byteBuffer.limit() - byteBuffer.position())
-            .collect(Collectors.toList())
-        + ", currentResponseIndex="
-        + currentResponseIndex
+    return "SubscriptionEvent{response="
+        + response
         + ", lastPolledConsumerId="
         + lastPolledConsumerId
         + ", lastPolledTimestamp="
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index 758d9b31567..a396ff8ee2a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -22,17 +22,25 @@ package org.apache.iotdb.db.subscription.event.batch;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 public abstract class SubscriptionPipeEventBatch {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionPipeEventBatch.class);
+
   private final int regionId;
 
   protected final SubscriptionPrefetchingQueue prefetchingQueue;
@@ -40,6 +48,7 @@ public abstract class SubscriptionPipeEventBatch {
   protected final long maxBatchSizeInBytes;
 
   protected volatile List<SubscriptionEvent> events = null;
+  protected final List<EnrichedEvent> enrichedEvents = new ArrayList<>();
 
   protected SubscriptionPipeEventBatch(
       final int regionId,
@@ -52,27 +61,60 @@ public abstract class SubscriptionPipeEventBatch {
     this.maxBatchSizeInBytes = maxBatchSizeInBytes;
   }
 
+  /////////////////////////////// ack & clean ///////////////////////////////
+
+  public abstract void ack();
+
+  public abstract void cleanUp();
+
+  /////////////////////////////// APIs ///////////////////////////////
+
   /**
    * @return {@code true} if there are subscription events consumed.
    */
-  public abstract boolean onEvent(final Consumer<SubscriptionEvent> consumer) 
throws Exception;
+  protected synchronized boolean onEvent(final Consumer<SubscriptionEvent> 
consumer)
+      throws Exception {
+    if (shouldEmit() && !enrichedEvents.isEmpty()) {
+      if (Objects.isNull(events)) {
+        events = generateSubscriptionEvents();
+      }
+      if (Objects.nonNull(events)) {
+        events.forEach(consumer);
+        return true;
+      }
+      return false;
+    }
+    return false;
+  }
 
   /**
    * @return {@code true} if there are subscription events consumed.
    */
-  public abstract boolean onEvent(
+  protected synchronized boolean onEvent(
       final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent> 
consumer)
-      throws Exception;
+      throws Exception {
+    if (event instanceof TabletInsertionEvent) {
+      onTabletInsertionEvent((TabletInsertionEvent) event); // no exceptions 
will be thrown
+      enrichedEvents.add(event);
+    } else if (event instanceof TsFileInsertionEvent) {
+      onTsFileInsertionEvent((TsFileInsertionEvent) event);
+      enrichedEvents.add(event);
+    } else {
+      LOGGER.warn(
+          "SubscriptionPipeEventBatch {} ignore EnrichedEvent {} when 
batching.", this, event);
+    }
+    return onEvent(consumer);
+  }
 
-  public abstract void cleanUp();
+  /////////////////////////////// utility ///////////////////////////////
 
-  public int getRegionId() {
-    return regionId;
-  }
+  protected abstract void onTabletInsertionEvent(final TabletInsertionEvent 
event) throws Exception;
 
-  public boolean isSealed() {
-    return Objects.nonNull(events);
-  }
+  protected abstract void onTsFileInsertionEvent(final TsFileInsertionEvent 
event) throws Exception;
+
+  protected abstract boolean shouldEmit();
+
+  protected abstract List<SubscriptionEvent> generateSubscriptionEvents() 
throws Exception;
 
   /////////////////////////////// stringify ///////////////////////////////
 
@@ -82,6 +124,28 @@ public abstract class SubscriptionPipeEventBatch {
     result.put("prefetchingQueue", 
prefetchingQueue.coreReportMessage().toString());
     result.put("maxDelayInMs", String.valueOf(maxDelayInMs));
     result.put("maxBatchSizeInBytes", String.valueOf(maxBatchSizeInBytes));
+    // TODO: stringify subscription events?
+    result.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4));
     return result;
   }
+
+  private static String formatEnrichedEvents(
+      final List<EnrichedEvent> enrichedEvents, final int threshold) {
+    final List<String> eventMessageList =
+        enrichedEvents.stream()
+            .limit(threshold)
+            .map(EnrichedEvent::coreReportMessage)
+            .collect(Collectors.toList());
+    if (eventMessageList.size() > threshold) {
+      eventMessageList.add(
+          String.format("omit the remaining %s event(s)...", 
eventMessageList.size() - threshold));
+    }
+    return eventMessageList.toString();
+  }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public int getPipeEventCount() {
+    return enrichedEvents.size();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index 871b1c12587..b54607c868b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -20,43 +20,31 @@
 package org.apache.iotdb.db.subscription.event.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
-import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
-import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import org.apache.tsfile.write.record.Tablet;
-import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionPipeTabletEventBatch.class);
 
-  private static final long READ_TABLET_BUFFER_SIZE =
-      SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize();
-
-  private final List<EnrichedEvent> enrichedEvents = new ArrayList<>();
-  private final List<Tablet> tablets = new ArrayList<>();
-
+  private volatile List<Tablet> tablets = new LinkedList<>();
   private long firstEventProcessingTime = Long.MIN_VALUE;
   private long totalBufferSize = 0;
 
@@ -68,28 +56,36 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
     super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes);
   }
 
-  @Override
-  public synchronized boolean onEvent(final Consumer<SubscriptionEvent> 
consumer) {
-    if (shouldEmit() && !enrichedEvents.isEmpty()) {
-      if (Objects.isNull(events)) {
-        events = generateSubscriptionEvents();
+  public LinkedList<Tablet> moveTablets() {
+    if (Objects.isNull(tablets)) {
+      tablets = new ArrayList<>();
+      for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+        if (enrichedEvent instanceof TsFileInsertionEvent) {
+          onTsFileInsertionEvent((TsFileInsertionEvent) enrichedEvent);
+        } else if (enrichedEvent instanceof TabletInsertionEvent) {
+          onTabletInsertionEvent((TabletInsertionEvent) enrichedEvent);
+        } else {
+          LOGGER.warn(
+              "SubscriptionPipeTabletEventBatch {} ignore EnrichedEvent {} 
when moving.",
+              this,
+              enrichedEvent);
+        }
       }
-      if (Objects.nonNull(events)) {
-        events.forEach(consumer);
-        return true;
-      }
-      return false;
     }
-    return false;
+    final LinkedList<Tablet> result = new LinkedList<>(tablets);
+    firstEventProcessingTime = Long.MIN_VALUE;
+    totalBufferSize = 0;
+    tablets = null; // reset to null for gc & subsequent move
+    return result;
   }
 
+  /////////////////////////////// ack & clean ///////////////////////////////
+
   @Override
-  public synchronized boolean onEvent(
-      final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent> 
consumer) {
-    if (event instanceof TabletInsertionEvent) {
-      onEventInternal((TabletInsertionEvent) event); // no exceptions will be 
thrown
+  public synchronized void ack() {
+    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+      enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
     }
-    return onEvent(consumer);
   }
 
   @Override
@@ -99,65 +95,42 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
       enrichedEvent.clearReferenceCount(this.getClass().getName());
     }
     enrichedEvents.clear();
-    tablets.clear();
+    if (Objects.nonNull(tablets)) {
+      tablets.clear();
+    }
   }
 
-  public synchronized void ack() {
-    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
-      enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+  /////////////////////////////// utility ///////////////////////////////
+
+  @Override
+  protected void onTabletInsertionEvent(final TabletInsertionEvent event) {
+    constructBatch(event);
+    if (firstEventProcessingTime == Long.MIN_VALUE) {
+      firstEventProcessingTime = System.currentTimeMillis();
     }
   }
 
-  /////////////////////////////// utility ///////////////////////////////
+  @Override
+  protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
+    for (final TabletInsertionEvent tabletInsertionEvent : 
event.toTabletInsertionEvents()) {
+      onTabletInsertionEvent(tabletInsertionEvent);
+    }
+  }
 
-  private List<SubscriptionEvent> generateSubscriptionEvents() {
+  @Override
+  protected List<SubscriptionEvent> generateSubscriptionEvents() {
     if (tablets.isEmpty()) {
       return null;
     }
 
-    final SubscriptionCommitContext commitContext =
-        prefetchingQueue.generateSubscriptionCommitContext();
-    final List<SubscriptionPollResponse> responses = new ArrayList<>();
-    final List<Tablet> currentTablets = new ArrayList<>();
-    long currentTotalBufferSize = 0;
-    for (final Tablet tablet : tablets) {
-      final long bufferSize = 
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
-      if (bufferSize > READ_TABLET_BUFFER_SIZE) {
-        LOGGER.warn("Detect large tablet with {} byte(s).", bufferSize);
-        responses.add(
-            new SubscriptionPollResponse(
-                SubscriptionPollResponseType.TABLETS.getType(),
-                new TabletsPayload(Collections.singletonList(tablet), 
responses.size() + 1),
-                commitContext));
-        continue;
-      }
-      if (currentTotalBufferSize + bufferSize > READ_TABLET_BUFFER_SIZE) {
-        responses.add(
-            new SubscriptionPollResponse(
-                SubscriptionPollResponseType.TABLETS.getType(),
-                new TabletsPayload(new ArrayList<>(currentTablets), 
responses.size() + 1),
-                commitContext));
-        currentTablets.clear();
-        currentTotalBufferSize = 0;
-      }
-      currentTablets.add(tablet);
-      currentTotalBufferSize += bufferSize;
-    }
-    responses.add(
-        new SubscriptionPollResponse(
-            SubscriptionPollResponseType.TABLETS.getType(),
-            new TabletsPayload(new ArrayList<>(currentTablets), 
-tablets.size()),
-            commitContext));
     return Collections.singletonList(
-        new SubscriptionEvent(new SubscriptionPipeTabletBatchEvents(this), 
responses));
+        new SubscriptionEvent(this, 
prefetchingQueue.generateSubscriptionCommitContext()));
   }
 
-  private void onEventInternal(final TabletInsertionEvent event) {
-    constructBatch(event);
-    enrichedEvents.add((EnrichedEvent) event);
-    if (firstEventProcessingTime == Long.MIN_VALUE) {
-      firstEventProcessingTime = System.currentTimeMillis();
-    }
+  @Override
+  protected boolean shouldEmit() {
+    return totalBufferSize >= maxBatchSizeInBytes
+        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
   }
 
   private void constructBatch(final TabletInsertionEvent event) {
@@ -173,11 +146,6 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
             .orElse(0L);
   }
 
-  private boolean shouldEmit() {
-    return totalBufferSize >= maxBatchSizeInBytes
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
-  }
-
   private List<Tablet> convertToTablets(final TabletInsertionEvent 
tabletInsertionEvent) {
     if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
       return ((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent).convertToTablets();
@@ -203,30 +171,11 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   @Override
   protected Map<String, String> coreReportMessage() {
     final Map<String, String> coreReportMessage = super.coreReportMessage();
-    coreReportMessage.put("enrichedEvents", 
formatEnrichedEvents(enrichedEvents, 4));
-    coreReportMessage.put("size of tablets", String.valueOf(tablets.size()));
+    coreReportMessage.put(
+        "size of tablets",
+        (Objects.nonNull(tablets) ? String.valueOf(tablets.size()) : 
"<unknown>"));
     coreReportMessage.put("firstEventProcessingTime", 
String.valueOf(firstEventProcessingTime));
     coreReportMessage.put("totalBufferSize", String.valueOf(totalBufferSize));
     return coreReportMessage;
   }
-
-  private static String formatEnrichedEvents(
-      final List<EnrichedEvent> enrichedEvents, final int threshold) {
-    final List<String> eventMessageList =
-        enrichedEvents.stream()
-            .limit(threshold)
-            .map(EnrichedEvent::coreReportMessage)
-            .collect(Collectors.toList());
-    if (eventMessageList.size() > threshold) {
-      eventMessageList.add(
-          String.format("omit the remaining %s event(s)...", 
eventMessageList.size() - threshold));
-    }
-    return eventMessageList.toString();
-  }
-
-  //////////////////////////// APIs provided for metric framework 
////////////////////////////
-
-  public int getPipeEventCount() {
-    return enrichedEvents.size();
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index e38887e19f2..dc96fc476da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -25,25 +25,24 @@ import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueu
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
-import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
 
-import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 
 public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch {
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SubscriptionPipeTsFileEventBatch.class);
+
   private final PipeTabletEventTsFileBatch batch;
-  private final List<EnrichedEvent> enrichedEvents;
 
   public SubscriptionPipeTsFileEventBatch(
       final int regionId,
@@ -52,36 +51,11 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
       final long maxBatchSizeInBytes) {
     super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes);
     this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs, 
maxBatchSizeInBytes);
-    this.enrichedEvents = new ArrayList<>();
-  }
-
-  @Override
-  public synchronized boolean onEvent(final Consumer<SubscriptionEvent> 
consumer) throws Exception {
-    if (batch.shouldEmit() && !enrichedEvents.isEmpty()) {
-      if (Objects.isNull(events)) {
-        events = generateSubscriptionEvents();
-      }
-      if (Objects.nonNull(events)) {
-        events.forEach(consumer);
-        return true;
-      }
-      return false;
-    }
-    return false;
   }
 
   @Override
-  public synchronized boolean onEvent(
-      final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent> 
consumer)
-      throws Exception {
-    if (event instanceof TabletInsertionEvent) {
-      batch.onEvent((TabletInsertionEvent) event); // no exceptions will be 
thrown
-      enrichedEvents.add(event);
-      event.decreaseReferenceCount(
-          SubscriptionPipeTsFileEventBatch.class.getName(),
-          false); // missing releaseLastEvent decreases reference count
-    }
-    return onEvent(consumer);
+  public synchronized void ack() {
+    batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
   }
 
   @Override
@@ -91,13 +65,27 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
     enrichedEvents.clear();
   }
 
-  public synchronized void ack() {
-    batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
+  /////////////////////////////// utility ///////////////////////////////
+
+  @Override
+  protected void onTabletInsertionEvent(final TabletInsertionEvent event) 
throws Exception {
+    batch.onEvent(event); // no exceptions will be thrown
+    ((EnrichedEvent) event)
+        .decreaseReferenceCount(
+            SubscriptionPipeTsFileEventBatch.class.getName(),
+            false); // missing releaseLastEvent decreases reference count
   }
 
-  /////////////////////////////// utility ///////////////////////////////
+  @Override
+  protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
+    LOGGER.warn(
+        "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {} 
when batching.",
+        this,
+        event);
+  }
 
-  private List<SubscriptionEvent> generateSubscriptionEvents() throws 
Exception {
+  @Override
+  protected List<SubscriptionEvent> generateSubscriptionEvents() throws 
Exception {
     if (batch.isEmpty()) {
       return null;
     }
@@ -110,15 +98,16 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
           prefetchingQueue.generateSubscriptionCommitContext();
       events.add(
           new SubscriptionEvent(
-              new SubscriptionPipeTsFileBatchEvents(this, tsFile, 
referenceCount),
-              new SubscriptionPollResponse(
-                  SubscriptionPollResponseType.FILE_INIT.getType(),
-                  new FileInitPayload(tsFile.getName()),
-                  commitContext)));
+              new SubscriptionPipeTsFileBatchEvents(this, referenceCount), 
tsFile, commitContext));
     }
     return events;
   }
 
+  @Override
+  protected boolean shouldEmit() {
+    return batch.shouldEmit();
+  }
+
   /////////////////////////////// stringify ///////////////////////////////
 
   @Override
@@ -132,10 +121,4 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
     coreReportMessage.put("batch", batch.toString());
     return coreReportMessage;
   }
-
-  //////////////////////////// APIs provided for metric framework 
////////////////////////////
-
-  public int getPipeEventCount() {
-    return enrichedEvents.size();
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
new file mode 100644
index 00000000000..54dbcceaccb
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/CachedSubscriptionPollResponse.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.cache;
+
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class CachedSubscriptionPollResponse extends SubscriptionPollResponse {
+
+  private volatile ByteBuffer byteBuffer; // cached serialized response
+
+  public CachedSubscriptionPollResponse(
+      final short responseType,
+      final SubscriptionPollPayload payload,
+      final SubscriptionCommitContext commitContext) {
+    super(responseType, payload, commitContext);
+  }
+
+  public CachedSubscriptionPollResponse(final SubscriptionPollResponse 
response) {
+    super(response.getResponseType(), response.getPayload(), 
response.getCommitContext());
+  }
+
+  public ByteBuffer getByteBuffer() {
+    return byteBuffer;
+  }
+
+  public void invalidateByteBuffer() {
+    // maybe friendly for gc
+    byteBuffer = null;
+  }
+
+  public static ByteBuffer serialize(final CachedSubscriptionPollResponse 
response)
+      throws IOException {
+    return response.serialize();
+  }
+
+  private ByteBuffer serialize() throws IOException {
+    return Objects.nonNull(byteBuffer)
+        ? byteBuffer
+        : (byteBuffer = SubscriptionPollResponse.serialize(this));
+  }
+
+  /////////////////////////////// stringify ///////////////////////////////
+
+  @Override
+  public String toString() {
+    return "CachedSubscriptionPollResponse" + coreReportMessage();
+  }
+
+  @Override
+  protected Map<String, String> coreReportMessage() {
+    final Map<String, String> coreReportMessage = super.coreReportMessage();
+    coreReportMessage.put(
+        "sizeof(byteBuffer)",
+        Objects.isNull(byteBuffer)
+            ? "<unknown>"
+            : String.valueOf(byteBuffer.limit() - byteBuffer.position()));
+    return coreReportMessage;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
similarity index 78%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 6e62a819c69..02a634c9c19 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.subscription.event;
+package org.apache.iotdb.db.subscription.event.cache;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -35,65 +35,66 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Optional;
 
-/** This class is used to cache {@link SubscriptionPollResponse} in {@link 
SubscriptionEvent}. */
-class SubscriptionEventBinaryCache {
+/**
+ * This class is used to control memory usage of cache {@link 
SubscriptionPollResponse} in {@link
+ * CachedSubscriptionPollResponse}.
+ */
+public class SubscriptionPollResponseCache {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionEventBinaryCache.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionPollResponseCache.class);
 
   private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
 
-  private final LoadingCache<SubscriptionPollResponse, ByteBuffer> cache;
+  private final LoadingCache<CachedSubscriptionPollResponse, ByteBuffer> cache;
 
-  ByteBuffer serialize(final SubscriptionPollResponse response) throws 
IOException {
+  public ByteBuffer serialize(final CachedSubscriptionPollResponse response) 
throws IOException {
     try {
       return this.cache.get(response);
     } catch (final Exception e) {
       LOGGER.warn(
-          "SubscriptionEventBinaryCache raised an exception while serializing 
SubscriptionPollResponse: {}",
+          "SubscriptionEventBinaryCache raised an exception while serializing 
CachedSubscriptionPollResponse: {}",
           response,
           e);
       throw new IOException(e);
     }
   }
 
-  Optional<ByteBuffer> trySerialize(final SubscriptionPollResponse response) {
+  public Optional<ByteBuffer> trySerialize(final 
CachedSubscriptionPollResponse response) {
     try {
       return Optional.of(serialize(response));
     } catch (final IOException e) {
       LOGGER.warn(
-          "Subscription: something unexpected happened when serializing 
SubscriptionPollResponse: {}",
+          "Subscription: something unexpected happened when serializing 
CachedSubscriptionPollResponse: {}",
           response,
           e);
       return Optional.empty();
     }
   }
 
-  void invalidate(final SubscriptionPollResponse response) {
+  public void invalidate(final CachedSubscriptionPollResponse response) {
     this.cache.invalidate(response);
-  }
-
-  void invalidateAll(final Iterable<SubscriptionPollResponse> responses) {
-    this.cache.invalidateAll(responses);
+    response.invalidateByteBuffer();
   }
 
   //////////////////////////// singleton ////////////////////////////
 
   private static class SubscriptionEventBinaryCacheHolder {
 
-    private static final SubscriptionEventBinaryCache INSTANCE = new 
SubscriptionEventBinaryCache();
+    private static final SubscriptionPollResponseCache INSTANCE =
+        new SubscriptionPollResponseCache();
 
     private SubscriptionEventBinaryCacheHolder() {
       // empty constructor
     }
   }
 
-  static SubscriptionEventBinaryCache getInstance() {
-    return 
SubscriptionEventBinaryCache.SubscriptionEventBinaryCacheHolder.INSTANCE;
+  public static SubscriptionPollResponseCache getInstance() {
+    return 
SubscriptionPollResponseCache.SubscriptionEventBinaryCacheHolder.INSTANCE;
   }
 
-  private SubscriptionEventBinaryCache() {
+  private SubscriptionPollResponseCache() {
     final long initMemorySizeInBytes =
-        PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 20;
+        PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 5;
     final long maxMemorySizeInBytes =
         (long)
             (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
@@ -129,13 +130,13 @@ class SubscriptionEventBinaryCache {
         Caffeine.newBuilder()
             .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
             .weigher(
-                (Weigher<SubscriptionPollResponse, ByteBuffer>)
+                (Weigher<CachedSubscriptionPollResponse, ByteBuffer>)
                     (message, buffer) -> {
                       // TODO: overflow
                       return (int) (buffer.capacity() * 
memoryUsageCheatFactor.get());
                     })
             .recordStats() // TODO: metrics
             // NOTE: lambda CAN NOT be replaced with method reference
-            .build(response -> SubscriptionPollResponse.serialize(response));
+            .build(response -> 
CachedSubscriptionPollResponse.serialize(response));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
index 3e74e03d8f4..ef6cc39a60c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
@@ -19,15 +19,8 @@
 
 package org.apache.iotdb.db.subscription.event.pipe;
 
-import java.io.File;
-
 public class SubscriptionPipeEmptyEvent implements SubscriptionPipeEvents {
 
-  @Override
-  public File getTsFile() {
-    return null;
-  }
-
   @Override
   public void ack() {}
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
index 489c4cf8120..05f011699ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
@@ -19,15 +19,8 @@
 
 package org.apache.iotdb.db.subscription.event.pipe;
 
-import java.io.File;
-
 public interface SubscriptionPipeEvents {
 
-  /**
-   * @return {@code null} if the pipe events do not contain the corresponding 
tsfile.
-   */
-  File getTsFile();
-
   void ack();
 
   void cleanUp();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index 226367405eb..f518c7b0194 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.subscription.event.pipe;
 
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
 
-import java.io.File;
-
 public class SubscriptionPipeTabletBatchEvents implements 
SubscriptionPipeEvents {
 
   private final SubscriptionPipeTabletEventBatch batch;
@@ -31,11 +29,6 @@ public class SubscriptionPipeTabletBatchEvents implements 
SubscriptionPipeEvents
     this.batch = batch;
   }
 
-  @Override
-  public File getTsFile() {
-    return null;
-  }
-
   @Override
   public void ack() {
     batch.ack();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 5cae21f5ac8..16151a16a50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -21,31 +21,21 @@ package org.apache.iotdb.db.subscription.event.pipe;
 
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch;
 
-import java.io.File;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents {
 
   private final SubscriptionPipeTsFileEventBatch batch;
-  private final File tsFile;
   private final AtomicInteger referenceCount; // shared between the same batch
   private final int count; // snapshot the initial reference count, used for 
event count calculation
 
   public SubscriptionPipeTsFileBatchEvents(
-      final SubscriptionPipeTsFileEventBatch batch,
-      final File tsFile,
-      final AtomicInteger referenceCount) {
+      final SubscriptionPipeTsFileEventBatch batch, final AtomicInteger 
referenceCount) {
     this.batch = batch;
-    this.tsFile = tsFile;
     this.referenceCount = referenceCount;
     this.count = Math.max(1, referenceCount.get());
   }
 
-  @Override
-  public File getTsFile() {
-    return tsFile;
-  }
-
   @Override
   public void ack() {
     if (referenceCount.decrementAndGet() == 0) {
@@ -66,8 +56,6 @@ public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents
   public String toString() {
     return "SubscriptionPipeTsFileBatchEvents{batch="
         + batch
-        + ", tsFile="
-        + tsFile
         + ", referenceCount="
         + referenceCount
         + "}";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
index 111006fa6d3..8effb654d73 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.subscription.event.pipe;
 
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 
-import java.io.File;
-
 public class SubscriptionPipeTsFilePlainEvent implements 
SubscriptionPipeEvents {
 
   private final PipeTsFileInsertionEvent tsFileInsertionEvent;
@@ -31,11 +29,6 @@ public class SubscriptionPipeTsFilePlainEvent implements 
SubscriptionPipeEvents
     this.tsFileInsertionEvent = tsFileInsertionEvent;
   }
 
-  @Override
-  public File getTsFile() {
-    return tsFileInsertionEvent.getTsFile();
-  }
-
   @Override
   public void ack() {
     tsFileInsertionEvent.decreaseReferenceCount(this.getClass().getName(), 
true);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
new file mode 100644
index 00000000000..07dfaabdf4f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.response;
+
+import 
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import 
org.apache.iotdb.db.subscription.event.cache.SubscriptionPollResponseCache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * The {@code SubscriptionEventExtendableResponse} class represents a 
subscription event response
+ * that can dynamically change as new responses are fetched. It maintains a 
list of {@link
+ * CachedSubscriptionPollResponse} objects and provides methods for managing 
and serializing these
+ * responses.
+ */
+public abstract class SubscriptionEventExtendableResponse
+    implements SubscriptionEventResponse<CachedSubscriptionPollResponse> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SubscriptionEventTabletResponse.class);
+
+  private final Deque<CachedSubscriptionPollResponse> responses;
+  protected volatile boolean hasNoMore = false;
+
+  protected SubscriptionEventExtendableResponse() {
+    this.responses = new ConcurrentLinkedDeque<>();
+  }
+
+  @Override
+  public CachedSubscriptionPollResponse getCurrentResponse() {
+    return peekFirst();
+  }
+
+  @Override
+  public void fetchNextResponse() throws IOException {
+    prefetchRemainingResponses();
+    if (Objects.isNull(poll())) {
+      LOGGER.warn(
+          "SubscriptionEventExtendableResponse {} is empty when fetching next 
response (broken invariant)",
+          this);
+    }
+  }
+
+  @Override
+  public void trySerializeCurrentResponse() {
+    
SubscriptionPollResponseCache.getInstance().trySerialize(getCurrentResponse());
+  }
+
+  @Override
+  public void trySerializeRemainingResponses() {
+    responses.stream()
+        .skip(1)
+        .filter(response -> Objects.isNull(response.getByteBuffer()))
+        .findFirst()
+        .ifPresent(response -> 
SubscriptionPollResponseCache.getInstance().trySerialize(response));
+  }
+
+  @Override
+  public ByteBuffer getCurrentResponseByteBuffer() throws IOException {
+    return 
SubscriptionPollResponseCache.getInstance().serialize(getCurrentResponse());
+  }
+
+  @Override
+  public void invalidateCurrentResponseByteBuffer() {
+    
SubscriptionPollResponseCache.getInstance().invalidate(getCurrentResponse());
+  }
+
+  @Override
+  public void cleanUp() {
+    CachedSubscriptionPollResponse response;
+    while (Objects.nonNull(response = poll())) {
+      SubscriptionPollResponseCache.getInstance().invalidate(response);
+    }
+
+    hasNoMore = false;
+  }
+
+  @Override
+  public boolean isCommittable() {
+    return hasNoMore && size() == 1;
+  }
+
+  /////////////////////////////// utility ///////////////////////////////
+
+  protected void offer(final CachedSubscriptionPollResponse response) {
+    responses.addLast(response);
+  }
+
+  protected CachedSubscriptionPollResponse poll() {
+    return responses.isEmpty() ? null : responses.removeFirst();
+  }
+
+  protected CachedSubscriptionPollResponse peekFirst() {
+    return responses.isEmpty() ? null : responses.getFirst();
+  }
+
+  protected CachedSubscriptionPollResponse peekLast() {
+    return responses.isEmpty() ? null : responses.getLast();
+  }
+
+  protected int size() {
+    return responses.size();
+  }
+
+  protected boolean isEmpty() {
+    return responses.isEmpty();
+  }
+
+  /////////////////////////////// stringify ///////////////////////////////
+
+  @Override
+  public String toString() {
+    return "SubscriptionEventExtendableResponse" + coreReportMessage();
+  }
+
+  protected Map<String, String> coreReportMessage() {
+    final Map<String, String> result = new HashMap<>();
+    final CachedSubscriptionPollResponse currentResponse = 
getCurrentResponse();
+    result.put(
+        "currentResponse",
+        Objects.nonNull(currentResponse) ? currentResponse.toString() : 
"<unknown>");
+    result.put("hasNoMore", String.valueOf(hasNoMore));
+    return result;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
similarity index 52%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
index 489c4cf8120..211f0905afa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
@@ -17,22 +17,36 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.subscription.event.pipe;
+package org.apache.iotdb.db.subscription.event.response;
 
-import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
-public interface SubscriptionPipeEvents {
+public interface SubscriptionEventResponse<E> {
 
-  /**
-   * @return {@code null} if the pipe events do not contain the corresponding 
tsfile.
-   */
-  File getTsFile();
+  /////////////////////////////// response ///////////////////////////////
 
-  void ack();
+  E getCurrentResponse();
 
-  void cleanUp();
+  void prefetchRemainingResponses() throws IOException;
+
+  void fetchNextResponse() throws IOException;
+
+  /////////////////////////////// byte buffer ///////////////////////////////
+
+  void trySerializeCurrentResponse();
+
+  void trySerializeRemainingResponses();
 
-  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+  ByteBuffer getCurrentResponseByteBuffer() throws IOException;
+
+  void invalidateCurrentResponseByteBuffer();
+
+  /////////////////////////////// lifecycle ///////////////////////////////
+
+  void nack();
+
+  void cleanUp();
 
-  int getPipeEventCount();
+  boolean isCommittable();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
new file mode 100644
index 00000000000..dbc48ebda00
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventSingleResponse.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.response;
+
+import 
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import 
org.apache.iotdb.db.subscription.event.cache.SubscriptionPollResponseCache;
+import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The {@link SubscriptionEventSingleResponse} class represents a single 
subscription event response
+ * that wraps a cached {@link SubscriptionPollResponse}. The actual payload of 
the response can be
+ * either a {@link TerminationPayload} or an {@link ErrorPayload}.
+ */
+public class SubscriptionEventSingleResponse
+    implements SubscriptionEventResponse<CachedSubscriptionPollResponse> {
+
+  private final CachedSubscriptionPollResponse response;
+
+  public SubscriptionEventSingleResponse(
+      final short responseType,
+      final SubscriptionPollPayload payload,
+      final SubscriptionCommitContext commitContext) {
+    this.response = new CachedSubscriptionPollResponse(responseType, payload, 
commitContext);
+  }
+
+  public SubscriptionEventSingleResponse(final SubscriptionPollResponse 
response) {
+    this.response = new CachedSubscriptionPollResponse(response);
+  }
+
+  @Override
+  public CachedSubscriptionPollResponse getCurrentResponse() {
+    return response;
+  }
+
+  @Override
+  public void prefetchRemainingResponses() {
+    // do nothing
+  }
+
+  @Override
+  public void fetchNextResponse() {
+    // do nothing
+  }
+
+  @Override
+  public void trySerializeCurrentResponse() {
+    SubscriptionPollResponseCache.getInstance().trySerialize(response);
+  }
+
+  @Override
+  public void trySerializeRemainingResponses() {
+    // do nothing
+  }
+
+  @Override
+  public ByteBuffer getCurrentResponseByteBuffer() throws IOException {
+    return SubscriptionPollResponseCache.getInstance().serialize(response);
+  }
+
+  @Override
+  public void invalidateCurrentResponseByteBuffer() {
+    SubscriptionPollResponseCache.getInstance().invalidate(response);
+  }
+
+  @Override
+  public void nack() {
+    invalidateCurrentResponseByteBuffer();
+  }
+
+  @Override
+  public void cleanUp() {
+    invalidateCurrentResponseByteBuffer();
+  }
+
+  @Override
+  public boolean isCommittable() {
+    return true;
+  }
+
+  /////////////////////////////// stringify ///////////////////////////////
+
+  @Override
+  public String toString() {
+    return "SubscriptionEventSingleResponse" + coreReportMessage();
+  }
+
+  protected Map<String, String> coreReportMessage() {
+    final Map<String, String> result = new HashMap<>();
+    final CachedSubscriptionPollResponse currentResponse = 
getCurrentResponse();
+    result.put(
+        "currentResponse",
+        Objects.nonNull(currentResponse) ? currentResponse.toString() : 
"<unknown>");
+    return result;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
new file mode 100644
index 00000000000..4fd13517891
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.response;
+
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
+import 
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
+
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The {@code SubscriptionEventTabletResponse} class extends {@link
+ * SubscriptionEventExtendableResponse} to handle subscription responses 
specifically for tablet
+ * data. The actual payload of the response includes a {@link TabletsPayload}, 
which contains the
+ * tablet information being processed.
+ */
+public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendableResponse {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SubscriptionEventTabletResponse.class);
+
+  private static final long READ_TABLET_BUFFER_SIZE =
+      SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize();
+
+  private final SubscriptionPipeTabletEventBatch batch;
+  private final SubscriptionCommitContext commitContext;
+
+  private volatile LinkedList<Tablet> tablets;
+  private volatile int tabletsSize;
+  private final AtomicInteger nextOffset = new AtomicInteger(0);
+
+  public SubscriptionEventTabletResponse(
+      final SubscriptionPipeTabletEventBatch batch, final 
SubscriptionCommitContext commitContext) {
+    this.batch = batch;
+    this.commitContext = commitContext;
+
+    init(batch);
+  }
+
+  @Override
+  public void prefetchRemainingResponses() {
+    if (hasNoMore) {
+      return;
+    }
+
+    offer(generateNextTabletResponse());
+  }
+
+  @Override
+  public void nack() {
+    cleanUp();
+    init(batch);
+  }
+
+  @Override
+  public void cleanUp() {
+    super.cleanUp();
+
+    tablets = null;
+    tabletsSize = 0;
+    nextOffset.set(0);
+  }
+
+  /////////////////////////////// utility ///////////////////////////////
+
+  private void init(final SubscriptionPipeTabletEventBatch batch) {
+    if (!isEmpty()) {
+      LOGGER.warn(
+          "SubscriptionEventTabletResponse {} is not empty when initializing 
(broken invariant)",
+          this);
+      return;
+    }
+
+    tablets = batch.moveTablets();
+    tabletsSize = tablets.size();
+    offer(generateNextTabletResponse());
+  }
+
+  private synchronized CachedSubscriptionPollResponse 
generateNextTabletResponse() {
+    final List<Tablet> currentTablets = new ArrayList<>();
+    final AtomicLong currentTotalBufferSize = new AtomicLong();
+
+    Tablet currentTablet;
+    while (!tablets.isEmpty() && Objects.nonNull(currentTablet = 
tablets.removeFirst())) {
+      final long bufferSize = 
PipeMemoryWeightUtil.calculateTabletSizeInBytes(currentTablet);
+      if (bufferSize > READ_TABLET_BUFFER_SIZE) {
+        LOGGER.warn("Detect large tablet with {} byte(s).", bufferSize);
+        tablets.addAll(currentTablets); // re-enqueue previous tablets
+        currentTablets.clear();
+        currentTotalBufferSize.set(0);
+        return new CachedSubscriptionPollResponse(
+            SubscriptionPollResponseType.TABLETS.getType(),
+            new TabletsPayload(
+                Collections.singletonList(currentTablet), 
nextOffset.incrementAndGet()),
+            commitContext);
+      }
+      if (currentTotalBufferSize.get() + bufferSize > READ_TABLET_BUFFER_SIZE) 
{
+        final CachedSubscriptionPollResponse response =
+            new CachedSubscriptionPollResponse(
+                SubscriptionPollResponseType.TABLETS.getType(),
+                new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
+                commitContext);
+        tablets.add(currentTablet); // re-enqueue current tablet
+        currentTablets.clear();
+        currentTotalBufferSize.set(0);
+        return response;
+      }
+      currentTablets.add(currentTablet);
+      currentTotalBufferSize.addAndGet(bufferSize);
+    }
+
+    final CachedSubscriptionPollResponse response;
+    if (currentTablets.isEmpty()) {
+      response =
+          new CachedSubscriptionPollResponse(
+              SubscriptionPollResponseType.TABLETS.getType(),
+              new TabletsPayload(Collections.emptyList(), -tabletsSize),
+              commitContext);
+      hasNoMore = true;
+    } else {
+      response =
+          new CachedSubscriptionPollResponse(
+              SubscriptionPollResponseType.TABLETS.getType(),
+              new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
+              commitContext);
+    }
+    currentTablets.clear();
+    currentTotalBufferSize.set(0);
+    return response;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
new file mode 100644
index 00000000000..01b6611dd06
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.response;
+
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import 
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
+import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.FileSealPayload;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The {@code SubscriptionEventTsFileResponse} class extends {@link
+ * SubscriptionEventExtendableResponse} to manage subscription responses 
related to time series
+ * files. The actual payload can include {@link FileInitPayload}, {@link 
FilePiecePayload}, and
+ * {@link FileSealPayload}, allowing for detailed control over file data 
streaming.
+ */
+public class SubscriptionEventTsFileResponse extends 
SubscriptionEventExtendableResponse {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SubscriptionEventTsFileResponse.class);
+
+  private final File tsFile;
+  private final SubscriptionCommitContext commitContext;
+
+  public SubscriptionEventTsFileResponse(
+      final File tsFile, final SubscriptionCommitContext commitContext) {
+    super();
+
+    this.tsFile = tsFile;
+    this.commitContext = commitContext;
+
+    init();
+  }
+
+  @Override
+  public void prefetchRemainingResponses() throws IOException {
+    if (hasNoMore) {
+      return;
+    }
+
+    generateNextTsFileResponse().ifPresent(super::offer);
+  }
+
+  @Override
+  public void nack() {
+    cleanUp();
+    init();
+  }
+
+  @Override
+  public void cleanUp() {
+    super.cleanUp();
+  }
+
+  /////////////////////////////// utility ///////////////////////////////
+
+  private void init() {
+    if (!isEmpty()) {
+      LOGGER.warn(
+          "SubscriptionEventTsFileResponse {} is not empty when initializing 
(broken invariant)",
+          this);
+      return;
+    }
+
+    offer(
+        new CachedSubscriptionPollResponse(
+            SubscriptionPollResponseType.FILE_INIT.getType(),
+            new FileInitPayload(tsFile.getName()),
+            commitContext));
+  }
+
+  private synchronized Optional<CachedSubscriptionPollResponse> 
generateNextTsFileResponse()
+      throws IOException {
+    final SubscriptionPollResponse previousResponse = peekLast();
+    if (Objects.isNull(previousResponse)) {
+      LOGGER.warn(
+          "SubscriptionEventTsFileResponse {} is empty when generating next 
response (broken invariant)",
+          this);
+      return Optional.empty();
+    }
+    final short responseType = previousResponse.getResponseType();
+    final SubscriptionPollPayload payload = previousResponse.getPayload();
+    if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
+      LOGGER.warn("unexpected response type: {}", responseType);
+      return Optional.empty();
+    }
+
+    switch (SubscriptionPollResponseType.valueOf(responseType)) {
+      case FILE_INIT:
+        return Optional.of(generateResponseWithPieceOrSealPayload(0));
+      case FILE_PIECE:
+        return Optional.of(
+            generateResponseWithPieceOrSealPayload(
+                ((FilePiecePayload) payload).getNextWritingOffset()));
+      case FILE_SEAL:
+        // not need to prefetch
+        break;
+      default:
+        LOGGER.warn("unexpected message type: {}", responseType);
+    }
+
+    return Optional.empty();
+  }
+
+  private @NonNull CachedSubscriptionPollResponse 
generateResponseWithPieceOrSealPayload(
+      final long writingOffset) throws IOException {
+    final long readFileBufferSize =
+        SubscriptionConfig.getInstance().getSubscriptionReadFileBufferSize();
+    final byte[] readBuffer = new byte[(int) readFileBufferSize];
+    try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
+      while (true) {
+        reader.seek(writingOffset);
+        final int readLength = reader.read(readBuffer);
+        if (readLength == -1) {
+          break;
+        }
+
+        final byte[] filePiece =
+            readLength == readFileBufferSize
+                ? readBuffer
+                : Arrays.copyOfRange(readBuffer, 0, readLength);
+
+        // generate subscription poll response with piece payload
+        return new CachedSubscriptionPollResponse(
+            SubscriptionPollResponseType.FILE_PIECE.getType(),
+            new FilePiecePayload(tsFile.getName(), writingOffset + readLength, 
filePiece),
+            commitContext);
+      }
+
+      // generate subscription poll response with seal payload
+      hasNoMore = true;
+      return new CachedSubscriptionPollResponse(
+          SubscriptionPollResponseType.FILE_SEAL.getType(),
+          new FileSealPayload(tsFile.getName(), tsFile.length()),
+          commitContext);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 97401c73808..d6962d22543 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -421,7 +421,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
                               
SubscriptionPrefetchingQueue.generatePrefetchingQueueId(
                                   commitContext.getConsumerGroupId(), 
commitContext.getTopicName()),
                               size);
-                      event.resetResponseByteBuffer(false);
+                      event.invalidateCurrentResponseByteBuffer();
                       LOGGER.info(
                           "Subscription: consumer {} poll {} successfully with 
request: {}",
                           consumerConfig,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java
index 7b4e76fac76..248ceda2b6d 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/SubscriptionStatesTest.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription;
 
 import org.apache.iotdb.db.subscription.broker.SubscriptionStates;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
 import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
@@ -53,28 +52,24 @@ public class SubscriptionStatesTest {
 
     final SubscriptionEvent event1 =
         new SubscriptionEvent(
-            new SubscriptionPipeEmptyEvent(),
             new SubscriptionPollResponse(
                 SubscriptionPollResponseType.TABLETS.getType(),
                 payload,
                 new SubscriptionCommitContext(-1, -1, "topic1", "cg1", 0)));
     final SubscriptionEvent event2 =
         new SubscriptionEvent(
-            new SubscriptionPipeEmptyEvent(),
             new SubscriptionPollResponse(
                 SubscriptionPollResponseType.TABLETS.getType(),
                 payload,
                 new SubscriptionCommitContext(-1, -1, "topic2", "cg1", 0)));
     final SubscriptionEvent event3 =
         new SubscriptionEvent(
-            new SubscriptionPipeEmptyEvent(),
             new SubscriptionPollResponse(
                 SubscriptionPollResponseType.TABLETS.getType(),
                 payload,
                 new SubscriptionCommitContext(-1, -1, "topic3", "cg1", 0)));
     final SubscriptionEvent event4 =
         new SubscriptionEvent(
-            new SubscriptionPipeEmptyEvent(),
             new SubscriptionPollResponse(
                 SubscriptionPollResponseType.TABLETS.getType(),
                 payload,


Reply via email to