This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 2ea083dcf6a [To dev/1.3] Pipe: Fixed the event clear logic of drop pipe (#17560) (#17619)
2ea083dcf6a is described below

commit 2ea083dcf6add81e9bb43ef536728c7d53e47f96
Author: Caideyipi <[email protected]>
AuthorDate: Sat May 9 14:48:31 2026 +0800

    [To dev/1.3] Pipe: Fixed the event clear logic of drop pipe (#17560) (#17619)
    
    * Pipe: Fixed the event clear logic of drop pipe (#17560)
    
    * drop-1
    
    * wd
    
    * drop
    
    * fix
    
    * local
    
    * triple
    
    * by
    
    * spt
    
    * bug-fix
    
    * no-pipe-task-key
    
    * Update IoTDBDataRegionAsyncSink.java
    
    * triple
    
    * Fix
    
    * comp
    
    * comp-fix02
    
    * drop-n
---
 .../agent/task/connection/PipeEventCollector.java  |  20 ++-
 .../sink/PipeRealtimePriorityBlockingQueue.java    |  13 +-
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  13 +-
 .../subtask/sink/PipeSinkSubtaskLifeCycle.java     |   5 +-
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |   2 +-
 .../evolvable/batch/PipeTabletEventBatch.java      |   7 +-
 .../batch/PipeTransferBatchReqBuilder.java         |   9 +-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  50 ++++++--
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |   5 +-
 .../websocket/WebSocketConnectorServer.java        | 141 +++++++++++++++++----
 .../sink/protocol/websocket/WebSocketSink.java     |  11 +-
 .../subtask/SubscriptionSinkSubtaskLifeCycle.java  |   3 +-
 .../subtask/SubscriptionSinkSubtaskManager.java    |   2 +-
 .../task/connection/PipeEventCollectorTest.java    |  86 +++++++++++++
 .../task/subtask/sink/PipeSinkSubtaskTest.java     |  61 +++++++++
 .../apache/iotdb/db/pipe/sink/PipeSinkTest.java    |  95 ++++++++++++++
 .../task/connection/BlockingPendingQueue.java      |  55 +++++++-
 .../iotdb/commons/pipe/datastructure/Triple.java   |  63 +++++++++
 .../commons/pipe/sink/protocol/IoTDBSink.java      |   5 +-
 .../protocol/PipeConnectorWithEventDiscard.java    |  25 ++++
 20 files changed, 599 insertions(+), 72 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 4728f62c94f..387d4ff7ec6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -178,7 +178,8 @@ public class PipeEventCollector implements EventCollector {
 
   private void collectEvent(final Event event) {
     if (event instanceof EnrichedEvent) {
-      if (!((EnrichedEvent) 
event).increaseReferenceCount(PipeEventCollector.class.getName())) {
+      final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+      if 
(!enrichedEvent.increaseReferenceCount(PipeEventCollector.class.getName())) {
         LOGGER.warn("PipeEventCollector: The event {} is already released, 
skipping it.", event);
         isFailedToIncreaseReferenceCount = true;
         return;
@@ -186,18 +187,25 @@ public class PipeEventCollector implements EventCollector 
{
 
       // Assign a commit id for this event in order to report progress in 
order.
       PipeEventCommitManager.getInstance()
-          .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, 
creationTime, regionId);
+          .enrichWithCommitterKeyAndCommitId(enrichedEvent, creationTime, 
regionId);
 
-      // Assign a rebootTime for pipeConsensus
-      ((EnrichedEvent) 
event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
+      // Assign a rebootTime for iotConsensusV2
+      
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
+
+      if (enrichedEvent.getPipeName() != null
+          && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), 
creationTime, regionId)) {
+        enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
+        return;
+      }
     }
 
     if (event instanceof PipeHeartbeatEvent) {
       ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
     }
 
-    pendingQueue.offer(event);
-    collectInvocationCount.incrementAndGet();
+    if (pendingQueue.offer(event)) {
+      collectInvocationCount.incrementAndGet();
+    }
   }
 
   public void resetFlags() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 6d227ac31fd..f972bba0e6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -73,7 +73,9 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
 
   @Override
   public boolean offer(final Event event) {
-    checkBeforeOffer(event);
+    if (!checkBeforeOffer(event)) {
+      return false;
+    }
 
     if (event instanceof TsFileInsertionEvent) {
       tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
@@ -356,13 +358,14 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   }
 
   @Override
-  public void discardEventsOfPipe(final String pipeNameToDrop, final int 
regionId) {
-    super.discardEventsOfPipe(pipeNameToDrop, regionId);
+  public void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     tsfileInsertEventDeque.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
-              && regionId == ((EnrichedEvent) event).getRegionId()) {
+              && isEventFromPipe(
+                  ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, 
regionId)) {
             if (((EnrichedEvent) event)
                 
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 4b4794891f6..ae48fcb2778 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
+import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
 import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -199,9 +200,10 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
    * When a pipe is dropped, the connector maybe reused and will not be 
closed. So we just discard
    * its queued events in the output pipe connector.
    */
-  public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
+  public void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
     // Try to remove the events as much as possible
-    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
+    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, 
regionId);
 
     try {
       increaseHighPriorityTaskCount();
@@ -215,6 +217,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         // will.
         if (lastEvent instanceof EnrichedEvent
             && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
+            && creationTimeToDrop == ((EnrichedEvent) 
lastEvent).getCreationTime()
             && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
           // Do not clear the last event's reference counts because it may be 
on transferring
           lastEvent = null;
@@ -238,6 +241,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         // "nonnull" detection.
         if (lastExceptionEvent instanceof EnrichedEvent
             && pipeNameToDrop.equals(((EnrichedEvent) 
lastExceptionEvent).getPipeName())
+            && creationTimeToDrop == ((EnrichedEvent) 
lastExceptionEvent).getCreationTime()
             && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) 
{
           clearReferenceCountAndReleaseLastExceptionEvent();
         }
@@ -246,8 +250,9 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       decreaseHighPriorityTaskCount();
     }
 
-    if (outputPipeConnector instanceof IoTDBSink) {
-      ((IoTDBSink) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop, 
regionId);
+    if (outputPipeConnector instanceof PipeConnectorWithEventDiscard) {
+      ((PipeConnectorWithEventDiscard) outputPipeConnector)
+          .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 35f7983075d..1780f5a87ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -92,12 +92,13 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
    *     {@link PipeSinkSubtask} should never be used again
    * @throws IllegalStateException if {@link 
PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
    */
-  public synchronized boolean deregister(final String pipeNameToDeregister, 
int regionId) {
+  public synchronized boolean deregister(
+      final String pipeNameToDeregister, final long creationTimeToDeregister, 
final int regionId) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
 
-    subtask.discardEventsOfPipe(pipeNameToDeregister, regionId);
+    subtask.discardEventsOfPipe(pipeNameToDeregister, 
creationTimeToDeregister, regionId);
 
     try {
       if (registeredTaskCount > 1) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index caff425f790..d7f81c12dbc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -209,7 +209,7 @@ public class PipeSinkSubtaskManager {
     // Shall not be empty
     final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;
 
-    lifeCycles.removeIf(o -> o.deregister(pipeName, regionId));
+    lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
 
     if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index 96bddd0d672..c44e12a4bbf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -154,10 +154,13 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
    * Discard all events of the given pipe. This method only clears the 
reference count of the events
    * and discard them, but do not modify other objects (such as buffers) for 
simplicity.
    */
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
     events.removeIf(
         event -> {
-          if (pipeNameToDrop.equals(event.getPipeName()) && regionId == 
event.getRegionId()) {
+          if (pipeNameToDrop.equals(event.getPipeName())
+              && creationTimeToDrop == event.getCreationTime()
+              && regionId == event.getRegionId()) {
             
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             return true;
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 9fa706e985f..ac5f568f1c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -195,9 +195,12 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
         && 
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
   }
 
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
-    defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId);
-    endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(pipeNameToDrop, regionId));
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, 
regionId);
+    endPointToBatch
+        .values()
+        .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId));
   }
 
   public int size() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 07381d15e28..fe3d44bedb4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.datastructure.Triple;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
@@ -74,6 +75,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -121,6 +123,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   private final Map<PipeTransferTrackableHandler, 
PipeTransferTrackableHandler> pendingHandlers =
       new ConcurrentHashMap<>();
 
+  // Pipe name, creation time, region id
+  private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
+      ConcurrentHashMap.newKeySet();
+
   private boolean enableSendTsFileLimit;
   private volatile boolean isConnectionException;
 
@@ -660,8 +666,15 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   public void addFailureEventToRetryQueue(final Event event, final Exception 
e) {
     isConnectionException =
         e instanceof PipeConnectionException || 
ThriftClient.isConnectionBroken(e);
-    if (event instanceof EnrichedEvent && ((EnrichedEvent) 
event).isReleased()) {
-      return;
+    if (event instanceof EnrichedEvent) {
+      final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+      if (enrichedEvent.isReleased()) {
+        return;
+      }
+      if (isDroppedPipe(enrichedEvent)) {
+        
enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
+        return;
+      }
     }
 
     if (isClosed.get()) {
@@ -707,15 +720,18 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   //////////////////////////// Operations for close 
////////////////////////////
 
   @Override
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
-    if (isTabletBatchModeEnabled) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, 
regionId));
+
+    if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
+      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId);
     }
     retryEventQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
-              && regionId == ((EnrichedEvent) event).getRegionId()) {
+              && isDroppedPipe(
+                  (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, 
regionId)) {
             ((EnrichedEvent) 
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             retryEventQueueEventCounter.decreaseEventCount(event);
             return true;
@@ -726,8 +742,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     retryTsFileQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
-              && regionId == ((EnrichedEvent) event).getRegionId()) {
+              && isDroppedPipe(
+                  (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, 
regionId)) {
             ((EnrichedEvent) 
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             retryEventQueueEventCounter.decreaseEventCount(event);
             return true;
@@ -771,6 +787,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
     // clear reference count of events in retry queue after closing async 
client
     clearRetryEventsReferenceCount();
+    droppedPipeTaskKeys.clear();
 
     super.close();
   }
@@ -827,6 +844,21 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     this.transferTsFileCounter = transferTsFileCounter;
   }
 
+  private boolean isDroppedPipe(final EnrichedEvent event) {
+    return droppedPipeTaskKeys.contains(
+        new Triple<>(event.getPipeName(), event.getCreationTime(), 
event.getRegionId()));
+  }
+
+  private static boolean isDroppedPipe(
+      final EnrichedEvent event,
+      final String pipeNameToDrop,
+      final long creationTimeToDrop,
+      final int regionId) {
+    return pipeNameToDrop.equals(event.getPipeName())
+        && creationTimeToDrop == event.getCreationTime()
+        && regionId == event.getRegionId();
+  }
+
   @Override
   public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
     if (tabletBatchBuilder != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index a13f40b1b83..552b8cf1cae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -521,9 +521,10 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
   }
 
   @Override
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
     if (Objects.nonNull(tabletBatchBuilder)) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
+      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 6da64460495..2a8b5c8c3c0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.websocket;
 
+import org.apache.iotdb.commons.pipe.datastructure.Triple;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -39,6 +40,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,6 +59,10 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
   private final ConcurrentHashMap<String, ConcurrentHashMap<Long, 
EventWaitingForAck>>
       eventsWaitingForAck = new ConcurrentHashMap<>();
 
+  // Pipe name, creation time, region id
+  private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
+      ConcurrentHashMap.newKeySet();
+
   private final BidiMap<String, WebSocket> router =
       new DualTreeBidiMap<String, WebSocket>(null, 
Comparator.comparing(Object::hashCode)) {};
 
@@ -97,13 +103,8 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
           eventWrappers = new ArrayList<>(eventTransferQueue);
           eventTransferQueue.clear();
         }
-        eventWrappers.forEach(
-            (eventWrapper) -> {
-              if (eventWrapper.event instanceof EnrichedEvent) {
-                ((EnrichedEvent) eventWrapper.event)
-                    
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
-              }
-            });
+        eventWrappers.forEach(eventWrapper -> 
discardEvent(eventWrapper.event));
+        eventWrappers.clear();
         synchronized (eventTransferQueue) {
           eventTransferQueue.notifyAll();
         }
@@ -113,13 +114,36 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
     if (eventsWaitingForAck.containsKey(pipeName)) {
       eventsWaitingForAck
           .remove(pipeName)
-          .forEach(
-              (eventId, eventWrapper) -> {
-                if (eventWrapper.event instanceof EnrichedEvent) {
-                  ((EnrichedEvent) eventWrapper.event)
-                      
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
-                }
-              });
+          .forEach((eventId, eventWrapper) -> 
discardEvent(eventWrapper.event));
+    }
+
+    droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName));
+  }
+
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, 
regionId));
+
+    final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
+        eventsWaitingForTransfer.get(pipeNameToDrop);
+    if (eventTransferQueue != null) {
+      eventTransferQueue.removeIf(
+          eventWrapper ->
+              discardIfMatches(eventWrapper.event, pipeNameToDrop, 
creationTimeToDrop, regionId));
+      synchronized (eventTransferQueue) {
+        eventTransferQueue.notifyAll();
+      }
+    }
+
+    final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
+        eventsWaitingForAck.get(pipeNameToDrop);
+    if (eventId2EventMap != null) {
+      eventId2EventMap
+          .entrySet()
+          .removeIf(
+              entry ->
+                  discardIfMatches(
+                      entry.getValue().event, pipeNameToDrop, 
creationTimeToDrop, regionId));
     }
   }
 
@@ -300,21 +324,24 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
   }
 
   public void addEvent(Event event, WebSocketSink connector) {
+    if (isDroppedPipe(event)) {
+      discardEvent(event);
+      return;
+    }
+
+    final String pipeName = connector.getPipeName();
     final PriorityBlockingQueue<EventWaitingForTransfer> queue =
-        eventsWaitingForTransfer.get(connector.getPipeName());
+        eventsWaitingForTransfer.get(pipeName);
 
     if (queue == null) {
       LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.", 
connector, event);
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event)
-            .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), 
false);
-      }
+      discardEvent(event);
       return;
     }
 
     if (queue.size() >= 5) {
       synchronized (queue) {
-        while (queue.size() >= 5) {
+        while (queue.size() >= 5 && isQueueAvailable(pipeName, queue) && 
!isDroppedPipe(event)) {
           try {
             queue.wait();
           } catch (InterruptedException e) {
@@ -323,12 +350,22 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
           }
         }
 
+        if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) {
+          discardEvent(event);
+          return;
+        }
+
         queue.put(
             new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), 
connector, event));
         return;
       }
     }
 
