This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new fc6913b7509 [To dev/1.3] Pipe: Fix the stuck state caused by unfair
lock in Sink start phase (#16100) (#16106)
fc6913b7509 is described below
commit fc6913b7509b6537be6290993024537f6a01bde9
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Aug 6 09:43:51 2025 +0800
[To dev/1.3] Pipe: Fix the stuck state caused by unfair lock in Sink start
phase (#16100) (#16106)
* Pipe: Fix the stuck state caused by unfair lock in Sink start phase
* fix
(cherry picked from commit 2df3c45cef51f90bd2a01586aacfa7cc52c6bee5)
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 7 +--
.../subtask/sink/PipeSinkSubtaskLifeCycle.java | 7 ++-
.../task/subtask/PipeAbstractSinkSubtask.java | 23 +--------
.../agent/task/subtask/PipeReportableSubtask.java | 54 +++++++++++++++-------
4 files changed, 48 insertions(+), 43 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 9d01abf24c1..acfa13c68c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -228,11 +228,8 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
// Try to remove the events as much as possible
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
- highPriorityLockTaskCount.incrementAndGet();
try {
- synchronized (highPriorityLockTaskCount) {
- highPriorityLockTaskCount.notifyAll();
- }
+ increaseHighPriorityTaskCount();
// synchronized to use the lastEvent & lastExceptionEvent
synchronized (this) {
@@ -271,7 +268,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
}
} finally {
- highPriorityLockTaskCount.decrementAndGet();
+ decreaseHighPriorityTaskCount();
}
if (outputPipeConnector instanceof IoTDBSink) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 80929379a10..0df3a773b9c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -123,7 +123,12 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
}
if (runningTaskCount == 0) {
- executor.start(subtask.getTaskID());
+ try {
+ subtask.increaseHighPriorityTaskCount();
+ executor.start(subtask.getTaskID());
+ } finally {
+ subtask.decreaseHighPriorityTaskCount();
+ }
}
runningTaskCount++;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index fc13ccc557c..62cce7438ba 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -170,14 +170,8 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
MAX_RETRY_TIMES,
e);
try {
- synchronized (highPriorityLockTaskCount) {
- // The wait operation will release the highPriorityLockTaskCount
lock, so there will be
- // no deadlock.
- if (highPriorityLockTaskCount.get() == 0) {
- highPriorityLockTaskCount.wait(
- retry *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
- }
- }
+ sleepIfNoHighPriorityTask(
+ retry *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
} catch (final InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to handshake with the
target system.",
@@ -254,17 +248,4 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
lastExceptionEvent = null;
}
}
-
- private void preScheduleLowPriorityTask(int maxRetries) {
- while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
- try {
- // Introduce a short delay to avoid CPU spinning
- Thread.sleep(10);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Interrupted while waiting for the high priority lock
task.", e);
- break;
- }
- }
- }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 915a1cf66f0..8555ca85f3a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -90,14 +90,8 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
throwable.getMessage(),
throwable);
try {
- synchronized (highPriorityLockTaskCount) {
- // The wait operation will release the highPriorityLockTaskCount
lock, so there will be
- // no deadlock.
- if (highPriorityLockTaskCount.get() == 0) {
- highPriorityLockTaskCount.wait(
- retryCount.get() *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
- }
- }
+ sleepIfNoHighPriorityTask(
+ retryCount.get() *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
} catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time:
{}, simple class: {})",
@@ -164,14 +158,8 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
throwable.getMessage(),
throwable);
try {
- synchronized (highPriorityLockTaskCount) {
- // The wait operation will release the highPriorityLockTaskCount lock,
so there will be
- // no deadlock.
- if (highPriorityLockTaskCount.get() == 0) {
- highPriorityLockTaskCount.wait(
- retryCount.get() *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
- }
- }
+ sleepIfNoHighPriorityTask(
+ retryCount.get() *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
} catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time: {},
simple class: {})",
@@ -183,4 +171,38 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
submitSelf();
}
+
+ /**
+ * Polls every 10 ms while a high-priority task is registered, for at most {@code maxRetries}
+ * polls, so a low-priority task is scheduled after the high-priority one where possible.
+ *
+ * <p>If the thread is interrupted while polling, the interrupt flag is restored, a warning is
+ * logged and the method returns early.
+ *
+ * @param maxRetries maximum number of 10 ms polls; non-positive values return without sleeping
+ */
+ protected void preScheduleLowPriorityTask(int maxRetries) {
+ while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
+ try {
+ // Introduce a short delay to avoid CPU spinning
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers further up can still observe it.
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted while waiting for the high priority lock task.", e);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Backs off for up to {@code sleepMillis} before a retry, but only while no high-priority task
+ * is registered. If a high-priority task is already registered, returns immediately so the
+ * caller does not stall it. A task registered during the wait cuts the wait short, because
+ * {@link #increaseHighPriorityTaskCount()} calls {@code notifyAll()} on the same monitor.
+ *
+ * @param sleepMillis maximum time to wait, in milliseconds; non-positive values skip the wait
+ * (note that {@code Object.wait(0)} would otherwise block until notified)
+ * @throws InterruptedException if the calling thread is interrupted while waiting
+ */
+ protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException {
+ synchronized (highPriorityLockTaskCount) {
+ // The wait operation will release the highPriorityLockTaskCount lock, so there will be
+ // no deadlock.
+ // Only back off when NO high-priority task is pending (same check as the inlined code this
+ // helper replaces); a zero timeout must not turn into an unbounded wait.
+ if (sleepMillis > 0 && highPriorityLockTaskCount.get() == 0) {
+ highPriorityLockTaskCount.wait(sleepMillis);
+ }
+ }
+ }
+
+ /**
+ * Registers one more in-flight high-priority task and wakes every thread currently waiting on
+ * the {@code highPriorityLockTaskCount} monitor, e.g. a retry back-off blocked in {@link
+ * #sleepIfNoHighPriorityTask(long)}.
+ *
+ * <p>The callers in this change pair it with {@link #decreaseHighPriorityTaskCount()} in a
+ * {@code finally} block.
+ */
+ public void increaseHighPriorityTaskCount() {
+ // The count is raised before the monitor is taken; notifyAll() requires holding the monitor.
+ highPriorityLockTaskCount.incrementAndGet();
+ synchronized (highPriorityLockTaskCount) {
+ highPriorityLockTaskCount.notifyAll();
+ }
+ }
+
+ /**
+ * Unregisters one in-flight high-priority task. Unlike {@link #increaseHighPriorityTaskCount()},
+ * this does not notify waiters on the counter's monitor.
+ */
+ public void decreaseHighPriorityTaskCount() {
+ highPriorityLockTaskCount.getAndDecrement();
+ }
}