This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ca52520f482 [To dev/1.3] Fix pipe drop event discard with 
restart-aware committer keys (#17748) (#17778)
ca52520f482 is described below

commit ca52520f4823397fa00eeba8ea1ecd5c4aba3345
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 28 17:16:14 2026 +0800

    [To dev/1.3] Fix pipe drop event discard with restart-aware committer keys 
(#17748) (#17778)
---
 .../agent/task/connection/PipeEventCollector.java  |  5 ++-
 .../sink/PipeRealtimePriorityBlockingQueue.java    | 10 +++--
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 25 ++++++-----
 .../subtask/sink/PipeSinkSubtaskLifeCycle.java     |  9 ++--
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  6 ++-
 .../evolvable/batch/PipeTabletEventBatch.java      | 17 ++++++--
 .../batch/PipeTransferBatchReqBuilder.java         | 11 +++--
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |  8 +++-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 41 ++++++++---------
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  8 +++-
 .../websocket/WebSocketConnectorServer.java        | 51 ++++++++++------------
 .../sink/protocol/websocket/WebSocketSink.java     |  8 ++++
 .../subtask/SubscriptionSinkSubtaskLifeCycle.java  |  4 +-
 .../subtask/SubscriptionSinkSubtaskManager.java    |  7 ++-
 .../task/subtask/sink/PipeSinkSubtaskTest.java     |  6 ++-
 .../task/connection/BlockingPendingQueue.java      | 39 ++++++++++++-----
 .../task/progress/PipeEventCommitManager.java      |  5 +++
 .../protocol/PipeConnectorWithEventDiscard.java    |  7 +++
 18 files changed, 171 insertions(+), 96 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 95e8196ad38..f0cc4621612 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -202,7 +202,10 @@ public class PipeEventCollector implements EventCollector {
       
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
 
       if (enrichedEvent.getPipeName() != null
-          && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), 
creationTime, regionId)) {
+          && (pendingQueue.isEventFromDroppedPipe(enrichedEvent)
+              || (enrichedEvent.getCommitterKey() == null
+                  && pendingQueue.isPipeDropped(
+                      enrichedEvent.getPipeName(), creationTime, regionId)))) {
         enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
         return;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index f972bba0e6e..e35763c3012 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -360,12 +360,16 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   @Override
   public void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  @Override
+  public void discardEventsOfPipe(final CommitterKey committerKey) {
+    super.discardEventsOfPipe(committerKey);
     tsfileInsertEventDeque.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && isEventFromPipe(
-                  ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, 
regionId)) {
+              && isEventFromPipe((EnrichedEvent) event, committerKey)) {
             if (((EnrichedEvent) event)
                 
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 1e7c50f389e..c855eb57140 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -201,10 +202,9 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
    * When a pipe is dropped, the connector maybe reused and will not be 
closed. So we just discard
    * its queued events in the output pipe connector.
    */
-  public void discardEventsOfPipe(
-      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+  public void discardEventsOfPipe(final CommitterKey committerKey) {
     // Try to remove the events as much as possible
-    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, 
regionId);
+    inputPendingQueue.discardEventsOfPipe(committerKey);
 
     try {
       increaseHighPriorityTaskCount();
@@ -217,9 +217,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         // use a new thread to stop all the pipes, we will not encounter 
deadlock here. Or else we
         // will.
         if (lastEvent instanceof EnrichedEvent
-            && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
-            && creationTimeToDrop == ((EnrichedEvent) 
lastEvent).getCreationTime()
-            && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
+            && isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) {
           // Do not clear the last event's reference counts because it may be 
on transferring
           lastEvent = null;
           // Submit self to avoid that the lastEvent has been retried "max 
times" times and has
@@ -241,9 +239,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         // clear the lastExceptionEvent. It's safe to potentially clear it 
twice because we have the
         // "nonnull" detection.
         if (lastExceptionEvent instanceof EnrichedEvent
-            && pipeNameToDrop.equals(((EnrichedEvent) 
lastExceptionEvent).getPipeName())
-            && creationTimeToDrop == ((EnrichedEvent) 
lastExceptionEvent).getCreationTime()
-            && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) 
{
+            && isEventFromPipe((EnrichedEvent) lastExceptionEvent, 
committerKey)) {
           clearReferenceCountAndReleaseLastExceptionEvent();
         }
       }
@@ -252,11 +248,18 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     }
 
     if (outputPipeConnector instanceof PipeConnectorWithEventDiscard) {
-      ((PipeConnectorWithEventDiscard) outputPipeConnector)
-          .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+      ((PipeConnectorWithEventDiscard) 
outputPipeConnector).discardEventsOfPipe(committerKey);
     }
   }
 
+  private static boolean isEventFromPipe(
+      final EnrichedEvent event, final CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
+  }
+
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   public String getAttributeSortedString() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 1780f5a87ef..61a064e1637 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -86,19 +87,17 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
    * Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be 
inconsistent with the
    * {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel 
connector scheduling.
    *
-   * @param pipeNameToDeregister pipe name
-   * @param regionId region id
+   * @param committerKey committer key of the pipe task to deregister
    * @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle, 
indicating that the
    *     {@link PipeSinkSubtask} should never be used again
    * @throws IllegalStateException if {@link 
PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
    */
-  public synchronized boolean deregister(
-      final String pipeNameToDeregister, final long creationTimeToDeregister, 
final int regionId) {
+  public synchronized boolean deregister(final CommitterKey committerKey) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
 
-    subtask.discardEventsOfPipe(pipeNameToDeregister, 
creationTimeToDeregister, regionId);
+    subtask.discardEventsOfPipe(committerKey);
 
     try {
       if (registeredTaskCount > 1) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index d7f81c12dbc..817471c785a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -209,7 +210,10 @@ public class PipeSinkSubtaskManager {
     // Shall not be empty
     final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;
 
-    lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
+    final CommitterKey committerKey =
+        PipeEventCommitManager.getInstance().getCommitterKey(pipeName, 
creationTime, regionId);
+
+    lifeCycles.removeIf(o -> o.deregister(committerKey));
 
     if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index c44e12a4bbf..7058b885750 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
 
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -156,11 +157,13 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
    */
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
     events.removeIf(
         event -> {
-          if (pipeNameToDrop.equals(event.getPipeName())
-              && creationTimeToDrop == event.getCreationTime()
-              && regionId == event.getRegionId()) {
+          if (isEventFromPipe(event, committerKey)) {
             
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             return true;
           }
@@ -168,6 +171,14 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
         });
   }
 
+  private static boolean isEventFromPipe(
+      final EnrichedEvent event, final CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
+  }
+
   public synchronized void decreaseEventsReferenceCount(
       final String holderMessage, final boolean shouldReport) {
     events.forEach(event -> event.decreaseReferenceCount(holderMessage, 
shouldReport));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index ac5f568f1c6..45264138596 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -197,10 +198,12 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
 
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, 
regionId);
-    endPointToBatch
-        .values()
-        .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId));
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
+    defaultBatch.discardEventsOfPipe(committerKey);
+    endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(committerKey));
   }
 
   public int size() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 13bcb537ae1..81e745dc6a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.sink.protocol.airgap;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
 import org.apache.iotdb.commons.utils.RetryUtils;
@@ -546,8 +547,13 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
   @Override
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  @Override
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
     if (Objects.nonNull(tabletBatchBuilder)) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId);
+      tabletBatchBuilder.discardEventsOfPipe(committerKey);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index fe3d44bedb4..69dcb922e4a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
@@ -123,9 +123,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   private final Map<PipeTransferTrackableHandler, 
PipeTransferTrackableHandler> pendingHandlers =
       new ConcurrentHashMap<>();
 
-  // Pipe name, creation time, region id
-  private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
-      ConcurrentHashMap.newKeySet();
+  private final Set<CommitterKey> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
 
   private boolean enableSendTsFileLimit;
   private volatile boolean isConnectionException;
@@ -722,16 +720,20 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   @Override
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, 
regionId));
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  @Override
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
+    droppedPipeTaskKeys.add(committerKey);
 
     if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId);
+      tabletBatchBuilder.discardEventsOfPipe(committerKey);
     }
     retryEventQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && isDroppedPipe(
-                  (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, 
regionId)) {
+              && isDroppedPipe((EnrichedEvent) event, committerKey)) {
             ((EnrichedEvent) 
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             retryEventQueueEventCounter.decreaseEventCount(event);
             return true;
@@ -742,8 +744,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     retryTsFileQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && isDroppedPipe(
-                  (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, 
regionId)) {
+              && isDroppedPipe((EnrichedEvent) event, committerKey)) {
             ((EnrichedEvent) 
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             retryEventQueueEventCounter.decreaseEventCount(event);
             return true;
@@ -845,18 +846,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   }
 
   private boolean isDroppedPipe(final EnrichedEvent event) {
-    return droppedPipeTaskKeys.contains(
-        new Triple<>(event.getPipeName(), event.getCreationTime(), 
event.getRegionId()));
-  }
-
-  private static boolean isDroppedPipe(
-      final EnrichedEvent event,
-      final String pipeNameToDrop,
-      final long creationTimeToDrop,
-      final int regionId) {
-    return pipeNameToDrop.equals(event.getPipeName())
-        && creationTimeToDrop == event.getCreationTime()
-        && regionId == event.getRegionId();
+    return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event, 
key));
+  }
+
+  private static boolean isDroppedPipe(final EnrichedEvent event, final 
CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 552b8cf1cae..ef3d59f0d2a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
 import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
@@ -523,8 +524,13 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
   @Override
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  @Override
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
     if (Objects.nonNull(tabletBatchBuilder)) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId);
+      tabletBatchBuilder.discardEventsOfPipe(committerKey);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 2a8b5c8c3c0..0ecd1ad6e34 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.websocket;
 
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -59,9 +59,7 @@ public class WebSocketConnectorServer extends WebSocketServer 
{
   private final ConcurrentHashMap<String, ConcurrentHashMap<Long, 
EventWaitingForAck>>
       eventsWaitingForAck = new ConcurrentHashMap<>();
 
-  // Pipe name, creation time, region id
-  private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
-      ConcurrentHashMap.newKeySet();
+  private final Set<CommitterKey> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
 
   private final BidiMap<String, WebSocket> router =
       new DualTreeBidiMap<String, WebSocket>(null, 
Comparator.comparing(Object::hashCode)) {};
@@ -117,33 +115,33 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
           .forEach((eventId, eventWrapper) -> 
discardEvent(eventWrapper.event));
     }
 
-    droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName));
+    droppedPipeTaskKeys.removeIf(key -> key.getPipeName().equals(pipeName));
   }
 
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, 
regionId));
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
+    droppedPipeTaskKeys.add(committerKey);
 
     final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
