This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch Fix-drop
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3f4ddd2febd37e6e5a58e9f28f6b5ab9cbce7d5e
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 22 16:43:17 2026 +0800

    Fix
---
 .../agent/task/connection/PipeEventCollector.java  |  2 +-
 .../sink/PipeRealtimePriorityBlockingQueue.java    | 11 +++--
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 26 ++++++-----
 .../subtask/sink/PipeSinkSubtaskLifeCycle.java     |  9 ++--
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  6 ++-
 .../evolvable/batch/PipeTabletEventBatch.java      | 18 ++++++--
 .../batch/PipeTransferBatchReqBuilder.java         | 11 +++--
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |  8 +++-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 42 +++++++++--------
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  8 +++-
 .../websocket/WebSocketConnectorServer.java        | 52 +++++++++++-----------
 .../sink/protocol/websocket/WebSocketSink.java     |  8 ++++
 .../task/connection/BlockingPendingQueue.java      | 41 +++++++++++------
 .../task/progress/PipeEventCommitManager.java      |  5 +++
 .../protocol/PipeConnectorWithEventDiscard.java    |  7 +++
 15 files changed, 161 insertions(+), 93 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index ad44b78042a..c46d4a71343 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -238,7 +238,7 @@ public class PipeEventCollector implements EventCollector {
       
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
 
       if (enrichedEvent.getPipeName() != null
-          && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), 
creationTime, regionId)) {
+          && pendingQueue.isEventFromDroppedPipe(enrichedEvent)) {
         enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
         return;
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 4b65746b3ab..bea5fcfe854 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -356,12 +356,15 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   @Override
   public void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  @Override
+  public void discardEventsOfPipe(final CommitterKey committerKey) {
+    super.discardEventsOfPipe(committerKey);
     tsfileInsertEventDeque.removeIf(
         event -> {
-          if (event instanceof EnrichedEvent
-              && isEventFromPipe(
-                  ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, 
regionId)) {
+          if (event instanceof EnrichedEvent && 
isEventFromPipe((EnrichedEvent) event, committerKey)) {
             if (((EnrichedEvent) event)
                 
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 560512521e7..b633efe3129 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -201,10 +202,9 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
    * When a pipe is dropped, the connector maybe reused and will not be 
closed. So we just discard
    * its queued events in the output pipe connector.
    */
-  public void discardEventsOfPipe(
-      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+  public void discardEventsOfPipe(final CommitterKey committerKey) {
     // Try to remove the events as much as possible
-    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, 
regionId);
+    inputPendingQueue.discardEventsOfPipe(committerKey);
 
     try {
       increaseHighPriorityTaskCount();
@@ -217,9 +217,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         // use a new thread to stop all the pipes, we will not encounter 
deadlock here. Or else we
         // will.
         if (lastEvent instanceof EnrichedEvent
-            && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
-            && creationTimeToDrop == ((EnrichedEvent) 
lastEvent).getCreationTime()
-            && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
+            && isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) {
           // Do not clear the last event's reference counts because it may be 
on transferring
           lastEvent = null;
           // Submit self to avoid that the lastEvent has been retried "max 
times" times and has
@@ -241,9 +239,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         // clear the lastExceptionEvent. It's safe to potentially clear it 
twice because we have the
         // "nonnull" detection.
         if (lastExceptionEvent instanceof EnrichedEvent
-            && pipeNameToDrop.equals(((EnrichedEvent) 
lastExceptionEvent).getPipeName())
-            && creationTimeToDrop == ((EnrichedEvent) 
lastExceptionEvent).getCreationTime()
-            && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) 
{
+            && isEventFromPipe((EnrichedEvent) lastExceptionEvent, 
committerKey)) {
           clearReferenceCountAndReleaseLastExceptionEvent();
         }
       }
@@ -252,11 +248,19 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     }
 
     if (outputPipeSink instanceof PipeConnectorWithEventDiscard) {
-      ((PipeConnectorWithEventDiscard) outputPipeSink)
-          .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+      ((PipeConnectorWithEventDiscard) 
outputPipeSink).discardEventsOfPipe(committerKey);
     }
   }
 
+  private static boolean isEventFromPipe(
+      final EnrichedEvent event, final CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0
+            || committerKey.equals(event.getCommitterKey()));
+  }
+
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   public String getAttributeSortedString() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 85634277627..42b1ae91366 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -87,19 +88,17 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
    * Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be 
inconsistent with the
    * {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel 
connector scheduling.
    *
-   * @param pipeNameToDeregister pipe name
-   * @param regionId region id
+   * @param committerKey committer key of the pipe task to deregister
    * @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle, 
indicating that the
    *     {@link PipeSinkSubtask} should never be used again
    * @throws IllegalStateException if {@link 
PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
    */
-  public synchronized boolean deregister(
-      final String pipeNameToDeregister, final long creationTimeToDeregister, 
final int regionId) {
+  public synchronized boolean deregister(final CommitterKey committerKey) {
     if (registeredTaskCount <= 0) {
       throw new 
IllegalStateException(DataNodePipeMessages.REGISTEREDTASKCOUNT_0_1);
     }
 
-    subtask.discardEventsOfPipe(pipeNameToDeregister, 
creationTimeToDeregister, regionId);
+    subtask.discardEventsOfPipe(committerKey);
 
     try {
       if (registeredTaskCount > 1) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 367b9210406..3ad99ca5c06 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -211,7 +212,10 @@ public class PipeSinkSubtaskManager {
     // Shall not be empty
     final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;
 
-    lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
+    final CommitterKey committerKey =
+        PipeEventCommitManager.getInstance().getCommitterKey(pipeName, 
creationTime, regionId);
+
+    lifeCycles.removeIf(o -> o.deregister(committerKey));
 
     if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index 8bf69e6e6b0..4e2189dd5a1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
 
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -157,11 +158,13 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
    */
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
     events.removeIf(
         event -> {
-          if (pipeNameToDrop.equals(event.getPipeName())
-              && creationTimeToDrop == event.getCreationTime()
-              && regionId == event.getRegionId()) {
+          if (isEventFromPipe(event, committerKey)) {
             
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             return true;
           }
@@ -169,6 +172,15 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
         });
   }
 
+  private static boolean isEventFromPipe(
+      final EnrichedEvent event, final CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0
+            || committerKey.equals(event.getCommitterKey()));
+  }
+
   public synchronized void decreaseEventsReferenceCount(
       final String holderMessage, final boolean shouldReport) {
     events.forEach(event -> event.decreaseReferenceCount(holderMessage, 
shouldReport));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 3bec537614c..b3a8884a146 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -201,10 +202,12 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
 
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, 
regionId);
-    endPointToBatch
-        .values()
-        .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId));
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
+    defaultBatch.discardEventsOfPipe(committerKey);
+    endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(committerKey));
   }
 
   public int size() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 649ef35c4ce..ea83524988b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.sink.protocol.airgap;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
 import org.apache.iotdb.commons.utils.RetryUtils;
@@ -613,8 +614,13 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
   @Override
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  @Override
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
     if (Objects.nonNull(tabletBatchBuilder)) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId);
+      tabletBatchBuilder.discardEventsOfPipe(committerKey);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 9adbcf6cf16..b542627f942 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.audit.UserEntity;
 import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
@@ -130,9 +130,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   private final Map<PipeTransferTrackableHandler, 
PipeTransferTrackableHandler> pendingHandlers =
       new ConcurrentHashMap<>();
 
-  // Pipe name, creation time, region id
-  private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
-      ConcurrentHashMap.newKeySet();
+  private final Set<CommitterKey> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
 
   private boolean enableSendTsFileLimit;
   private volatile boolean isConnectionException;
@@ -749,16 +747,20 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   @Override
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, 
regionId));
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  @Override
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
+    droppedPipeTaskKeys.add(committerKey);
 
     if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId);
+      tabletBatchBuilder.discardEventsOfPipe(committerKey);
     }
     retryEventQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && isDroppedPipe(
-                  (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, 
regionId)) {
+              && isDroppedPipe((EnrichedEvent) event, committerKey)) {
             ((EnrichedEvent) 
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             retryEventQueueEventCounter.decreaseEventCount(event);
             return true;
@@ -769,8 +771,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     retryTsFileQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && isDroppedPipe(
-                  (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, 
regionId)) {
+              && isDroppedPipe((EnrichedEvent) event, committerKey)) {
             ((EnrichedEvent) 
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             retryEventQueueEventCounter.decreaseEventCount(event);
             return true;
@@ -872,18 +873,15 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   }
 
   private boolean isDroppedPipe(final EnrichedEvent event) {
-    return droppedPipeTaskKeys.contains(
-        new Triple<>(event.getPipeName(), event.getCreationTime(), 
event.getRegionId()));
-  }
-
-  private static boolean isDroppedPipe(
-      final EnrichedEvent event,
-      final String pipeNameToDrop,
-      final long creationTimeToDrop,
-      final int regionId) {
-    return pipeNameToDrop.equals(event.getPipeName())
-        && creationTimeToDrop == event.getCreationTime()
-        && regionId == event.getRegionId();
+    return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event, 
key));
+  }
+
+  private static boolean isDroppedPipe(final EnrichedEvent event, final 
CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0
+            || committerKey.equals(event.getCommitterKey()));
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 5e6297d8438..d9e25f5e09f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
 import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
@@ -604,8 +605,13 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
   @Override
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  @Override
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
     if (Objects.nonNull(tabletBatchBuilder)) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId);
+      tabletBatchBuilder.discardEventsOfPipe(committerKey);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index bbb4cb9a3a8..3702da3501a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket;
 
 import org.apache.iotdb.commons.external.collections4.BidiMap;
 import org.apache.iotdb.commons.external.collections4.bidimap.DualTreeBidiMap;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -60,9 +60,7 @@ public class WebSocketConnectorServer extends WebSocketServer 
{
   private final ConcurrentHashMap<String, ConcurrentHashMap<Long, 
EventWaitingForAck>>
       eventsWaitingForAck = new ConcurrentHashMap<>();
 
-  // Pipe name, creation time, region id
-  private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
-      ConcurrentHashMap.newKeySet();
+  private final Set<CommitterKey> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
 
   private final BidiMap<String, WebSocket> router =
       new DualTreeBidiMap<String, WebSocket>(null, 
Comparator.comparing(Object::hashCode)) {};
@@ -118,33 +116,33 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
           .forEach((eventId, eventWrapper) -> 
discardEvent(eventWrapper.event));
     }
 
-    droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName));
+    droppedPipeTaskKeys.removeIf(key -> key.getPipeName().equals(pipeName));
   }
 
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, 
regionId));
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
+    droppedPipeTaskKeys.add(committerKey);
 
     final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