+    if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) {
+      discardEvent(event);
+      return;
+    }
+
     synchronized (queue) {
       queue.put(new 
EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event));
     }
@@ -377,6 +414,11 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       final WebSocketSink connector = element.connector;
 
       try {
+        if (isDroppedPipe(event)) {
+          discardEvent(event);
+          return;
+        }
+
         ByteBuffer tabletBuffer;
         if (event instanceof PipeRawTabletInsertionEvent) {
           tabletBuffer = ((PipeRawTabletInsertionEvent) 
event).convertToTablet().serialize();
@@ -387,7 +429,11 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
         }
 
         if (tabletBuffer == null) {
-          connector.commit((EnrichedEvent) event);
+          if (isDroppedPipe(event)) {
+            discardEvent(event);
+          } else {
+            connector.commit((EnrichedEvent) event);
+          }
           return;
         }
 
@@ -398,11 +444,17 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
 
         server.broadcast(payload, 
Collections.singletonList(router.get(pipeName)));
 
+        if (isDroppedPipe(event)) {
+          discardEvent(event);
+          return;
+        }
+
         final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
             eventsWaitingForAck.get(pipeName);
         if (eventId2EventMap == null) {
           LOGGER.warn(
               "The pipe {} was dropped so the event ack {} will be ignored.", 
pipeName, eventId);
+          discardEvent(event);
           return;
         }
         eventId2EventMap.put(eventId, new EventWaitingForAck(connector, 
event));
@@ -410,13 +462,10 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
         synchronized (server) {
           final PriorityBlockingQueue<EventWaitingForTransfer> queue =
               eventsWaitingForTransfer.get(pipeName);
-          if (queue == null) {
+          if (queue == null || isDroppedPipe(event)) {
             LOGGER.warn(
                 "The pipe {} was dropped so the event {} will be dropped.", 
pipeName, eventId);
-            if (event instanceof EnrichedEvent) {
-              ((EnrichedEvent) event)
-                  
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
-            }
+            discardEvent(event);
             return;
           }
 
@@ -465,4 +514,44 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       this.event = event;
     }
   }
+
+  private boolean discardIfMatches(
+      final Event event,
+      final String pipeNameToDrop,
+      final long creationTimeToDrop,
+      final int regionId) {
+    if (!(event instanceof EnrichedEvent)) {
+      return false;
+    }
+
+    final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+    if (!pipeNameToDrop.equals(enrichedEvent.getPipeName())
+        || creationTimeToDrop != enrichedEvent.getCreationTime()
+        || regionId != enrichedEvent.getRegionId()) {
+      return false;
+    }
+
+    discardEvent(enrichedEvent);
+    return true;
+  }
+
+  private boolean isDroppedPipe(final Event event) {
+    return event instanceof EnrichedEvent
+        && droppedPipeTaskKeys.contains(
+            new Triple<>(
+                ((EnrichedEvent) event).getPipeName(),
+                ((EnrichedEvent) event).getCreationTime(),
+                ((EnrichedEvent) event).getRegionId()));
+  }
+
+  private boolean isQueueAvailable(
+      final String pipeName, final 
PriorityBlockingQueue<EventWaitingForTransfer> queue) {
+    return eventsWaitingForTransfer.get(pipeName) == queue;
+  }
+
+  private void discardEvent(final Event event) {
+    if (event instanceof EnrichedEvent) {
+      ((EnrichedEvent) 
event).clearReferenceCount(WebSocketSink.class.getName());
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
index c89486bc2c8..40fccc12c99 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket;
 
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Arrays;
 import java.util.Optional;
 
-public class WebSocketSink implements PipeConnector {
+public class WebSocketSink implements PipeConnector, PipeConnectorWithEventDiscard {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketSink.class);
 
@@ -164,6 +165,14 @@ public class WebSocketSink implements PipeConnector {
     }
   }
 
+  @Override
+  public void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
+    if (server != null) {
+      server.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+    }
+  }
+
   public void commit(EnrichedEvent enrichedEvent) {
     Optional.ofNullable(enrichedEvent)
         .ifPresent(event -> event.decreaseReferenceCount(WebSocketSink.class.getName(), true));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
index 98163697374..af871feaa7e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
@@ -63,7 +63,8 @@ public class SubscriptionSinkSubtaskLifeCycle extends PipeSinkSubtaskLifeCycle {
   }
 
   @Override
-  public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) {
+  public synchronized boolean deregister(
+      final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 07def3ff4d3..f4547673eaa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -167,7 +167,7 @@ public class SubscriptionSinkSubtaskManager {
 
     final PipeSinkSubtaskLifeCycle lifeCycle =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
-    if (lifeCycle.deregister(pipeName, regionId)) {
+    if (lifeCycle.deregister(pipeName, creationTime, regionId)) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
new file mode 100644
index 00000000000..d54db821007
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task.connection;
+
+import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeRealtimePriorityBlockingQueue;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class PipeEventCollectorTest {
+
+  @Test
+  public void testCollectorDoesNotOfferEventsOfDroppedPipeToUnboundedPendingQueue() {
+    verifyCollectorDoesNotOfferEventsOfDroppedPipe(
+        new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter()));
+  }
+
+  @Test
+  public void testCollectorDoesNotOfferEventsOfDroppedPipeToRealtimePendingQueue() {
+    verifyCollectorDoesNotOfferEventsOfDroppedPipe(new PipeRealtimePriorityBlockingQueue());
+  }
+
+  private void verifyCollectorDoesNotOfferEventsOfDroppedPipe(
+      final UnboundedBlockingPendingQueue<Event> pendingQueue) {
+    pendingQueue.discardEventsOfPipe("pipe", 1L, 1);
+
+    final PipeEventCollector droppedPipeCollector =
+        new PipeEventCollector(pendingQueue, 1L, 1, false, false);
+    final PipeRawTabletInsertionEvent droppedPipeEvent =
+        createPipeRawTabletInsertionEvent("pipe", 1L);
+    droppedPipeCollector.collect(droppedPipeEvent);
+
+    Assert.assertTrue(droppedPipeEvent.isReleased());
+    Assert.assertEquals(0, pendingQueue.size());
+
+    final PipeEventCollector recreatedPipeCollector =
+        new PipeEventCollector(pendingQueue, 2L, 1, false, false);
+    final PipeRawTabletInsertionEvent recreatedPipeEvent =
+        createPipeRawTabletInsertionEvent("pipe", 2L);
+    recreatedPipeCollector.collect(recreatedPipeEvent);
+
+    Assert.assertFalse(recreatedPipeEvent.isReleased());
+    Assert.assertEquals(1, pendingQueue.size());
+
+    pendingQueue.discardAllEvents();
+    Assert.assertTrue(recreatedPipeEvent.isReleased());
+  }
+
+  private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent(
+      final String pipeName, final long creationTime) {
+    final List<MeasurementSchema> schemaList =
+        Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64));
+    final Tablet tablet = new Tablet("root.db.d1", schemaList, 1);
+    tablet.addTimestamp(0, 1L);
+    tablet.addValue("s1", 0, 1L);
+    return new PipeRawTabletInsertionEvent(
+        tablet, false, pipeName, creationTime, null, null, false);
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
new file mode 100644
index 00000000000..ddfc699721b
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
+
+import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
+import org.apache.iotdb.pipe.api.PipeConnector;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.withSettings;
+
+public class PipeSinkSubtaskTest {
+
+  @Test
+  public void testDiscardEventsOfPipeDelegatesToConnector() {
+    final PipeConnector connector =
+        mock(
+            PipeConnector.class,
+            withSettings().extraInterfaces(PipeConnectorWithEventDiscard.class));
+    final UnboundedBlockingPendingQueue<?> pendingQueue = mock(UnboundedBlockingPendingQueue.class);
+
+    final PipeSinkSubtask subtask =
+        Mockito.spy(
+            new PipeSinkSubtask(
+                "PipeSinkSubtaskTest",
+                System.currentTimeMillis(),
+                "data_test",
+                0,
+                (UnboundedBlockingPendingQueue) pendingQueue,
+                connector));
+
+    try {
+      subtask.discardEventsOfPipe("pipe", 1L, 1);
+
+      verify((PipeConnectorWithEventDiscard) connector).discardEventsOfPipe("pipe", 1L, 1);
+    } finally {
+      subtask.close();
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
index fae64308762..537ae6648b2 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.sink;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
@@ -28,14 +29,18 @@ import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
 import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
+import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer;
+import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.security.SecureRandom;
 import java.util.Arrays;
@@ -103,6 +108,85 @@ public class PipeSinkTest {
     }
   }
 
+  @Test
+  public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws Exception {
+    try (final IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) {
+      final PipeParameters parameters =
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(
+                      PipeSinkConstant.CONNECTOR_KEY,
+                      BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
+                  put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668");
+                }
+              });
+      connector.validate(new PipeParameterValidator(parameters));
+      connector.customize(
+          parameters,
+          new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+
+      final PipeRawTabletInsertionEvent droppedEvent =
+          createPipeRawTabletInsertionEvent("pipe", 1L, 1);
+      droppedEvent.increaseReferenceCount("test");
+      droppedEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 1L, 1, -1), 1L);
+
+      connector.discardEventsOfPipe("pipe", 1L, 1);
+      connector.addFailureEventToRetryQueue(droppedEvent, new PipeException("test"));
+
+      Assert.assertEquals(0, connector.getRetryEventQueueSize());
+      Assert.assertTrue(droppedEvent.isReleased());
+
+      final PipeRawTabletInsertionEvent recreatedPipeEvent =
+          createPipeRawTabletInsertionEvent("pipe", 2L, 1);
+      recreatedPipeEvent.increaseReferenceCount("test");
+      recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 2L, 1, -1), 1L);
+
+      connector.addFailureEventToRetryQueue(recreatedPipeEvent, new PipeException("test"));
+
+      Assert.assertEquals(1, connector.getRetryEventQueueSize());
+    }
+  }
+
+  @Test
+  public void testWebSocketSinkDropDoesNotRequeueDroppedPipeEvents() {
+    final String pipeName = "pipe_" + System.nanoTime();
+    final WebSocketConnectorServer server = WebSocketConnectorServer.getOrCreateInstance(0);
+    final WebSocketSink connector = Mockito.mock(WebSocketSink.class);
+    Mockito.when(connector.getPipeName()).thenReturn(pipeName);
+
+    server.register(connector);
+    try {
+      final PipeRawTabletInsertionEvent droppedEvent =
+          createPipeRawTabletInsertionEvent(pipeName, 1L, 1);
+      droppedEvent.increaseReferenceCount(WebSocketSink.class.getName());
+      droppedEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 1L, 1, -1), 1L);
+      server.addEvent(droppedEvent, connector);
+
+      server.discardEventsOfPipe(pipeName, 1L, 1);
+      Assert.assertTrue(droppedEvent.isReleased());
+
+      final PipeRawTabletInsertionEvent recreatedDroppedPipeEvent =
+          createPipeRawTabletInsertionEvent(pipeName, 1L, 1);
+      recreatedDroppedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName());
+      recreatedDroppedPipeEvent.setCommitterKeyAndCommitId(
+          new CommitterKey(pipeName, 1L, 1, -1), 2L);
+      server.addEvent(recreatedDroppedPipeEvent, connector);
+
+      Assert.assertTrue(recreatedDroppedPipeEvent.isReleased());
+
+      final PipeRawTabletInsertionEvent recreatedPipeEvent =
+          createPipeRawTabletInsertionEvent(pipeName, 2L, 1);
+      recreatedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName());
+      recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 2L, 1, -1), 3L);
+      server.addEvent(recreatedPipeEvent, connector);
+
+      Assert.assertFalse(recreatedPipeEvent.isReleased());
+    } finally {
+      server.unregister(connector);
+    }
+  }
+
   @Test
   public void testOpcUaSink() {
     final List<MeasurementSchema> schemaList =
@@ -181,4 +265,15 @@ public class PipeSinkTest {
       Assert.fail();
     }
   }
