This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b992f29004 Pipe: Fixed some problems caused by reusing connectors 
after exceptions occurred in connector stage (#12685)
1b992f29004 is described below

commit 1b992f29004390991e3df0c2c3390b2970f2bc8b
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 27 22:52:04 2024 +0800

    Pipe: Fixed some problems caused by reusing connectors after exceptions 
occurred in connector stage (#12685)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/execution/PipeConfigNodeSubtask.java      |  2 +
 .../airgap/IoTDBDataRegionAirGapConnector.java     |  3 +-
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  |  3 +-
 .../async/IoTDBDataRegionAsyncConnector.java       | 10 +++-
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |  3 +-
 .../protocol/writeback/WriteBackConnector.java     |  3 +-
 .../subtask/connector/PipeConnectorSubtask.java    | 60 ++++++++++++++++++++--
 .../PipeRealtimePriorityBlockingQueue.java         | 11 +++-
 .../pipe/task/connection/BlockingPendingQueue.java | 22 +++++---
 .../task/subtask/PipeAbstractConnectorSubtask.java | 43 +++++++++++++++-
 10 files changed, 138 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 220d0388be2..02cb5373359 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -186,6 +186,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractConnectorSubtask {
 
       PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
     } catch (final PipeException e) {
+      setLastExceptionEvent(event);
       if (!isClosed.get()) {
         throw e;
       } else {
@@ -196,6 +197,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractConnectorSubtask {
         clearReferenceCountAndReleaseLastEvent();
       }
     } catch (final Exception e) {
+      setLastExceptionEvent(event);
       if (!isClosed.get()) {
         throw new PipeException(
             String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 22377cc9a6d..9874741c7f0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -129,7 +130,7 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
     try {
       if (event instanceof PipeSchemaRegionWritePlanEvent) {
         doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event);
-      } else if (!(event instanceof PipeHeartbeatEvent)) {
+      } else if (!(event instanceof PipeHeartbeatEvent || event instanceof 
PipeTerminateEvent)) {
         LOGGER.warn(
             "IoTDBDataRegionAirGapConnector does not support transferring 
generic event: {}.",
             event);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index e964ddffd26..5edf485adfb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -303,7 +304,7 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
 
   @Override
   public void transfer(final Event event) throws Exception {
-    if (!(event instanceof PipeHeartbeatEvent)) {
+    if (!(event instanceof PipeHeartbeatEvent || event instanceof 
PipeTerminateEvent)) {
       LOGGER.warn(
           "IoTDBLegacyPipeConnector does not support transferring generic 
event: {}.", event);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index b5b4397fd60..7e3b4fb8a9f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -370,8 +371,9 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     transferQueuedEventsIfNecessary();
     transferBatchedEventsIfNecessary();
 
-    if (!(event instanceof PipeHeartbeatEvent)
-        && !(event instanceof PipeSchemaRegionWritePlanEvent)) {
+    if (!(event instanceof PipeHeartbeatEvent
+        || event instanceof PipeSchemaRegionWritePlanEvent
+        || event instanceof PipeTerminateEvent)) {
       LOGGER.warn(
           "IoTDBThriftAsyncConnector does not support transferring generic 
event: {}.", event);
       return;
@@ -470,6 +472,10 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
    * @param event {@link Event} to retry
    */
   public void addFailureEventToRetryQueue(final Event event) {
+    if (event instanceof EnrichedEvent && ((EnrichedEvent) 
event).isReleased()) {
+      return;
+    }
+
     if (isClosed.get()) {
       if (event instanceof EnrichedEvent) {
         ((EnrichedEvent) 
event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 3b664ac50c1..fca40f21d64 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -172,7 +173,7 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
       doTransferWrapper();
     }
 
-    if (!(event instanceof PipeHeartbeatEvent)) {
+    if (!(event instanceof PipeHeartbeatEvent || event instanceof 
PipeTerminateEvent)) {
       LOGGER.warn(
           "IoTDBThriftSyncConnector does not support transferring generic 
event: {}.", event);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index 8a575922fa7..68789bd3ef5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -102,7 +103,7 @@ public class WriteBackConnector implements PipeConnector {
 
   @Override
   public void transfer(final Event event) throws Exception {
-    if (!(event instanceof PipeHeartbeatEvent)) {
+    if (!(event instanceof PipeHeartbeatEvent || event instanceof 
PipeTerminateEvent)) {
       LOGGER.warn("WriteBackConnector does not support transferring generic 
event: {}.", event);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 2994f41a3f9..cc78ebdea54 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -135,27 +135,30 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
       decreaseReferenceCountAndReleaseLastEvent(true);
     } catch (final PipeException e) {
       if (!isClosed.get()) {
+        setLastExceptionEvent(event);
         throw e;
       } else {
         LOGGER.info(
-            "{} in pipe transfer, ignored because pipe is dropped.",
+            "{} in pipe transfer, ignored because the connector subtask is 
dropped.",
             e.getClass().getSimpleName(),
             e);
         clearReferenceCountAndReleaseLastEvent();
       }
     } catch (final Exception e) {
       if (!isClosed.get()) {
+        setLastExceptionEvent(event);
         throw new PipeException(
             String.format(
                 "Exception in pipe transfer, subtask: %s, last event: %s, root 
cause: %s",
                 taskID,
-                lastEvent instanceof EnrichedEvent
-                    ? ((EnrichedEvent) lastEvent).coreReportMessage()
-                    : lastEvent,
+                event instanceof EnrichedEvent
+                    ? ((EnrichedEvent) event).coreReportMessage()
+                    : event,
                 ErrorHandlingUtils.getRootCause(e).getMessage()),
             e);
       } else {
-        LOGGER.info("Exception in pipe transfer, ignored because pipe is 
dropped.", e);
+        LOGGER.info(
+            "Exception in pipe transfer, ignored because the connector subtask 
is dropped.", e);
         clearReferenceCountAndReleaseLastEvent();
       }
     }
@@ -218,6 +221,53 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
    * its queued events in the output pipe connector.
    */
   public void discardEventsOfPipe(final String pipeNameToDrop) {
+    // Try to remove the events as much as possible
+    inputPendingQueue.removeIf(
+        event -> {
+          if (event instanceof EnrichedEvent
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) 
{
+            ((EnrichedEvent) event)
+                
.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+            return true;
+          }
+          return false;
+        });
+
+    // synchronized to use the lastEvent and lastExceptionEvent
+    synchronized (this) {
+      // Here we discard the last event, and re-submit the pipe task to avoid 
that the pipe task has
+      // stopped submission but will not be stopped by critical exceptions, 
because when it acquires
+      // lock, the pipe is already dropped, thus it will do nothing.
+      // Note that since we use a new thread to stop all the pipes, we will 
not encounter deadlock
+      // here. Or else we will.
+      if (lastEvent instanceof EnrichedEvent
+          && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())) 
{
+        // Do not clear last event's reference count because it may be on 
transferring
+        lastEvent = null;
+        // Submit self to avoid that the lastEvent has been retried "max 
times" times and has
+        // stopped executing.
+        // 1. If the last event is still on execution, or submitted by the 
previous "onSuccess" or
+        //    "onFailure", the "submitSelf" cause nothing.
+        // 2. If the last event is waiting the instance lock to call 
"onSuccess", then the callback
+        //    method will skip this turn of submission.
+        // 3. If the last event is waiting to call "onFailure", then it will 
be ignored because the
+        //    last event has been set to null.
+        // 4. If the last event has called "onFailure" and caused the subtask 
to stop submission,
+        //    it's submitted here and the "report" will wait for the "drop 
pipe" lock to stop all
+        //    the pipes with critical exceptions. As illustrated above, the 
"report" will do
+        //    nothing.
+        submitSelf();
+      }
+
+      // We only clear the lastEvent's reference count when it's already on 
failure. Namely, we
+      // clear the lastExceptionEvent. It's safe to potentially clear it twice 
because we have the
+      // "nonnull" detection.
+      if (lastExceptionEvent instanceof EnrichedEvent
+          && pipeNameToDrop.equals(((EnrichedEvent) 
lastExceptionEvent).getPipeName())) {
+        clearReferenceCountAndReleaseLastExceptionEvent();
+      }
+    }
+
     if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
       ((IoTDBDataRegionAsyncConnector) 
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index 82b09346322..b862907181d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -31,6 +31,7 @@ import java.util.Objects;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQueue<Event> {
 
@@ -51,10 +52,10 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
     if (event instanceof PipeHeartbeatEvent && super.peekLast() instanceof 
PipeHeartbeatEvent) {
       // We can NOT keep too many PipeHeartbeatEvent in bufferQueue because 
they may cause OOM.
       ((EnrichedEvent) 
event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
+      return false;
     } else {
-      super.directOffer(event);
+      return super.directOffer(event);
     }
-    return true;
   }
 
   @Override
@@ -119,6 +120,12 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
     tsfileInsertEventDeque.forEach(action);
   }
 
+  @Override
+  public void removeIf(final Predicate<? super Event> filter) {
+    super.removeIf(filter);
+    pendingQueue.removeIf(filter);
+  }
+
   @Override
   public boolean isEmpty() {
     return super.isEmpty() && tsfileInsertEventDeque.isEmpty();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
index e33f36b07f9..04983a984d9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 public abstract class BlockingPendingQueue<E extends Event> {
 
@@ -41,12 +42,13 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
 
   private final PipeEventCounter eventCounter;
 
-  protected BlockingPendingQueue(BlockingQueue<E> pendingQueue, 
PipeEventCounter eventCounter) {
+  protected BlockingPendingQueue(
+      final BlockingQueue<E> pendingQueue, final PipeEventCounter 
eventCounter) {
     this.pendingQueue = pendingQueue;
     this.eventCounter = eventCounter;
   }
 
-  public boolean waitedOffer(E event) {
+  public boolean waitedOffer(final E event) {
     try {
       final boolean offered =
           pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, 
TimeUnit.MILLISECONDS);
@@ -54,14 +56,14 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
         eventCounter.increaseEventCount(event);
       }
       return offered;
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       LOGGER.info("pending queue offer is interrupted.", e);
       Thread.currentThread().interrupt();
       return false;
     }
   }
 
-  public boolean directOffer(E event) {
+  public boolean directOffer(final E event) {
     final boolean offered = pendingQueue.offer(event);
     if (offered) {
       eventCounter.increaseEventCount(event);
@@ -69,12 +71,12 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
     return offered;
   }
 
-  public boolean put(E event) {
+  public boolean put(final E event) {
     try {
       pendingQueue.put(event);
       eventCounter.increaseEventCount(event);
       return true;
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       LOGGER.info("pending queue put is interrupted.", e);
       Thread.currentThread().interrupt();
       return false;
@@ -92,7 +94,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
     try {
       event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
       eventCounter.decreaseEventCount(event);
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       LOGGER.info("pending queue poll is interrupted.", e);
       Thread.currentThread().interrupt();
     }
@@ -104,10 +106,14 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
     eventCounter.reset();
   }
 
-  public void forEach(Consumer<? super E> action) {
+  public void forEach(final Consumer<? super E> action) {
     pendingQueue.forEach(action);
   }
 
+  public void removeIf(final Predicate<? super E> filter) {
+    pendingQueue.removeIf(filter);
+  }
+
   public boolean isEmpty() {
     return pendingQueue.isEmpty();
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index a95badf6a94..e6a2399f7e8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 
 import com.google.common.util.concurrent.Futures;
@@ -48,6 +49,9 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
   // a subtask is submitted to only one thread at a time
   protected volatile boolean isSubmitted = false;
 
+  // For cleaning up the last event when the pipe is dropped
+  protected volatile Event lastExceptionEvent;
+
   protected PipeAbstractConnectorSubtask(
       final String taskID, final long creationTime, final PipeConnector 
outputPipeConnector) {
     super(taskID, creationTime);
@@ -76,11 +80,35 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
     isSubmitted = false;
 
     if (isClosed.get()) {
-      LOGGER.info("onFailure in pipe transfer, ignored because pipe is 
dropped.", throwable);
+      LOGGER.info(
+          "onFailure in pipe transfer, ignored because the connector subtask 
is dropped.",
+          throwable);
       clearReferenceCountAndReleaseLastEvent();
       return;
     }
 
+    // We assume that the event is cleared as the "lastEvent" in processor 
subtask and reaches the
+    // connector subtask. Then, it may fail because of released resource and 
block the other pipes
+    // using the same connector. We simply discard it.
+    if (lastExceptionEvent instanceof EnrichedEvent
+        && ((EnrichedEvent) lastExceptionEvent).isReleased()) {
+      LOGGER.info(
+          "onFailure in pipe transfer, ignored because the failure event is 
released.", throwable);
+      submitSelf();
+      return;
+    }
+
+    // If lastExceptionEvent != lastEvent, it indicates that the lastEvent's 
reference has been
+    // changed because the pipe of it has been dropped. In that case, we just 
discard the event.
+    if (lastEvent != lastExceptionEvent) {
+      LOGGER.info(
+          "onFailure in pipe transfer, ignored because the failure event's 
pipe is dropped.",
+          throwable);
+      clearReferenceCountAndReleaseLastExceptionEvent();
+      submitSelf();
+      return;
+    }
+
     if (throwable instanceof PipeConnectionException) {
       // Retry to connect to the target system if the connection is broken
       // We should reconstruct the client before re-submit the subtask
@@ -191,4 +219,17 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
     Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
     isSubmitted = true;
   }
+
+  protected synchronized void setLastExceptionEvent(final Event event) {
+    lastExceptionEvent = event;
+  }
+
+  protected synchronized void 
clearReferenceCountAndReleaseLastExceptionEvent() {
+    if (lastExceptionEvent != null) {
+      if (lastExceptionEvent instanceof EnrichedEvent) {
+        ((EnrichedEvent) 
lastExceptionEvent).clearReferenceCount(PipeSubtask.class.getName());
+      }
+      lastExceptionEvent = null;
+    }
+  }
 }

Reply via email to