-        eventsWaitingForTransfer.get(pipeNameToDrop);
+        eventsWaitingForTransfer.get(committerKey.getPipeName());
     if (eventTransferQueue != null) {
       eventTransferQueue.removeIf(
-          eventWrapper ->
-              discardIfMatches(eventWrapper.event, pipeNameToDrop, 
creationTimeToDrop, regionId));
+          eventWrapper -> discardIfMatches(eventWrapper.event, committerKey));
       synchronized (eventTransferQueue) {
         eventTransferQueue.notifyAll();
       }
     }
 
     final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
-        eventsWaitingForAck.get(pipeNameToDrop);
+        eventsWaitingForAck.get(committerKey.getPipeName());
     if (eventId2EventMap != null) {
       eventId2EventMap
           .entrySet()
-          .removeIf(
-              entry ->
-                  discardIfMatches(
-                      entry.getValue().event, pipeNameToDrop, 
creationTimeToDrop, regionId));
+          .removeIf(entry -> discardIfMatches(entry.getValue().event, 
committerKey));
     }
   }
 
@@ -506,19 +504,13 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
     }
   }
 
-  private boolean discardIfMatches(
-      final Event event,
-      final String pipeNameToDrop,
-      final long creationTimeToDrop,
-      final int regionId) {
+  private boolean discardIfMatches(final Event event, final CommitterKey 
committerKey) {
     if (!(event instanceof EnrichedEvent)) {
       return false;
     }
 
     final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
-    if (!pipeNameToDrop.equals(enrichedEvent.getPipeName())
-        || creationTimeToDrop != enrichedEvent.getCreationTime()
-        || regionId != enrichedEvent.getRegionId()) {
+    if (!isEventFromPipe(enrichedEvent, committerKey)) {
       return false;
     }
 
@@ -528,11 +520,17 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
 
   private boolean isDroppedPipe(final Event event) {
     return event instanceof EnrichedEvent
-        && droppedPipeTaskKeys.contains(
-            new Triple<>(
-                ((EnrichedEvent) event).getPipeName(),
-                ((EnrichedEvent) event).getCreationTime(),
-                ((EnrichedEvent) event).getRegionId()));
+        && droppedPipeTaskKeys.stream()
+            .anyMatch(key -> isEventFromPipe((EnrichedEvent) event, key));
+  }
+
+  private static boolean isEventFromPipe(
+      final EnrichedEvent event, final CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0
+            || committerKey.equals(event.getCommitterKey()));
   }
 
   private boolean isQueueAvailable(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
index 3c487ff1356..06eab035b4e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.websocket;
 
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
@@ -177,6 +178,13 @@ public class WebSocketSink implements PipeConnector, 
PipeConnectorWithEventDisca
     }
   }
 
+  @Override
+  public void discardEventsOfPipe(final CommitterKey committerKey) {
+    if (server != null) {
+      server.discardEventsOfPipe(committerKey);
+    }
+  }
+
   public void commit(EnrichedEvent enrichedEvent) {
     Optional.ofNullable(enrichedEvent)
         .ifPresent(event -> 
event.decreaseReferenceCount(WebSocketSink.class.getName(), true));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index c56e8143ef5..9ae5f3873dd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -20,8 +20,8 @@
 package org.apache.iotdb.commons.pipe.agent.task.connection;
 
 import org.apache.iotdb.commons.i18n.PipeMessages;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -48,9 +48,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   protected final AtomicBoolean isClosed = new AtomicBoolean(false);
 
-  // Pipe name, creation time, region id
-  protected final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
-      ConcurrentHashMap.newKeySet();
+  protected final Set<CommitterKey> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
 
   protected BlockingPendingQueue(
       final BlockingQueue<E> pendingQueue, final PipeEventCounter 
eventCounter) {
@@ -139,12 +137,14 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   public void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
-    droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, 
regionId));
+    discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, 
regionId, -1));
+  }
+
+  public void discardEventsOfPipe(final CommitterKey committerKey) {
+    droppedPipeTaskKeys.add(committerKey);
     pendingQueue.removeIf(
         event -> {
-          if (event instanceof EnrichedEvent
-              && isEventFromPipe(
-                  ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, 
regionId)) {
+          if (event instanceof EnrichedEvent && 
isEventFromPipe((EnrichedEvent) event, committerKey)) {
             if (((EnrichedEvent) 
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
             }
@@ -192,16 +192,31 @@ public abstract class BlockingPendingQueue<E extends Event> {
         && regionId == event.getRegionId();
   }
 
+  protected static boolean isEventFromPipe(
+      final EnrichedEvent event, final CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0
+            || committerKey.equals(event.getCommitterKey()));
+  }
+
   protected boolean isEventFromDroppedPipe(final E event) {
     return event instanceof EnrichedEvent
         && ((EnrichedEvent) event).getPipeName() != null
-        && isPipeDropped(
-            ((EnrichedEvent) event).getPipeName(),
-            ((EnrichedEvent) event).getCreationTime(),
-            ((EnrichedEvent) event).getRegionId());
+        && isEventFromDroppedPipe((EnrichedEvent) event);
+  }
+
+  public boolean isEventFromDroppedPipe(final EnrichedEvent event) {
+    return droppedPipeTaskKeys.stream().anyMatch(key -> isEventFromPipe(event, 
key));
   }
 
   public boolean isPipeDropped(final String pipeName, final long creationTime, 
final int regionId) {
-    return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime, 
regionId));
+    return droppedPipeTaskKeys.stream()
+        .anyMatch(
+            key ->
+                key.getPipeName().equals(pipeName)
+                    && key.getCreationTime() == creationTime
+                    && key.getRegionId() == regionId);
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index f2c3a73e18c..26e7ea305d5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -168,6 +168,11 @@ public class PipeEventCommitManager {
     return true;
   }
 
+  public CommitterKey getCommitterKey(
+      final String pipeName, final long creationTime, final int regionId) {
+    return generateCommitterKey(pipeName, creationTime, regionId);
+  }
+
   private CommitterKey generateCommitterKey(
       final String pipeName, final long creationTime, final int regionId) {
     return taskAgent.getCommitterKey(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
index ab4dbcf9075..4ffc0c25ed2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
@@ -19,7 +19,14 @@
 
 package org.apache.iotdb.commons.pipe.sink.protocol;
 
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+
 public interface PipeConnectorWithEventDiscard {
 
   void discardEventsOfPipe(String pipeName, long creationTime, int regionId);
+
+  default void discardEventsOfPipe(final CommitterKey committerKey) {
+    discardEventsOfPipe(
+        committerKey.getPipeName(), committerKey.getCreationTime(), 
committerKey.getRegionId());
+  }
 }


Reply via email to