+
+  private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent(
+      final String pipeName, final long creationTime, final int regionId) {
+    final List<MeasurementSchema> schemaList =
+        Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64));
+    final Tablet tablet = new Tablet("root.db.d" + regionId, schemaList, 1);
+    tablet.addTimestamp(0, 1L);
+    tablet.addValue("s1", 0, 1L);
+    return new PipeRawTabletInsertionEvent(
+        tablet, false, pipeName, creationTime, null, null, false);
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 8773b03f9f3..8d920121363 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.agent.task.connection;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.datastructure.Triple;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -27,7 +28,9 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
@@ -44,6 +47,10 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   protected final AtomicBoolean isClosed = new AtomicBoolean(false);
 
+  // Pipe name, creation time, region id
+  protected final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
+      ConcurrentHashMap.newKeySet();
+
   protected BlockingPendingQueue(
       final BlockingQueue<E> pendingQueue, final PipeEventCounter eventCounter) {
     this.pendingQueue = pendingQueue;
@@ -51,7 +58,10 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public boolean offer(final E event) {
-    checkBeforeOffer(event);
+    if (!checkBeforeOffer(event)) {
+      return false;
+    }
+
     final boolean offered = pendingQueue.offer(event);
     if (offered) {
       eventCounter.increaseEventCount(event);
@@ -60,7 +70,9 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public boolean put(final E event) {
-    checkBeforeOffer(event);
+    if (!checkBeforeOffer(event)) {
+      return false;
+    }
     try {
       pendingQueue.put(event);
       eventCounter.increaseEventCount(event);
@@ -101,6 +113,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
     isClosed.set(true);
     pendingQueue.clear();
     eventCounter.reset();
+    droppedPipeTaskKeys.clear();
   }
 
   /** DO NOT FORGET to set eventCounter to new value after invoking this method. */
@@ -120,14 +133,17 @@ public abstract class BlockingPendingQueue<E extends Event> {
           return true;
         });
     eventCounter.reset();
+    droppedPipeTaskKeys.clear();
   }
 
-  public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
+  public void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
+    droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId));
     pendingQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
