This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 30e9f326e60769b7e3a2fd973910d84f1c64f779
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Aug 5 16:54:05 2025 +0800

    Pipe: Fix the stuck state caused by unfair lock in Sink start phase (#16100)
    
    * Pipe: Fix the stuck state caused by unfair lock in Sink start phase
    
    * fix
    
    (cherry picked from commit 2df3c45cef51f90bd2a01586aacfa7cc52c6bee5)
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  7 +--
 .../subtask/sink/PipeSinkSubtaskLifeCycle.java     |  7 ++-
 .../task/subtask/PipeAbstractSinkSubtask.java      | 23 +---------
 .../agent/task/subtask/PipeReportableSubtask.java  | 50 ++++++++++++++++------
 4 files changed, 46 insertions(+), 41 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 9d01abf24c1..acfa13c68c5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -228,11 +228,8 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     // Try to remove the events as much as possible
     inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
 
-    highPriorityLockTaskCount.incrementAndGet();
     try {
-      synchronized (highPriorityLockTaskCount) {
-        highPriorityLockTaskCount.notifyAll();
-      }
+      increaseHighPriorityTaskCount();
 
       // synchronized to use the lastEvent & lastExceptionEvent
       synchronized (this) {
@@ -271,7 +268,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         }
       }
     } finally {
-      highPriorityLockTaskCount.decrementAndGet();
+      decreaseHighPriorityTaskCount();
     }
 
     if (outputPipeConnector instanceof IoTDBSink) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 80929379a10..0df3a773b9c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -123,7 +123,12 @@ public class PipeSinkSubtaskLifeCycle implements 
AutoCloseable {
     }
 
     if (runningTaskCount == 0) {
-      executor.start(subtask.getTaskID());
+      try {
+        subtask.increaseHighPriorityTaskCount();
+        executor.start(subtask.getTaskID());
+      } finally {
+        subtask.decreaseHighPriorityTaskCount();
+      }
     }
 
     runningTaskCount++;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index fc13ccc557c..62cce7438ba 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -170,14 +170,8 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
             MAX_RETRY_TIMES,
             e);
         try {
-          synchronized (highPriorityLockTaskCount) {
-            // The wait operation will release the highPriorityLockTaskCount 
lock, so there will be
-            // no deadlock.
-            if (highPriorityLockTaskCount.get() == 0) {
-              highPriorityLockTaskCount.wait(
-                  retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
-            }
-          }
+          sleepIfNoHighPriorityTask(
+              retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
         } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               "Interrupted while sleeping, will retry to handshake with the 
target system.",
@@ -254,17 +248,4 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
       lastExceptionEvent = null;
     }
   }
-
-  private void preScheduleLowPriorityTask(int maxRetries) {
-    while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
-      try {
-        // Introduce a short delay to avoid CPU spinning
-        Thread.sleep(10);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.warn("Interrupted while waiting for the high priority lock 
task.", e);
-        break;
-      }
-    }
-  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index d5c0882bc41..f290f8c4965 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -104,13 +104,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
           throwable.getMessage(),
           throwable);
       try {
-        synchronized (highPriorityLockTaskCount) {
-          // The wait operation will release the highPriorityLockTaskCount 
lock, so there will be
-          // no deadlock.
-          if (highPriorityLockTaskCount.get() == 0) {
-            
highPriorityLockTaskCount.wait(getSleepIntervalBasedOnThrowable(throwable));
-          }
-        }
+        sleepIfNoHighPriorityTask(getSleepIntervalBasedOnThrowable(throwable));
       } catch (final InterruptedException e) {
         LOGGER.warn(
             "Interrupted when retrying to execute subtask {} (creation time: 
{}, simple class: {})",
@@ -177,13 +171,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
         throwable.getMessage(),
         throwable);
     try {
-      synchronized (highPriorityLockTaskCount) {
-        // The wait operation will release the highPriorityLockTaskCount lock, 
so there will be
-        // no deadlock.
-        if (highPriorityLockTaskCount.get() == 0) {
-          
highPriorityLockTaskCount.wait(getSleepIntervalBasedOnThrowable(throwable));
-        }
-      }
+      sleepIfNoHighPriorityTask(getSleepIntervalBasedOnThrowable(throwable));
     } catch (final InterruptedException e) {
       LOGGER.warn(
           "Interrupted when retrying to execute subtask {} (creation time: {}, 
simple class: {})",
@@ -195,4 +183,38 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
 
     submitSelf();
   }
+
+  /**
+   * Polls {@code highPriorityLockTaskCount} until it reads zero or {@code maxRetries} polls have
+   * been spent, sleeping 10 ms between polls.
+   *
+   * <p>If the thread is interrupted while sleeping, the interrupt flag is restored, a warning is
+   * logged, and the wait ends early.
+   *
+   * @param maxRetries maximum number of 10 ms sleeps to perform before giving up
+   */
+  protected void preScheduleLowPriorityTask(int maxRetries) {
+    while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
+      try {
+        // Introduce a short delay to avoid CPU spinning
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag for the caller and stop waiting
+        Thread.currentThread().interrupt();
+        LOGGER.warn("Interrupted while waiting for the high priority lock 
task.", e);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Backs off for up to {@code sleepMillis} milliseconds, but only when no high-priority task is
+   * currently registered.
+   *
+   * <p>If {@code highPriorityLockTaskCount} is non-zero the method returns immediately, so the
+   * caller does not delay a pending high-priority task. A thread that is already waiting is woken
+   * early when {@code notifyAll()} is invoked on {@code highPriorityLockTaskCount}.
+   *
+   * @param sleepMillis maximum time to wait, in milliseconds; per {@link Object#wait(long)}, a
+   *     value of 0 waits until notified
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException {
+    synchronized (highPriorityLockTaskCount) {
+      // The wait operation will release the highPriorityLockTaskCount lock, so there will be
+      // no deadlock.
+      // Wait only when the count is zero: this matches the inline checks this helper replaces.
+      if (highPriorityLockTaskCount.get() == 0) {
+        highPriorityLockTaskCount.wait(sleepMillis);
+      }
+    }
+  }
+
+  /**
+   * Increments {@code highPriorityLockTaskCount} and then notifies every thread waiting on its
+   * monitor.
+   */
+  public void increaseHighPriorityTaskCount() {
+    highPriorityLockTaskCount.incrementAndGet();
+    synchronized (highPriorityLockTaskCount) {
+      highPriorityLockTaskCount.notifyAll();
+    }
+  }
+
+  /** Decrements {@code highPriorityLockTaskCount}; no waiting thread is notified. */
+  public void decreaseHighPriorityTaskCount() {
+    highPriorityLockTaskCount.decrementAndGet();
+  }
 }

Reply via email to