-        eventsWaitingForTransfer.get(pipeNameToDrop);
+        eventsWaitingForTransfer.get(committerKey.getPipeName());
     if (eventTransferQueue != null) {
       eventTransferQueue.removeIf(
-          eventWrapper ->
-              discardIfMatches(eventWrapper.event, pipeNameToDrop, 
creationTimeToDrop, regionId));
+          eventWrapper -> discardIfMatches(eventWrapper.event, committerKey));
       synchronized (eventTransferQueue) {
         eventTransferQueue.notifyAll();
       }
     }
 
     final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
-        eventsWaitingForAck.get(pipeNameToDrop);
+        eventsWaitingForAck.get(committerKey.getPipeName());
     if (eventId2EventMap != null) {
       eventId2EventMap
           .entrySet()
-          .removeIf(
-              entry ->
-                  discardIfMatches(
-                      entry.getValue().event, pipeNameToDrop, 
creationTimeToDrop, regionId));
+          .removeIf(entry -> discardIfMatches(entry.getValue().event, 
committerKey));
     }
   }
 
@@ -515,19 +513,13 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
     }
   }
 
-  private boolean discardIfMatches(
-      final Event event,
-      final String pipeNameToDrop,
-      final long creationTimeToDrop,
-      final int regionId) {
+  private boolean discardIfMatches(final Event event, final CommitterKey 
committerKey) {
     if (!(event instanceof EnrichedEvent)) {
       return false;
     }
 
     final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
-    if (!pipeNameToDrop.equals(enrichedEvent.getPipeName())
-        || creationTimeToDrop != enrichedEvent.getCreationTime()
-        || regionId != enrichedEvent.getRegionId()) {
+    if (!isEventFromPipe(enrichedEvent, committerKey)) {
       return false;
     }
 
@@ -537,11 +529,16 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
 
   private boolean isDroppedPipe(final Event event) {
     return event instanceof EnrichedEvent
-        && droppedPipeTaskKeys.contains(
-            new Triple<>(
-                ((EnrichedEvent) event).getPipeName(),
-                ((EnrichedEvent) event).getCreationTime(),
-                ((EnrichedEvent) event).getRegionId()));
+        && droppedPipeTaskKeys.stream()
+            .anyMatch(key -> isEventFromPipe((EnrichedEvent) event, key));
+  }
+
+  private static boolean isEventFromPipe(
+      final EnrichedEvent event, final CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
   }
 
   private boolean isQueueAvailable(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
index 40fccc12c99..7841e0199b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.websocket;
 
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
@@ -173,6 +174,13 @@ public class WebSocketSink implements PipeConnector, 
PipeConnectorWithEventDisca
     }
   }
 
+  @Override
+  public void discardEventsOfPipe(final CommitterKey committerKey) {
+    if (server != null) {
+      server.discardEventsOfPipe(committerKey);
+    }
+  }
+
   public void commit(EnrichedEvent enrichedEvent) {
     Optional.ofNullable(enrichedEvent)
         .ifPresent(event -> 
event.decreaseReferenceCount(WebSocketSink.class.getName(), true));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
index af871feaa7e..c24098f44fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.subscription.task.subtask;
 
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
 import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
 import 
org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtaskLifeCycle;
@@ -63,8 +64,7 @@ public class SubscriptionSinkSubtaskLifeCycle extends 
PipeSinkSubtaskLifeCycle {
   }
 
   @Override
-  public synchronized boolean deregister(
-      final String pipeNameToDeregister, final long creationTimeToDeregister, 
final int regionId) {
+  public synchronized boolean deregister(final CommitterKey committerKey) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index f4547673eaa..1c081888b35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.task.subtask;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -167,7 +168,11 @@ public class SubscriptionSinkSubtaskManager {
 
     final PipeSinkSubtaskLifeCycle lifeCycle =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
-    if (lifeCycle.deregister(pipeName, creationTime, regionId)) {
+
+    final CommitterKey committerKey =
+        PipeEventCommitManager.getInstance().getCommitterKey(pipeName, 
creationTime, regionId);
+
+    if (lifeCycle.deregister(committerKey)) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
index ddfc699721b..2a15fb9ea18 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -51,9 +52,10 @@ public class PipeSinkSubtaskTest {
                 connector));
 
     try {
-      subtask.discardEventsOfPipe("pipe", 1L, 1);
+      final CommitterKey committerKey = new CommitterKey("pipe", 1L, 1, -1);
+      subtask.discardEventsOfPipe(committerKey);
 
-      verify((PipeConnectorWithEventDiscard) 
connector).discardEventsOfPipe("pipe", 1L, 1);
+      verify((PipeConnectorWithEventDiscard) 
connector).discardEventsOfPipe(committerKey);
     } finally {
       subtask.close();
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 8d920121363..c7b91f36d22 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.connection;
 
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -47,9 +47,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   protected final AtomicBoolean isClosed = new AtomicBoolean(false);
 
-  // Pipe name, creation time, region id
-  protected final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
-      ConcurrentHashMap.newKeySet();
+  protected final Set<CommitterKey> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
 
   protected BlockingPendingQueue(
       final BlockingQueue<E> pendingQueue, final PipeEventCounter 
eventCounter) {
@@ -138,12 +136,15 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
 
   public void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, 
regionId));
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  public void discardEventsOfPipe(final CommitterKey committerKey) {
+    droppedPipeTaskKeys.add(committerKey);
     pendingQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && isEventFromPipe(
-                  ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, 
regionId)) {
+              && isEventFromPipe((EnrichedEvent) event, committerKey)) {
             if (((EnrichedEvent) 
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
             }
@@ -191,16 +192,30 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
         && regionId == event.getRegionId();
   }
 
+  protected static boolean isEventFromPipe(
+      final EnrichedEvent event, final CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
+  }
+
   protected boolean isEventFromDroppedPipe(final E event) {
     return event instanceof EnrichedEvent
         && ((EnrichedEvent) event).getPipeName() != null
-        && isPipeDropped(
-            ((EnrichedEvent) event).getPipeName(),
-            ((EnrichedEvent) event).getCreationTime(),
-            ((EnrichedEvent) event).getRegionId());
+        && isEventFromDroppedPipe((EnrichedEvent) event);
+  }
+
+  public boolean isEventFromDroppedPipe(final EnrichedEvent event) {
+    return droppedPipeTaskKeys.stream().anyMatch(key -> isEventFromPipe(event, 
key));
   }
 
   public boolean isPipeDropped(final String pipeName, final long creationTime, 
final int regionId) {
-    return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime, 
regionId));
+    return droppedPipeTaskKeys.stream()
+        .anyMatch(
+            key ->
+                key.getPipeName().equals(pipeName)
+                    && key.getCreationTime() == creationTime
+                    && key.getRegionId() == regionId);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index 7056b052a3e..9e1653a2516 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -169,6 +169,11 @@ public class PipeEventCommitManager {
     return true;
   }
 
+  public CommitterKey getCommitterKey(
+      final String pipeName, final long creationTime, final int regionId) {
+    return generateCommitterKey(pipeName, creationTime, regionId);
+  }
+
   private CommitterKey generateCommitterKey(
       final String pipeName, final long creationTime, final int regionId) {
     return taskAgent.getCommitterKey(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
index ab4dbcf9075..4ffc0c25ed2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
@@ -19,7 +19,14 @@
 
 package org.apache.iotdb.commons.pipe.sink.protocol;
 
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+
 public interface PipeConnectorWithEventDiscard {
 
   void discardEventsOfPipe(String pipeName, long creationTime, int regionId);
+
+  default void discardEventsOfPipe(final CommitterKey committerKey) {
+    discardEventsOfPipe(
+        committerKey.getPipeName(), committerKey.getCreationTime(), 
committerKey.getRegionId());
+  }
 }


Reply via email to