This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 5a54947d168 Pipe: Optimize Drop Pipe high priority tasks cannot obtain SubTask object lock (#15404) (#15610)
5a54947d168 is described below

commit 5a54947d16898db13db56234488ac163b1c3fea9
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 29 18:28:11 2025 +0800

    Pipe: Optimize Drop Pipe high priority tasks cannot obtain SubTask object lock (#15404) (#15610)
    
    (cherry picked from commit ca8ce24f2f0c1703164412a1d704c1166ebbafaa)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../subtask/connector/PipeConnectorSubtask.java    |  71 ++++++------
 .../task/subtask/PipeAbstractConnectorSubtask.java | 127 +++++++++++++--------
 2 files changed, 115 insertions(+), 83 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 359376f0f6d..f316e2cf462 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -232,41 +232,46 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
     // Try to remove the events as much as possible
     inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
 
-    // synchronized to use the lastEvent & lastExceptionEvent
-    synchronized (this) {
-      // Here we discard the last event, and re-submit the pipe task to avoid 
that the pipe task has
-      // stopped submission but will not be stopped by critical exceptions, 
because when it acquires
-      // lock, the pipe is already dropped, thus it will do nothing.
-      // Note that since we use a new thread to stop all the pipes, we will 
not encounter deadlock
-      // here. Or else we will.
-      if (lastEvent instanceof EnrichedEvent
-          && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
-          && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
-        // Do not clear last event's reference count because it may be on 
transferring
-        lastEvent = null;
-        // Submit self to avoid that the lastEvent has been retried "max 
times" times and has
-        // stopped executing.
-        // 1. If the last event is still on execution, or submitted by the 
previous "onSuccess" or
-        //    "onFailure", the "submitSelf" cause nothing.
-        // 2. If the last event is waiting the instance lock to call 
"onSuccess", then the callback
-        //    method will skip this turn of submission.
-        // 3. If the last event is waiting to call "onFailure", then it will 
be ignored because the
-        //    last event has been set to null.
-        // 4. If the last event has called "onFailure" and caused the subtask 
to stop submission,
-        //    it's submitted here and the "report" will wait for the "drop 
pipe" lock to stop all
-        //    the pipes with critical exceptions. As illustrated above, the 
"report" will do
-        //    nothing.
-        submitSelf();
-      }
+    highPriorityLockTaskCount.incrementAndGet();
+    try {
+      // synchronized to use the lastEvent & lastExceptionEvent
+      synchronized (this) {
+        // Here we discard the last event, and re-submit the pipe task to 
avoid that the pipe task
+        // has stopped submission but will not be stopped by critical 
exceptions, because when it
+        // acquires lock, the pipe is already dropped, thus it will do 
nothing. Note that since we
+        // use a new thread to stop all the pipes, we will not encounter 
deadlock here. Or else we
+        // will.
+        if (lastEvent instanceof EnrichedEvent
+            && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
+            && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
+          // Do not clear the last event's reference counts because it may be 
on transferring
+          lastEvent = null;
+          // Submit self to avoid that the lastEvent has been retried "max 
times" times and has
+          // stopped executing.
+          // 1. If the last event is still on execution, or submitted by the 
previous "onSuccess" or
+          //    "onFailure", the "submitSelf" causes nothing.
+          // 2. If the last event is waiting the instance lock to call 
"onSuccess", then the
+          //    callback method will skip this turn of submission.
+          // 3. If the last event is waiting to call "onFailure", then it will 
be ignored because
+          //    the last event has been set to null.
+          // 4. If the last event has called "onFailure" and caused the 
subtask to stop submission,
+          //    it's submitted here and the "report" will wait for the "drop 
pipe" lock to stop all
+          //    the pipes with critical exceptions. As illustrated above, the 
"report" will do
+          //    nothing.
+          submitSelf();
+        }
 
-      // We only clear the lastEvent's reference count when it's already on 
failure. Namely, we
-      // clear the lastExceptionEvent. It's safe to potentially clear it twice 
because we have the
-      // "nonnull" detection.
-      if (lastExceptionEvent instanceof EnrichedEvent
-          && pipeNameToDrop.equals(((EnrichedEvent) 
lastExceptionEvent).getPipeName())
-          && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
-        clearReferenceCountAndReleaseLastExceptionEvent();
+        // We only clear the lastEvent's reference counts when it's already on 
failure. Namely, we
+        // clear the lastExceptionEvent. It's safe to potentially clear it 
twice because we have the
+        // "nonnull" detection.
+        if (lastExceptionEvent instanceof EnrichedEvent
+            && pipeNameToDrop.equals(((EnrichedEvent) 
lastExceptionEvent).getPipeName())
+            && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) 
{
+          clearReferenceCountAndReleaseLastExceptionEvent();
+        }
       }
+    } finally {
+      highPriorityLockTaskCount.decrementAndGet();
     }
 
     if (outputPipeConnector instanceof IoTDBConnector) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
index cfd987758e4..055a546e7e3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
@@ -34,11 +34,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeAbstractConnectorSubtask.class);
 
+  // To ensure that high-priority tasks can obtain object locks first, a 
counter is now used to save
+  // the number of high-priority tasks.
+  protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0);
+
   // For output (transfer events to the target system in connector)
   protected PipeConnector outputPipeConnector;
 
@@ -70,67 +75,76 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
   }
 
   @Override
-  public synchronized void onSuccess(final Boolean 
hasAtLeastOneEventProcessed) {
-    isSubmitted = false;
+  public void onSuccess(final Boolean hasAtLeastOneEventProcessed) {
+    preScheduleLowPriorityTask(100);
+
+    synchronized (this) {
+      isSubmitted = false;
 
-    super.onSuccess(hasAtLeastOneEventProcessed);
+      super.onSuccess(hasAtLeastOneEventProcessed);
+    }
   }
 
   @Override
-  public synchronized void onFailure(final Throwable throwable) {
-    isSubmitted = false;
+  public void onFailure(final Throwable throwable) {
+    preScheduleLowPriorityTask(100);
 
-    if (isClosed.get()) {
-      LOGGER.info(
-          "onFailure in pipe transfer, ignored because the connector subtask 
is dropped.",
-          throwable);
-      clearReferenceCountAndReleaseLastEvent(null);
-      return;
-    }
+    synchronized (this) {
+      isSubmitted = false;
 
-    // We assume that the event is cleared as the "lastEvent" in processor 
subtask and reaches the
-    // connector subtask. Then, it may fail because of released resource and 
block the other pipes
-    // using the same connector. We simply discard it.
-    if (lastExceptionEvent instanceof EnrichedEvent
-        && ((EnrichedEvent) lastExceptionEvent).isReleased()) {
-      LOGGER.info(
-          "onFailure in pipe transfer, ignored because the failure event is 
released.", throwable);
-      submitSelf();
-      return;
-    }
+      if (isClosed.get()) {
+        LOGGER.info(
+            "onFailure in pipe transfer, ignored because the connector subtask 
is dropped.",
+            throwable);
+        clearReferenceCountAndReleaseLastEvent(null);
+        return;
+      }
 
-    // If lastExceptionEvent != lastEvent, it indicates that the lastEvent's 
reference has been
-    // changed because the pipe of it has been dropped. In that case, we just 
discard the event.
-    if (lastEvent != lastExceptionEvent) {
-      LOGGER.info(
-          "onFailure in pipe transfer, ignored because the failure event's 
pipe is dropped.",
-          throwable);
-      clearReferenceCountAndReleaseLastExceptionEvent();
-      submitSelf();
-      return;
-    }
+      // We assume that the event is cleared as the "lastEvent" in processor 
subtask and reaches the
+      // connector subtask. Then, it may fail because of released resource and 
block the other pipes
+      // using the same connector. We simply discard it.
+      if (lastExceptionEvent instanceof EnrichedEvent
+          && ((EnrichedEvent) lastExceptionEvent).isReleased()) {
+        LOGGER.info(
+            "onFailure in pipe transfer, ignored because the failure event is 
released.",
+            throwable);
+        submitSelf();
+        return;
+      }
 
-    if (throwable instanceof PipeConnectionException) {
-      // Retry to connect to the target system if the connection is broken
-      // We should reconstruct the client before re-submit the subtask
-      if (onPipeConnectionException(throwable)) {
-        // return if the pipe task should be stopped
+      // If lastExceptionEvent != lastEvent, it indicates that the lastEvent's 
reference has been
+      // changed because the pipe of it has been dropped. In that case, we 
just discard the event.
+      if (lastEvent != lastExceptionEvent) {
+        LOGGER.info(
+            "onFailure in pipe transfer, ignored because the failure event's 
pipe is dropped.",
+            throwable);
+        clearReferenceCountAndReleaseLastExceptionEvent();
+        submitSelf();
         return;
       }
-    }
 
-    // Handle exceptions if any available clients exist
-    // Notice that the PipeRuntimeConnectorCriticalException must be thrown 
here
-    // because the upper layer relies on this to stop all the related pipe 
tasks
-    // Other exceptions may cause the subtask to stop forever and can not be 
restarted
-    if (throwable instanceof PipeRuntimeConnectorCriticalException) {
-      super.onFailure(throwable);
-    } else {
-      // Print stack trace for better debugging
-      LOGGER.warn(
-          "A non PipeRuntimeConnectorCriticalException occurred, will throw a 
PipeRuntimeConnectorCriticalException.",
-          throwable);
-      super.onFailure(new 
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+      if (throwable instanceof PipeConnectionException) {
+        // Retry to connect to the target system if the connection is broken
+        // We should reconstruct the client before re-submit the subtask
+        if (onPipeConnectionException(throwable)) {
+          // return if the pipe task should be stopped
+          return;
+        }
+      }
+
+      // Handle exceptions if any available clients exist
+      // Notice that the PipeRuntimeConnectorCriticalException must be thrown 
here
+      // because the upper layer relies on this to stop all the related pipe 
tasks
+      // Other exceptions may cause the subtask to stop forever and can not be 
restarted
+      if (throwable instanceof PipeRuntimeConnectorCriticalException) {
+        super.onFailure(throwable);
+      } else {
+        // Print stack trace for better debugging
+        LOGGER.warn(
+            "A non PipeRuntimeConnectorCriticalException occurred, will throw 
a PipeRuntimeConnectorCriticalException.",
+            throwable);
+        super.onFailure(new 
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+      }
     }
   }
 
@@ -238,4 +252,17 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
       lastExceptionEvent = null;
     }
   }
+
+  private void preScheduleLowPriorityTask(int maxRetries) {
+    while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
+      try {
+        // Introduce a short delay to avoid CPU spinning
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("Interrupted while waiting for the high priority lock 
task.", e);
+        break;
+      }
+    }
+  }
 }

Reply via email to