This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cb03ad80653 Subscription: fix consumer infinite pulling event & fully 
managed tsfile parsing process & increase the reference count for subscribed 
parsed raw tablet event & disrupt parsing requests through the introduction of 
randomness & disable prefetch by default (#14856)
cb03ad80653 is described below

commit cb03ad80653a38d0c5e6c3bc667fdbe1080db7da
Author: VGalaxies <[email protected]>
AuthorDate: Tue Feb 18 10:39:30 2025 +0800

    Subscription: fix consumer infinite pulling event & fully managed tsfile 
parsing process & increase the reference count for subscribed parsed raw tablet 
event & disrupt parsing requests through the introduction of randomness & 
disable prefetch by default (#14856)
---
 .../iotdb/rpc/subscription/config/TopicConfig.java |  7 ++-
 .../subscription/payload/poll/TabletsPayload.java  |  4 +-
 .../base/AbstractSubscriptionConsumer.java         |  2 +-
 .../task/builder/PipeDataNodeTaskBuilder.java      | 12 ++++-
 .../agent/task/connection/PipeEventCollector.java  | 11 +++-
 .../agent/task/stage/PipeTaskProcessorStage.java   |  9 +++-
 .../broker/SubscriptionPrefetchingQueue.java       |  3 +-
 .../broker/SubscriptionPrefetchingTabletQueue.java |  2 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  2 +-
 .../batch/SubscriptionPipeTabletEventBatch.java    | 48 ++++++++++++-----
 .../SubscriptionPipeTabletIterationSnapshot.java   | 62 ++++++++++++++++++++++
 .../pipe/SubscriptionPipeTabletBatchEvents.java    | 29 +++++-----
 .../apache/iotdb/commons/conf/CommonConfig.java    |  2 +-
 13 files changed, 153 insertions(+), 40 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index c03fbf3147c..f71980d629f 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -194,7 +194,12 @@ public class TopicConfig extends PipeParameters {
   /////////////////////////////// connector attributes mapping 
///////////////////////////////
 
   public Map<String, String> getAttributesWithSinkFormat() {
-    return SINK_HYBRID_FORMAT_CONFIG; // default to hybrid
+    // refer to
+    // 
org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector.parseAndCollectEvent(org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent)
+    return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(
+            attributes.getOrDefault(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_DEFAULT_VALUE))
+        ? SINK_TS_FILE_FORMAT_CONFIG
+        : SINK_TABLET_FORMAT_CONFIG;
   }
 
   public Map<String, String> getAttributesWithSinkPrefix() {
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
index 23b9e066329..5654572f458 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
@@ -40,8 +40,8 @@ public class TabletsPayload implements 
SubscriptionPollPayload {
    * <ul>
    *   <li>If nextOffset is 1, it indicates that the current payload is the 
first payload (its
    *       tablets are empty) and the fetching should continue.
-   *   <li>If nextOffset is negative, it indicates all tablets have been 
fetched, and -nextOffset
-   *       represents the total number of tablets.
+   *   <li>If nextOffset is negative (or zero), it indicates all tablets have 
been fetched, and
+   *       -nextOffset represents the total number of tablets.
    * </ul>
    */
   private transient int nextOffset;
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index 7dfae59a935..1755c0cfd48 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -935,7 +935,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
 
     int nextOffset = ((TabletsPayload) 
initialResponse.getPayload()).getNextOffset();
     while (true) {
-      if (nextOffset < 0) {
+      if (nextOffset <= 0) {
         if (!Objects.equals(tablets.size(), -nextOffset)) {
           final String errorMessage =
               String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 2bfc343fda7..1b517419c98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -49,6 +49,7 @@ import java.util.Map;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
 
 public class PipeDataNodeTaskBuilder {
@@ -139,7 +140,16 @@ public class PipeDataNodeTaskBuilder {
                 .getStringOrDefault(
                     Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                     CONNECTOR_FORMAT_HYBRID_VALUE)
-                .equals(CONNECTOR_FORMAT_TABLET_VALUE));
+                .equals(CONNECTOR_FORMAT_TABLET_VALUE),
+            PipeType.SUBSCRIPTION.equals(pipeType)
+                &&
+                // should not skip parsing when the format is tsfile
+                !pipeStaticMeta
+                    .getConnectorParameters()
+                    .getStringOrDefault(
+                        Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
+                        CONNECTOR_FORMAT_HYBRID_VALUE)
+                    .equals(CONNECTOR_FORMAT_TS_FILE_VALUE));
 
     return new PipeDataNodeTask(
         pipeStaticMeta.getPipeName(), regionId, extractorStage, 
processorStage, connectorStage);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index d63c1e39325..d8bd7fdce2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -55,6 +55,8 @@ public class PipeEventCollector implements EventCollector {
 
   private final boolean forceTabletFormat;
 
+  private final boolean skipParseTsFile;
+
   private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
   private boolean hasNoGeneratedEvent = true;
   private boolean isFailedToIncreaseReferenceCount = false;
@@ -63,11 +65,13 @@ public class PipeEventCollector implements EventCollector {
       final UnboundedBlockingPendingQueue<Event> pendingQueue,
       final long creationTime,
       final int regionId,
-      final boolean forceTabletFormat) {
+      final boolean forceTabletFormat,
+      final boolean skipParseTsFile) {
     this.pendingQueue = pendingQueue;
     this.creationTime = creationTime;
     this.regionId = regionId;
     this.forceTabletFormat = forceTabletFormat;
+    this.skipParseTsFile = skipParseTsFile;
   }
 
   @Override
@@ -117,6 +121,11 @@ public class PipeEventCollector implements EventCollector {
       return;
     }
 
+    if (skipParseTsFile) {
+      collectEvent(sourceEvent);
+      return;
+    }
+
     if (!forceTabletFormat
         && (!sourceEvent.shouldParseTimeOrPattern()
             || (sourceEvent.isTableModelEvent()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 22cd5ce3094..5201a19d8bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -68,7 +68,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       final UnboundedBlockingPendingQueue<Event> 
pipeConnectorOutputPendingQueue,
       final PipeProcessorSubtaskExecutor executor,
       final PipeTaskMeta pipeTaskMeta,
-      final boolean forceTabletFormat) {
+      final boolean forceTabletFormat,
+      final boolean skipParseTsFile) {
     final PipeProcessorRuntimeConfiguration runtimeConfiguration =
         new PipeTaskRuntimeConfiguration(
             new PipeTaskProcessorRuntimeEnvironment(
@@ -100,7 +101,11 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     final String taskId = pipeName + "_" + regionId + "_" + creationTime;
     final PipeEventCollector pipeConnectorOutputEventCollector =
         new PipeEventCollector(
-            pipeConnectorOutputPendingQueue, creationTime, regionId, 
forceTabletFormat);
+            pipeConnectorOutputPendingQueue,
+            creationTime,
+            regionId,
+            forceTabletFormat,
+            skipParseTsFile);
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index ca9bf096efa..f0d6d752398 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -197,7 +197,8 @@ public abstract class SubscriptionPrefetchingQueue {
               return null;
             },
             SubscriptionAgent.receiver().remainingMs());
-      } catch (final Exception ignored) {
+      } catch (final Exception e) {
+        LOGGER.warn("Exception {} occurred when {} execute receiver subtask", 
this, e, e);
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index d590df17c2b..41e4b060ae5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -159,7 +159,7 @@ public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQ
                 String.format(
                     "exception occurred when fetching next response: %s, 
consumer id: %s, commit context: %s, offset: %s, prefetching queue: %s",
                     e, consumerId, commitContext, offset, this);
-            LOGGER.warn(errorMessage);
+            LOGGER.warn(errorMessage, e);
             eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
           }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index adef86518b8..978ed9fac46 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -211,7 +211,7 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
                 String.format(
                     "exception occurred when fetching next response: %s, 
consumer id: %s, commit context: %s, writing offset: %s, prefetching queue: %s",
                     e, consumerId, commitContext, writingOffset, this);
-            LOGGER.warn(errorMessage);
+            LOGGER.warn(errorMessage, e);
             eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
           }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index 00114d85df4..40fb4fe3751 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -37,7 +37,6 @@ import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -61,7 +60,7 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   private final Meter insertNodeTabletInsertionEventSizeEstimator;
   private final Meter rawTabletInsertionEventSizeEstimator;
 
-  private volatile List<EnrichedEvent> iteratedEnrichedEvents;
+  private volatile SubscriptionPipeTabletIterationSnapshot iterationSnapshot;
   private final AtomicInteger referenceCount = new AtomicInteger();
 
   private static final long ITERATED_COUNT_REPORT_FREQ =
@@ -88,7 +87,17 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   @Override
   public synchronized void ack() {
     referenceCount.decrementAndGet();
-    // do nothing for iterated enriched events, see 
SubscriptionPipeTabletBatchEvents
+
+    // we decrease the reference count of events if and only if the whole 
batch is consumed
+    if (!hasNext() && referenceCount.get() == 0) {
+      for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+        if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+          // close data container in tsfile event
+          ((PipeTsFileInsertionEvent) enrichedEvent).close();
+        }
+        enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+      }
+    }
   }
 
   @Override
@@ -200,26 +209,29 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
 
   /////////////////////////////// iterator ///////////////////////////////
 
-  public List<EnrichedEvent> sendIterationSnapshot() {
-    final List<EnrichedEvent> result = 
Collections.unmodifiableList(iteratedEnrichedEvents);
-    iteratedEnrichedEvents = new ArrayList<>();
+  public synchronized SubscriptionPipeTabletIterationSnapshot 
sendIterationSnapshot() {
+    final SubscriptionPipeTabletIterationSnapshot result = iterationSnapshot;
+    iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
     referenceCount.incrementAndGet();
     return result;
   }
 
-  public void resetForIteration() {
+  public synchronized void resetForIteration() {
     currentEnrichedEventsIterator = enrichedEvents.iterator();
     currentTabletInsertionEventsIterator = null;
     currentTsFileInsertionEvent = null;
 
-    iteratedEnrichedEvents = new ArrayList<>();
+    if (Objects.nonNull(iterationSnapshot)) {
+      iterationSnapshot.clear();
+    }
+    iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
     referenceCount.set(0);
 
     iteratedCount.set(0);
   }
 
   @Override
-  public boolean hasNext() {
+  public synchronized boolean hasNext() {
     if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
       if (currentTabletInsertionEventsIterator.hasNext()) {
         return true;
@@ -245,7 +257,7 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   }
 
   @Override
-  public List<Tablet> next() {
+  public synchronized List<Tablet> next() {
     final List<Tablet> tablets = nextInternal();
     if (Objects.isNull(tablets)) {
       return null;
@@ -267,8 +279,16 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
       if (currentTabletInsertionEventsIterator.hasNext()) {
         final TabletInsertionEvent tabletInsertionEvent =
             currentTabletInsertionEventsIterator.next();
+        if (!((EnrichedEvent) tabletInsertionEvent)
+            .increaseReferenceCount(this.getClass().getName())) {
+          LOGGER.warn(
+              "SubscriptionPipeTabletEventBatch: Failed to increase the 
reference count of event {}, skipping it.",
+              ((EnrichedEvent) tabletInsertionEvent).coreReportMessage());
+        } else {
+          iterationSnapshot.addParsedEnrichedEvent((EnrichedEvent) 
tabletInsertionEvent);
+        }
         if (!currentTabletInsertionEventsIterator.hasNext()) {
-          iteratedEnrichedEvents.add((EnrichedEvent) 
currentTsFileInsertionEvent);
+          iterationSnapshot.addIteratedEnrichedEvent((EnrichedEvent) 
currentTsFileInsertionEvent);
         }
         return convertToTablets(tabletInsertionEvent);
       } else {
@@ -297,11 +317,13 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
       currentTsFileInsertionEvent = tsFileInsertionEvent;
       currentTabletInsertionEventsIterator =
           tsFileInsertionEvent
-              
.toTabletInsertionEvents(SubscriptionAgent.receiver().remainingMs())
+              .toTabletInsertionEvents(
+                  // disrupt parsing requests through the introduction of 
randomness
+                  (long) ((1 + Math.random()) * 
SubscriptionAgent.receiver().remainingMs()))
               .iterator();
       return next();
     } else if (enrichedEvent instanceof TabletInsertionEvent) {
-      iteratedEnrichedEvents.add(enrichedEvent);
+      iterationSnapshot.addIteratedEnrichedEvent(enrichedEvent);
       return convertToTablets((TabletInsertionEvent) enrichedEvent);
     } else {
       LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
new file mode 100644
index 00000000000..137354ae36b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.batch;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class SubscriptionPipeTabletIterationSnapshot {
+
+  private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();
+  private final List<EnrichedEvent> parsedEnrichedEvents = new ArrayList<>();
+
+  public List<EnrichedEvent> getIteratedEnrichedEvents() {
+    return Collections.unmodifiableList(iteratedEnrichedEvents);
+  }
+
+  public void addIteratedEnrichedEvent(final EnrichedEvent enrichedEvent) {
+    iteratedEnrichedEvents.add(enrichedEvent);
+  }
+
+  public void addParsedEnrichedEvent(final EnrichedEvent enrichedEvent) {
+    parsedEnrichedEvents.add(enrichedEvent);
+  }
+
+  public void clear() {
+    for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
+      if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+        // close data container in tsfile event
+        ((PipeTsFileInsertionEvent) enrichedEvent).close();
+      }
+    }
+
+    for (final EnrichedEvent enrichedEvent : parsedEnrichedEvents) {
+      if (enrichedEvent instanceof PipeRawTabletInsertionEvent) {
+        // decrease reference count in raw tablet event
+        enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index 87a69fb2be2..9bf87c6b1be 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -20,8 +20,8 @@
 package org.apache.iotdb.db.subscription.event.pipe;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
+import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletIterationSnapshot;
 
 import java.util.List;
 import java.util.Objects;
@@ -32,28 +32,21 @@ import static 
com.google.common.base.MoreObjects.toStringHelper;
 public class SubscriptionPipeTabletBatchEvents implements 
SubscriptionPipeEvents {
 
   private final SubscriptionPipeTabletEventBatch batch;
-  private volatile List<EnrichedEvent> iteratedEnrichedEvents;
+  private volatile SubscriptionPipeTabletIterationSnapshot iterationSnapshot;
 
   public SubscriptionPipeTabletBatchEvents(final 
SubscriptionPipeTabletEventBatch batch) {
     this.batch = batch;
   }
 
-  public void receiveIterationSnapshot(final List<EnrichedEvent> 
iteratedEnrichedEvents) {
-    this.iteratedEnrichedEvents = iteratedEnrichedEvents;
+  public void receiveIterationSnapshot(
+      final SubscriptionPipeTabletIterationSnapshot iterationSnapshot) {
+    this.iterationSnapshot = iterationSnapshot;
   }
 
   @Override
   public void ack() {
     batch.ack();
-
-    // only decrease the reference count of iterated events
-    for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
-      if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
-        // close data container in tsfile event
-        ((PipeTsFileInsertionEvent) enrichedEvent).close();
-      }
-      enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
-    }
+    iterationSnapshot.clear();
   }
 
   @Override
@@ -67,7 +60,11 @@ public class SubscriptionPipeTabletBatchEvents implements 
SubscriptionPipeEvents
   public String toString() {
     return toStringHelper(this)
         .add("batch", batch)
-        .add("events", formatEnrichedEvents(iteratedEnrichedEvents, 4))
+        .add(
+            "events",
+            Objects.nonNull(iterationSnapshot)
+                ? 
formatEnrichedEvents(iterationSnapshot.getIteratedEnrichedEvents(), 4)
+                : "<unknown>")
         .toString();
   }
 
@@ -92,6 +89,8 @@ public class SubscriptionPipeTabletBatchEvents implements 
SubscriptionPipeEvents
 
   @Override
   public int getPipeEventCount() {
-    return iteratedEnrichedEvents.size();
+    return Objects.nonNull(iterationSnapshot)
+        ? iterationSnapshot.getIteratedEnrichedEvents().size()
+        : 0;
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index f0212486f52..f18ef082e00 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -309,7 +309,7 @@ public class CommonConfig {
   private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
   private volatile long subscriptionCheckMemoryEnoughIntervalMs = 10L;
 
-  private boolean subscriptionPrefetchEnabled = true;
+  private boolean subscriptionPrefetchEnabled = false;
   private float subscriptionPrefetchMemoryThreshold = 0.5F;
   private float subscriptionPrefetchMissingRateThreshold = 0.9F;
   private int subscriptionPrefetchEventLocalCountThreshold = 10;

Reply via email to