-              && regionId == ((EnrichedEvent) event).getRegionId()) {
+              && isEventFromPipe(
+                  ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) {
             if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
             }
@@ -157,9 +173,34 @@ public abstract class BlockingPendingQueue<E extends Event> {
     return eventCounter.getPipeHeartbeatEventCount();
   }
 
-  protected void checkBeforeOffer(final E event) {
-    if (isClosed.get() && event instanceof EnrichedEvent) {
+  protected boolean checkBeforeOffer(final E event) {
+    final boolean shouldReject = isClosed.get() || isEventFromDroppedPipe(event);
+    if (shouldReject && event instanceof EnrichedEvent) {
       ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName());
     }
+    return !shouldReject;
+  }
+
+  protected static boolean isEventFromPipe(
+      final EnrichedEvent event,
+      final String pipeNameToDrop,
+      final long creationTimeToDrop,
+      final int regionId) {
+    return pipeNameToDrop.equals(event.getPipeName())
+        && creationTimeToDrop == event.getCreationTime()
+        && regionId == event.getRegionId();
+  }
+
+  protected boolean isEventFromDroppedPipe(final E event) {
+    return event instanceof EnrichedEvent
+        && ((EnrichedEvent) event).getPipeName() != null
+        && isPipeDropped(
+            ((EnrichedEvent) event).getPipeName(),
+            ((EnrichedEvent) event).getCreationTime(),
+            ((EnrichedEvent) event).getRegionId());
+  }
+
+  public boolean isPipeDropped(final String pipeName, final long creationTime, final int regionId) {
+    return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime, regionId));
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java
new file mode 100644
index 00000000000..275ccb20ea3
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure;
+
+import java.util.Objects;
+
+public class Triple<L, M, R> {
+  public final L first;
+  public final M second;
+  public final R third;
+
+  public Triple(final L first, final M second, final R third) {
+    this.first = first;
+    this.second = second;
+    this.third = third;
+  }
+
+  public L getFirst() {
+    return first;
+  }
+
+  public M getSecond() {
+    return second;
+  }
+
+  public R getThird() {
+    return third;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final Triple<?, ?, ?> triple = (Triple<?, ?, ?>) o;
+    return first.equals(triple.first) && second.equals(triple.second) && third.equals(triple.third);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(first, second, third);
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 66f3eda6a1a..2f2d54e7b4f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -139,7 +139,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_RATE_LIMIT_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_SKIP_IF_KEY;
 
-public abstract class IoTDBSink implements PipeConnector {
+public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEventDiscard {
 
   private static final String PARSE_URL_ERROR_FORMATTER =
       "Exception occurred while parsing node urls from target servers: {}";
@@ -621,7 +621,8 @@ public abstract class IoTDBSink implements PipeConnector {
    * When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard
    * its batched or queued events in the output pipe connector.
    */
-  public synchronized void discardEventsOfPipe(final String pipeName, final int regionId) {
+  public synchronized void discardEventsOfPipe(
+      final String pipeName, final long creationTime, final int regionId) {
     // Do nothing by default
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
new file mode 100644
index 00000000000..ab4dbcf9075
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.protocol;
+
+public interface PipeConnectorWithEventDiscard {
+
+  void discardEventsOfPipe(String pipeName, long creationTime, int regionId);
+}

Reply via email to