This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 30e9f326e60769b7e3a2fd973910d84f1c64f779 Author: Zhenyu Luo <[email protected]> AuthorDate: Tue Aug 5 16:54:05 2025 +0800 Pipe: Fix the stuck state caused by unfair lock in Sink start phase (#16100) * Pipe: Fix the stuck state caused by unfair lock in Sink start phase * fix (cherry picked from commit 2df3c45cef51f90bd2a01586aacfa7cc52c6bee5) --- .../agent/task/subtask/sink/PipeSinkSubtask.java | 7 +-- .../subtask/sink/PipeSinkSubtaskLifeCycle.java | 7 ++- .../task/subtask/PipeAbstractSinkSubtask.java | 23 +--------- .../agent/task/subtask/PipeReportableSubtask.java | 50 ++++++++++++++++------ 4 files changed, 46 insertions(+), 41 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 9d01abf24c1..acfa13c68c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -228,11 +228,8 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // Try to remove the events as much as possible inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId); - highPriorityLockTaskCount.incrementAndGet(); try { - synchronized (highPriorityLockTaskCount) { - highPriorityLockTaskCount.notifyAll(); - } + increaseHighPriorityTaskCount(); // synchronized to use the lastEvent & lastExceptionEvent synchronized (this) { @@ -271,7 +268,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { } } } finally { - highPriorityLockTaskCount.decrementAndGet(); + decreaseHighPriorityTaskCount(); } if (outputPipeConnector instanceof IoTDBSink) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java index 80929379a10..0df3a773b9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java @@ -123,7 +123,12 @@ public class PipeSinkSubtaskLifeCycle implements AutoCloseable { } if (runningTaskCount == 0) { - executor.start(subtask.getTaskID()); + try { + subtask.increaseHighPriorityTaskCount(); + executor.start(subtask.getTaskID()); + } finally { + subtask.decreaseHighPriorityTaskCount(); + } } runningTaskCount++; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index fc13ccc557c..62cce7438ba 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -170,14 +170,8 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { MAX_RETRY_TIMES, e); try { - synchronized (highPriorityLockTaskCount) { - // The wait operation will release the highPriorityLockTaskCount lock, so there will be - // no deadlock.
- if (highPriorityLockTaskCount.get() == 0) { - highPriorityLockTaskCount.wait( - retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()); - } - } + sleepIfNoHighPriorityTask( + retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()); } catch (final InterruptedException interruptedException) { LOGGER.info( "Interrupted while sleeping, will retry to handshake with the target system.", @@ -254,17 +248,4 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { lastExceptionEvent = null; } } - - private void preScheduleLowPriorityTask(int maxRetries) { - while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) { - try { - // Introduce a short delay to avoid CPU spinning - Thread.sleep(10); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn("Interrupted while waiting for the high priority lock task.", e); - break; - } - } - } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java index d5c0882bc41..f290f8c4965 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java @@ -104,13 +104,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask { throwable.getMessage(), throwable); try { - synchronized (highPriorityLockTaskCount) { - // The wait operation will release the highPriorityLockTaskCount lock, so there will be - // no deadlock.
- if (highPriorityLockTaskCount.get() == 0) { - highPriorityLockTaskCount.wait(getSleepIntervalBasedOnThrowable(throwable)); - } - } + sleepIfNoHighPriorityTask(getSleepIntervalBasedOnThrowable(throwable)); } catch (final InterruptedException e) { LOGGER.warn( "Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})", @@ -177,13 +171,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask { throwable.getMessage(), throwable); try { - synchronized (highPriorityLockTaskCount) { - // The wait operation will release the highPriorityLockTaskCount lock, so there will be - // no deadlock. - if (highPriorityLockTaskCount.get() == 0) { - highPriorityLockTaskCount.wait(getSleepIntervalBasedOnThrowable(throwable)); - } - } + sleepIfNoHighPriorityTask(getSleepIntervalBasedOnThrowable(throwable)); } catch (final InterruptedException e) { LOGGER.warn( "Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})", @@ -195,4 +183,38 @@ public abstract class PipeReportableSubtask extends PipeSubtask { submitSelf(); } + + protected void preScheduleLowPriorityTask(int maxRetries) { + while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) { + try { + // Introduce a short delay to avoid CPU spinning + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Interrupted while waiting for the high priority lock task.", e); + break; + } + } + } + + protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException { + synchronized (highPriorityLockTaskCount) { + // Back off only while no high-priority task is pending. wait() releases the monitor, so + // increaseHighPriorityTaskCount() can notifyAll() to cut the back-off short (no deadlock).
+ if (highPriorityLockTaskCount.get() == 0) { + highPriorityLockTaskCount.wait(sleepMillis); + } + } + } + + public void increaseHighPriorityTaskCount() { + highPriorityLockTaskCount.incrementAndGet(); + synchronized (highPriorityLockTaskCount) { + highPriorityLockTaskCount.notifyAll(); + } + } + + public void decreaseHighPriorityTaskCount() { + highPriorityLockTaskCount.decrementAndGet(); + } }
