This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d5579ad43ccd20d4d6ba18fbee6d6677990616c7
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Aug 2 18:08:29 2024 +0800

    Subscription: avoid incomplete resource release of prefetching queue caused by restarting subscription pipe (#13079)
    
    (cherry picked from commit 83f8db0a8b3763dc02b4bcae76f3a9e10439c9c1)
---
 .../agent/SubscriptionBrokerAgent.java             |  15 ++-
 .../agent/SubscriptionConsumerAgent.java           |   4 +-
 .../db/subscription/broker/SubscriptionBroker.java | 102 +++++++++++++++------
 .../broker/SubscriptionPrefetchingQueue.java       |  69 +++++---------
 .../broker/SubscriptionPrefetchingTabletQueue.java |   6 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  24 ++++-
 .../SubscriptionConnectorSubtaskLifeCycle.java     |   2 +-
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |  19 +++-
 8 files changed, 152 insertions(+), 89 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 538004a3ed8..43b16cf2f4c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -158,15 +158,24 @@ public class SubscriptionBrokerAgent {
     broker.bindPrefetchingQueue(subtask.getTopicName(), 
subtask.getInputPendingQueue());
   }
 
-  public void unbindPrefetchingQueue(
-      final String consumerGroupId, final String topicName, final boolean 
doRemove) {
+  public void unbindPrefetchingQueue(final String consumerGroupId, final 
String topicName) {
     final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
     if (Objects.isNull(broker)) {
       LOGGER.warn(
           "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
       return;
     }
-    broker.unbindPrefetchingQueue(topicName, doRemove);
+    broker.unbindPrefetchingQueue(topicName);
+  }
+
+  public void removePrefetchingQueue(final String consumerGroupId, final 
String topicName) {
+    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+    if (Objects.isNull(broker)) {
+      LOGGER.warn(
+          "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
+      return;
+    }
+    broker.removePrefetchingQueue(topicName);
   }
 
   public void executePrefetch(final String consumerGroupId, final String 
topicName) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
index eee14943df0..075098f8f93 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
@@ -117,11 +117,11 @@ public class SubscriptionConsumerAgent {
       return;
     }
 
-    // unbind and remove prefetching queue
+    // remove prefetching queue
     final Set<String> topicsUnsubByGroup =
         ConsumerGroupMeta.getTopicsUnsubByGroup(metaInAgent, 
metaFromCoordinator);
     for (final String topicName : topicsUnsubByGroup) {
-      SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, 
topicName, true);
+      SubscriptionAgent.broker().removePrefetchingQueue(consumerGroupId, 
topicName);
     }
 
     // TODO: Currently we fully replace the entire ConsumerGroupMeta without 
carefully checking the
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 43acf12f180..acb4882a4e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -20,13 +20,19 @@
 package org.apache.iotdb.db.subscription.broker;
 
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
 import 
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +44,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
 
 public class SubscriptionBroker {
 
@@ -46,14 +55,22 @@ public class SubscriptionBroker {
   private final String brokerId; // consumer group id
 
   private final Map<String, SubscriptionPrefetchingQueue> 
topicNameToPrefetchingQueue;
+  private final Map<String, String> completedTopicNames;
+
+  // The subscription pipe that was restarted should reuse the previous commit 
ID.
+  private final Map<String, AtomicLong> topicNameToCommitIdGenerator;
 
   public SubscriptionBroker(final String brokerId) {
     this.brokerId = brokerId;
     this.topicNameToPrefetchingQueue = new ConcurrentHashMap<>();
+    this.completedTopicNames = new ConcurrentHashMap<>();
+    this.topicNameToCommitIdGenerator = new ConcurrentHashMap<>();
   }
 
   public boolean isEmpty() {
-    return topicNameToPrefetchingQueue.isEmpty();
+    return topicNameToPrefetchingQueue.isEmpty()
+        && completedTopicNames.isEmpty()
+        && topicNameToCommitIdGenerator.isEmpty();
   }
 
   //////////////////////////// provided for SubscriptionBrokerAgent 
////////////////////////////
@@ -64,21 +81,32 @@ public class SubscriptionBroker {
       final SubscriptionPrefetchingQueue prefetchingQueue =
           topicNameToPrefetchingQueue.get(topicName);
       if (Objects.isNull(prefetchingQueue)) {
+        // check if completed
+        if (completedTopicNames.containsKey(topicName)) {
+          LOGGER.info(
+              "Subscription: prefetching queue bound to topic [{}] for 
consumer group [{}] is completed, return termination response to client",
+              topicName,
+              brokerId);
+          events.add(
+              new SubscriptionEvent(
+                  new SubscriptionPipeEmptyEvent(),
+                  new SubscriptionPollResponse(
+                      SubscriptionPollResponseType.TERMINATION.getType(),
+                      new TerminationPayload(),
+                      new SubscriptionCommitContext(
+                          
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+                          PipeDataNodeAgent.runtime().getRebootTimes(),
+                          topicName,
+                          brokerId,
+                          INVALID_COMMIT_ID))));
+          continue;
+        }
         // There are two reasons for not printing logs here:
         // 1. There will be a delay in the creation of the prefetching queue 
after subscription.
         // 2. There is no corresponding prefetching queue on this DN 
(currently the consumer is
         // fully connected to all DNs).
         continue;
       }
-      // check if completed before closed
-      if (prefetchingQueue.isCompleted()) {
-        LOGGER.info(
-            "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] is completed, return termination response to client",
-            topicName,
-            brokerId);
-        
events.add(prefetchingQueue.generateSubscriptionPollTerminationResponse());
-        continue;
-      }
       if (prefetchingQueue.isClosed()) {
         LOGGER.warn(
             "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] is closed",
@@ -186,11 +214,17 @@ public class SubscriptionBroker {
     if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(topicFormat)) {
       prefetchingQueue =
           new SubscriptionPrefetchingTsFileQueue(
-              brokerId, topicName, new 
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
+              brokerId,
+              topicName,
+              new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue),
+              topicNameToCommitIdGenerator.computeIfAbsent(topicName, (key) -> 
new AtomicLong()));
     } else {
       prefetchingQueue =
           new SubscriptionPrefetchingTabletQueue(
-              brokerId, topicName, new 
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
+              brokerId,
+              topicName,
+              new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue),
+              topicNameToCommitIdGenerator.computeIfAbsent(topicName, (key) -> 
new AtomicLong()));
     }
     
SubscriptionPrefetchingQueueMetrics.getInstance().register(prefetchingQueue);
     topicNameToPrefetchingQueue.put(topicName, prefetchingQueue);
@@ -200,7 +234,7 @@ public class SubscriptionBroker {
         brokerId);
   }
 
-  public void unbindPrefetchingQueue(final String topicName, final boolean 
doRemove) {
+  public void unbindPrefetchingQueue(final String topicName) {
     final SubscriptionPrefetchingQueue prefetchingQueue =
         topicNameToPrefetchingQueue.get(topicName);
     if (Objects.isNull(prefetchingQueue)) {
@@ -214,28 +248,40 @@ public class SubscriptionBroker {
     // mark prefetching queue closed first
     prefetchingQueue.markClosed();
 
-    // mark prefetching queue completed only for topic of snapshot mode
-    if (SubscriptionAgent.topic()
-        .getTopicMode(topicName)
-        .equals(TopicConstant.MODE_SNAPSHOT_VALUE)) {
-      prefetchingQueue.markCompleted();
+    // mark topic name completed only for topic of snapshot mode
+    if 
(SubscriptionAgent.topic().getTopicMode(topicName).equals(TopicConstant.MODE_SNAPSHOT_VALUE)
+        && prefetchingQueue.isCompleted()) {
+      completedTopicNames.put(topicName, topicName);
     }
 
-    if (doRemove) {
-      // clean up events in prefetching queue
-      prefetchingQueue.cleanup();
+    // clean up events in prefetching queue
+    prefetchingQueue.cleanup();
 
-      // deregister metrics
-      SubscriptionPrefetchingQueueMetrics.getInstance()
-          .deregister(prefetchingQueue.getPrefetchingQueueId());
+    // deregister metrics
+    SubscriptionPrefetchingQueueMetrics.getInstance()
+        .deregister(prefetchingQueue.getPrefetchingQueueId());
 
-      // remove prefetching queue
-      topicNameToPrefetchingQueue.remove(topicName);
-      LOGGER.info(
-          "Subscription: drop prefetching queue bound to topic [{}] for 
consumer group [{}]",
+    // remove prefetching queue
+    topicNameToPrefetchingQueue.remove(topicName);
+    LOGGER.info(
+        "Subscription: drop prefetching queue bound to topic [{}] for consumer 
group [{}]",
+        topicName,
+        brokerId);
+  }
+
+  public void removePrefetchingQueue(final String topicName) {
+    final SubscriptionPrefetchingQueue prefetchingQueue =
+        topicNameToPrefetchingQueue.get(topicName);
+    if (Objects.nonNull(prefetchingQueue)) {
+      LOGGER.warn(
+          "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] still exists",
           topicName,
           brokerId);
+      return;
     }
+
+    completedTopicNames.remove(topicName);
+    topicNameToCommitIdGenerator.remove(topicName);
   }
 
   public void executePrefetch(final String topicName) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 749bcf13530..d4e441eb0fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -27,14 +27,9 @@ import 
org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
-import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
-import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,19 +42,18 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
-
 public abstract class SubscriptionPrefetchingQueue {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionPrefetchingQueue.class);
 
   protected final String brokerId; // consumer group id
   protected final String topicName;
+
   protected final SubscriptionBlockingPendingQueue inputPendingQueue;
   protected final LinkedBlockingQueue<SubscriptionEvent> prefetchingQueue;
-
   protected final Map<SubscriptionCommitContext, SubscriptionEvent> 
uncommittedEvents;
-  private final AtomicLong subscriptionCommitIdGenerator = new AtomicLong(0);
+
+  private final AtomicLong commitIdGenerator;
 
   private volatile boolean isCompleted = false;
   private volatile boolean isClosed = false;
@@ -67,13 +61,15 @@ public abstract class SubscriptionPrefetchingQueue {
   public SubscriptionPrefetchingQueue(
       final String brokerId,
       final String topicName,
-      final SubscriptionBlockingPendingQueue inputPendingQueue) {
+      final SubscriptionBlockingPendingQueue inputPendingQueue,
+      final AtomicLong commitIdGenerator) {
     this.brokerId = brokerId;
     this.topicName = topicName;
     this.inputPendingQueue = inputPendingQueue;
 
     this.prefetchingQueue = new LinkedBlockingQueue<>();
     this.uncommittedEvents = new ConcurrentHashMap<>();
+    this.commitIdGenerator = commitIdGenerator;
   }
 
   public void cleanup() {
@@ -170,13 +166,20 @@ public abstract class SubscriptionPrefetchingQueue {
       }
 
       if (event instanceof PipeTerminateEvent) {
-        LOGGER.info(
-            "Subscription: SubscriptionPrefetchingQueue {} commit 
PipeTerminateEvent {}",
-            this,
-            event);
+        final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
+        // add mark completed hook
+        terminateEvent.addOnCommittedHook(
+            () -> {
+              markCompleted();
+              return null;
+            });
         // commit directly
         ((PipeTerminateEvent) event)
             
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
+        LOGGER.info(
+            "Subscription: SubscriptionPrefetchingQueue {} commit 
PipeTerminateEvent {}",
+            this,
+            terminateEvent);
         continue;
       }
 
@@ -318,16 +321,7 @@ public abstract class SubscriptionPrefetchingQueue {
         PipeDataNodeAgent.runtime().getRebootTimes(),
         topicName,
         brokerId,
-        subscriptionCommitIdGenerator.getAndIncrement());
-  }
-
-  private SubscriptionCommitContext generateInvalidSubscriptionCommitContext() 
{
-    return new SubscriptionCommitContext(
-        IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
-        PipeDataNodeAgent.runtime().getRebootTimes(),
-        topicName,
-        brokerId,
-        INVALID_COMMIT_ID);
+        commitIdGenerator.getAndIncrement());
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
@@ -346,10 +340,10 @@ public abstract class SubscriptionPrefetchingQueue {
   }
 
   public long getCurrentCommitId() {
-    return subscriptionCommitIdGenerator.get();
+    return commitIdGenerator.get();
   }
 
-  /////////////////////////////// termination ///////////////////////////////
+  /////////////////////////////// close & termination 
///////////////////////////////
 
   public boolean isClosed() {
     return isClosed;
@@ -367,25 +361,6 @@ public abstract class SubscriptionPrefetchingQueue {
     isCompleted = true;
   }
 
-  public SubscriptionEvent generateSubscriptionPollTerminationResponse() {
-    return new SubscriptionEvent(
-        new SubscriptionPipeEmptyEvent(),
-        new SubscriptionPollResponse(
-            SubscriptionPollResponseType.TERMINATION.getType(),
-            new TerminationPayload(),
-            generateInvalidSubscriptionCommitContext()));
-  }
-
-  public SubscriptionEvent generateSubscriptionPollErrorResponse(
-      final String errorMessage, final boolean critical) {
-    return new SubscriptionEvent(
-        new SubscriptionPipeEmptyEvent(),
-        new SubscriptionPollResponse(
-            SubscriptionPollResponseType.ERROR.getType(),
-            new ErrorPayload(errorMessage, critical),
-            generateInvalidSubscriptionCommitContext()));
-  }
-
   /////////////////////////////// stringify ///////////////////////////////
 
   protected Map<String, String> coreReportMessage() {
@@ -393,7 +368,7 @@ public abstract class SubscriptionPrefetchingQueue {
     result.put("brokerId", brokerId);
     result.put("topicName", topicName);
     result.put("size of uncommittedEvents", 
String.valueOf(uncommittedEvents.size()));
-    result.put("subscriptionCommitIdGenerator", 
subscriptionCommitIdGenerator.toString());
+    result.put("commitIdGenerator", commitIdGenerator.toString());
     result.put("isCompleted", String.valueOf(isCompleted));
     result.put("isClosed", String.valueOf(isClosed));
     return result;
@@ -406,7 +381,7 @@ public abstract class SubscriptionPrefetchingQueue {
     result.put("size of inputPendingQueue", 
String.valueOf(inputPendingQueue.size()));
     result.put("size of prefetchingQueue", 
String.valueOf(prefetchingQueue.size()));
     result.put("uncommittedEvents", uncommittedEvents.toString());
-    result.put("subscriptionCommitIdGenerator", 
subscriptionCommitIdGenerator.toString());
+    result.put("commitIdGenerator", commitIdGenerator.toString());
     result.put("isCompleted", String.valueOf(isCompleted));
     result.put("isClosed", String.valueOf(isClosed));
     return result;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 1c327d2671d..47c57cb1897 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQueue {
@@ -53,8 +54,9 @@ public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQ
   public SubscriptionPrefetchingTabletQueue(
       final String brokerId,
       final String topicName,
-      final SubscriptionBlockingPendingQueue inputPendingQueue) {
-    super(brokerId, topicName, inputPendingQueue);
+      final SubscriptionBlockingPendingQueue inputPendingQueue,
+      final AtomicLong commitIdGenerator) {
+    super(brokerId, topicName, inputPendingQueue, commitIdGenerator);
 
     this.currentBatchRef.set(
         new SubscriptionPipeTabletEventBatch(this, BATCH_MAX_DELAY_IN_MS, 
BATCH_MAX_SIZE_IN_BYTES));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index f0604eca157..d8ccc8fed69 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -20,11 +20,15 @@
 package org.apache.iotdb.db.subscription.broker;
 
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch;
+import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
 import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFilePlainEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
 import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
 import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
@@ -46,8 +50,11 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
+
 public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
 
   private static final Logger LOGGER =
@@ -66,8 +73,9 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
   public SubscriptionPrefetchingTsFileQueue(
       final String brokerId,
       final String topicName,
-      final SubscriptionBlockingPendingQueue inputPendingQueue) {
-    super(brokerId, topicName, inputPendingQueue);
+      final SubscriptionBlockingPendingQueue inputPendingQueue,
+      final AtomicLong commitIdGenerator) {
+    super(brokerId, topicName, inputPendingQueue, commitIdGenerator);
 
     this.consumerIdToSubscriptionEventMap = new ConcurrentHashMap<>();
     this.currentBatchRef.set(
@@ -429,7 +437,17 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
 
   private SubscriptionEvent generateSubscriptionPollErrorResponse(final String 
errorMessage) {
     // consider non-critical by default, meaning the client can retry
-    return super.generateSubscriptionPollErrorResponse(errorMessage, false);
+    return new SubscriptionEvent(
+        new SubscriptionPipeEmptyEvent(),
+        new SubscriptionPollResponse(
+            SubscriptionPollResponseType.ERROR.getType(),
+            new ErrorPayload(errorMessage, false),
+            new SubscriptionCommitContext(
+                IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+                PipeDataNodeAgent.runtime().getRebootTimes(),
+                topicName,
+                brokerId,
+                INVALID_COMMIT_ID)));
   }
 
   /////////////////////////////// stringify ///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 41e690cbce9..c05bd13f07c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -103,6 +103,6 @@ public class SubscriptionConnectorSubtaskLifeCycle extends 
PipeConnectorSubtaskL
     // when dropping the subscription.
     final String consumerGroupId = ((SubscriptionConnectorSubtask) 
subtask).getConsumerGroupId();
     final String topicName = ((SubscriptionConnectorSubtask) 
subtask).getTopicName();
-    SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, 
topicName, false);
+    SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, 
topicName);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 6d44d26c332..cbdbef1e4e9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -29,9 +29,12 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 /**
  * {@link EnrichedEvent} is an {@link Event} that can be enriched with 
additional runtime
@@ -65,6 +68,7 @@ public abstract class EnrichedEvent implements Event {
   protected boolean isTimeParsed;
 
   protected boolean shouldReportOnCommit = true;
+  protected List<Supplier<Void>> onCommittedHooks = new ArrayList<>();
 
   protected EnrichedEvent(
       final String pipeName,
@@ -83,6 +87,13 @@ public abstract class EnrichedEvent implements Event {
     this.endTime = endTime;
     isPatternParsed = this.pipePattern == null || this.pipePattern.isRoot();
     isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime;
+    addOnCommittedHook(
+        () -> {
+          if (shouldReportOnCommit) {
+            reportProgress();
+          }
+          return null;
+        });
   }
 
   /**
@@ -346,9 +357,11 @@ public abstract class EnrichedEvent implements Event {
   }
 
   public void onCommitted() {
-    if (shouldReportOnCommit) {
-      reportProgress();
-    }
+    onCommittedHooks.forEach(Supplier::get);
+  }
+
+  public void addOnCommittedHook(final Supplier<Void> hook) {
+    onCommittedHooks.add(hook);
   }
 
   public boolean isReleased() {

Reply via email to