This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch ref-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ref-13 by this push:
new dd4cf68e619 Pipe: Refactored the PipeTaskCoordinatorLock (#16988)
dd4cf68e619 is described below
commit dd4cf68e6194033d2d454cab26cce1db14bdf735
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jan 7 17:55:29 2026 +0800
Pipe: Refactored the PipeTaskCoordinatorLock (#16988)
* may-fix
* refactor
* commit-sug
---
.../pipe/coordinator/task/PipeTaskCoordinator.java | 16 +------
.../coordinator/task/PipeTaskCoordinatorLock.java | 52 +++++++---------------
2 files changed, 17 insertions(+), 51 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 100da334d35..f48c64c3ea2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -51,7 +51,6 @@ public class PipeTaskCoordinator {
private final PipeTaskInfo pipeTaskInfo;
private final PipeTaskCoordinatorLock pipeTaskCoordinatorLock;
- private AtomicReference<PipeTaskInfo> pipeTaskInfoHolder;
public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo
pipeTaskInfo) {
this.configManager = configManager;
@@ -66,12 +65,7 @@ public class PipeTaskCoordinator {
* null if the lock is not acquired.
*/
public AtomicReference<PipeTaskInfo> tryLock() {
- if (pipeTaskCoordinatorLock.tryLock()) {
- pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
- return pipeTaskInfoHolder;
- }
-
- return null;
+ return pipeTaskCoordinatorLock.tryLock() ? new
AtomicReference<>(pipeTaskInfo) : null;
}
/**
@@ -82,8 +76,7 @@ public class PipeTaskCoordinator {
*/
public AtomicReference<PipeTaskInfo> lock() {
pipeTaskCoordinatorLock.lock();
- pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
- return pipeTaskInfoHolder;
+ return new AtomicReference<>(pipeTaskInfo);
}
/**
@@ -94,11 +87,6 @@ public class PipeTaskCoordinator {
* the lock.
*/
public boolean unlock() {
- if (pipeTaskInfoHolder != null) {
- pipeTaskInfoHolder.set(null);
- pipeTaskInfoHolder = null;
- }
-
try {
pipeTaskCoordinatorLock.unlock();
return true;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index e57add9a001..12b92619004 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -22,10 +22,8 @@ package
org.apache.iotdb.confignode.manager.pipe.coordinator.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
/**
* {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task
coordinator. It is used to
@@ -35,22 +33,16 @@ public class PipeTaskCoordinatorLock {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
- private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
- private final AtomicLong idGenerator = new AtomicLong(0);
+ private final ReentrantLock lock = new ReentrantLock();
public void lock() {
+ LOGGER.debug(
+ "PipeTaskCoordinator lock waiting for thread {}",
Thread.currentThread().getName());
try {
- final long id = idGenerator.incrementAndGet();
+ lock.lockInterruptibly();
LOGGER.debug(
- "PipeTaskCoordinator lock (id: {}) waiting for thread {}",
- id,
- Thread.currentThread().getName());
- deque.put(id);
- LOGGER.debug(
- "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
- id,
- Thread.currentThread().getName());
- } catch (InterruptedException e) {
+ "PipeTaskCoordinator lock acquired by thread {}",
Thread.currentThread().getName());
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current
thread: {}",
@@ -60,21 +52,15 @@ public class PipeTaskCoordinatorLock {
public boolean tryLock() {
try {
- final long id = idGenerator.incrementAndGet();
LOGGER.debug(
- "PipeTaskCoordinator lock (id: {}) waiting for thread {}",
- id,
- Thread.currentThread().getName());
- if (deque.offer(id, 10, TimeUnit.SECONDS)) {
+ "PipeTaskCoordinator lock waiting for thread {}",
Thread.currentThread().getName());
+ if (lock.tryLock(10, TimeUnit.SECONDS)) {
LOGGER.debug(
- "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
- id,
- Thread.currentThread().getName());
+ "PipeTaskCoordinator lock acquired by thread {}",
Thread.currentThread().getName());
return true;
} else {
LOGGER.info(
- "PipeTaskCoordinator lock (id: {}) failed to acquire by thread {}
because of timeout",
- id,
+ "PipeTaskCoordinator lock failed to acquire by thread {} because
of timeout",
Thread.currentThread().getName());
return false;
}
@@ -88,20 +74,12 @@ public class PipeTaskCoordinatorLock {
}
public void unlock() {
- final Long id = deque.poll();
- if (id == null) {
- LOGGER.error(
- "PipeTaskCoordinator lock released by thread {} but the lock is not
acquired by any thread",
- Thread.currentThread().getName());
- } else {
- LOGGER.debug(
- "PipeTaskCoordinator lock (id: {}) released by thread {}",
- id,
- Thread.currentThread().getName());
- }
+ lock.unlock();
+ LOGGER.debug(
+ "PipeTaskCoordinator lock released by thread {}",
Thread.currentThread().getName());
}
public boolean isLocked() {
- return !deque.isEmpty();
+ return lock.isLocked();
}
}