This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 28c4e020bc2 [To dev/1.3] Fix the deadlock at ConfigNode PipeTaskCoordinatorLock (#17233) (#17424)
28c4e020bc2 is described below
commit 28c4e020bc25ab4af1bd7522c1d20ac383724575
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 2 16:35:36 2026 +0800
[To dev/1.3] Fix the deadlock at ConfigNode PipeTaskCoordinatorLock (#17233) (#17424)
* fix
* Update PipeTaskCoordinatorLock.java
---
.../pipe/coordinator/task/PipeTaskCoordinator.java | 14 ++------------
.../coordinator/task/PipeTaskCoordinatorLock.java | 19 ++++++++++++-------
.../manager/subscription/SubscriptionCoordinator.java | 10 ++--------
3 files changed, 16 insertions(+), 27 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index f48c64c3ea2..ee1ccd2a8fb 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -82,19 +82,9 @@ public class PipeTaskCoordinator {
/**
* Unlock the pipe task coordinator. Calling this method will clear the pipe task info holder,
* which means that the holder will be null after calling this method.
- *
- * @return {@code true} if successfully unlocked, {@code false} if current thread is not holding
- * the lock.
*/
- public boolean unlock() {
- try {
- pipeTaskCoordinatorLock.unlock();
- return true;
- } catch (IllegalMonitorStateException ignored) {
- // This is thrown if unlock() is called without lock() called first.
- LOGGER.warn("This thread is not holding the lock.");
- return false;
- }
+ public void unlock() {
+ pipeTaskCoordinatorLock.unlock();
}
public boolean isLocked() {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 12b92619004..b86c556f20d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -22,24 +22,29 @@ package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
/**
- * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to
+ * {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task coordinator. It is used to
* ensure that only one thread can execute the pipe task coordinator at the same time.
+ *
+ * <p>Uses {@link Semaphore} instead of {@link java.util.concurrent.locks.ReentrantLock} to support
+ * cross-thread acquire/release, which is required by the procedure recovery mechanism: locks may be
+ * acquired on the StateMachineUpdater thread during {@code restoreLock()} and released on a
+ * ProcedureCoreWorker thread after execution.
*/
public class PipeTaskCoordinatorLock {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
- private final ReentrantLock lock = new ReentrantLock();
+ private final Semaphore semaphore = new Semaphore(1);
public void lock() {
LOGGER.debug(
"PipeTaskCoordinator lock waiting for thread {}",
Thread.currentThread().getName());
try {
- lock.lockInterruptibly();
+ semaphore.acquire();
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}",
Thread.currentThread().getName());
} catch (final InterruptedException e) {
@@ -54,7 +59,7 @@ public class PipeTaskCoordinatorLock {
try {
LOGGER.debug(
"PipeTaskCoordinator lock waiting for thread {}",
Thread.currentThread().getName());
- if (lock.tryLock(10, TimeUnit.SECONDS)) {
+ if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}",
Thread.currentThread().getName());
return true;
@@ -74,12 +79,12 @@ public class PipeTaskCoordinatorLock {
}
public void unlock() {
- lock.unlock();
+ semaphore.release();
LOGGER.debug(
"PipeTaskCoordinator lock released by thread {}",
Thread.currentThread().getName());
}
public boolean isLocked() {
- return lock.isLocked();
+ return semaphore.availablePermits() == 0;
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 28c596de7ba..ac88f31f921 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -105,14 +105,8 @@ public class SubscriptionCoordinator {
subscriptionInfoHolder = null;
}
- try {
- coordinatorLock.unlock();
- return true;
- } catch (IllegalMonitorStateException ignored) {
- // This is thrown if unlock() is called without lock() called first.
- LOGGER.warn("This thread is not holding the lock.");
- return false;
- }
+ coordinatorLock.unlock();
+ return true;
}
public boolean isLocked() {