This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b681b506c7b Pipe: Refactored the PipeTaskCoordinatorLock (#16988) 
(#16997)
b681b506c7b is described below

commit b681b506c7b3bb505e0d9db547fe3365a457e672
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jan 8 11:45:22 2026 +0800

    Pipe: Refactored the PipeTaskCoordinatorLock (#16988) (#16997)
    
    * may-fix
    
    * refactor
    
    * commit-sug
---
 .../pipe/coordinator/task/PipeTaskCoordinator.java | 16 +------
 .../coordinator/task/PipeTaskCoordinatorLock.java  | 52 +++++++---------------
 2 files changed, 17 insertions(+), 51 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 100da334d35..f48c64c3ea2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -51,7 +51,6 @@ public class PipeTaskCoordinator {
   private final PipeTaskInfo pipeTaskInfo;
 
   private final PipeTaskCoordinatorLock pipeTaskCoordinatorLock;
-  private AtomicReference<PipeTaskInfo> pipeTaskInfoHolder;
 
   public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo 
pipeTaskInfo) {
     this.configManager = configManager;
@@ -66,12 +65,7 @@ public class PipeTaskCoordinator {
    *     null if the lock is not acquired.
    */
   public AtomicReference<PipeTaskInfo> tryLock() {
-    if (pipeTaskCoordinatorLock.tryLock()) {
-      pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
-      return pipeTaskInfoHolder;
-    }
-
-    return null;
+    return pipeTaskCoordinatorLock.tryLock() ? new 
AtomicReference<>(pipeTaskInfo) : null;
   }
 
   /**
@@ -82,8 +76,7 @@ public class PipeTaskCoordinator {
    */
   public AtomicReference<PipeTaskInfo> lock() {
     pipeTaskCoordinatorLock.lock();
-    pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
-    return pipeTaskInfoHolder;
+    return new AtomicReference<>(pipeTaskInfo);
   }
 
   /**
@@ -94,11 +87,6 @@ public class PipeTaskCoordinator {
    *     the lock.
    */
   public boolean unlock() {
-    if (pipeTaskInfoHolder != null) {
-      pipeTaskInfoHolder.set(null);
-      pipeTaskInfoHolder = null;
-    }
-
     try {
       pipeTaskCoordinatorLock.unlock();
       return true;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index e57add9a001..12b92619004 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -22,10 +22,8 @@ package 
org.apache.iotdb.confignode.manager.pipe.coordinator.task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task 
coordinator. It is used to
@@ -35,22 +33,16 @@ public class PipeTaskCoordinatorLock {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
 
-  private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
-  private final AtomicLong idGenerator = new AtomicLong(0);
+  private final ReentrantLock lock = new ReentrantLock();
 
   public void lock() {
+    LOGGER.debug(
+        "PipeTaskCoordinator lock waiting for thread {}", 
Thread.currentThread().getName());
     try {
-      final long id = idGenerator.incrementAndGet();
+      lock.lockInterruptibly();
       LOGGER.debug(
-          "PipeTaskCoordinator lock (id: {}) waiting for thread {}",
-          id,
-          Thread.currentThread().getName());
-      deque.put(id);
-      LOGGER.debug(
-          "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
-          id,
-          Thread.currentThread().getName());
-    } catch (InterruptedException e) {
+          "PipeTaskCoordinator lock acquired by thread {}", 
Thread.currentThread().getName());
+    } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.error(
           "Interrupted while waiting for PipeTaskCoordinator lock, current 
thread: {}",
@@ -60,21 +52,15 @@ public class PipeTaskCoordinatorLock {
 
   public boolean tryLock() {
     try {
-      final long id = idGenerator.incrementAndGet();
       LOGGER.debug(
-          "PipeTaskCoordinator lock (id: {}) waiting for thread {}",
-          id,
-          Thread.currentThread().getName());
-      if (deque.offer(id, 10, TimeUnit.SECONDS)) {
+          "PipeTaskCoordinator lock waiting for thread {}", 
Thread.currentThread().getName());
+      if (lock.tryLock(10, TimeUnit.SECONDS)) {
         LOGGER.debug(
-            "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
-            id,
-            Thread.currentThread().getName());
+            "PipeTaskCoordinator lock acquired by thread {}", 
Thread.currentThread().getName());
         return true;
       } else {
         LOGGER.info(
-            "PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} 
because of timeout",
-            id,
+            "PipeTaskCoordinator lock failed to acquire by thread {} because 
of timeout",
             Thread.currentThread().getName());
         return false;
       }
@@ -88,20 +74,12 @@ public class PipeTaskCoordinatorLock {
   }
 
   public void unlock() {
-    final Long id = deque.poll();
-    if (id == null) {
-      LOGGER.error(
-          "PipeTaskCoordinator lock released by thread {} but the lock is not 
acquired by any thread",
-          Thread.currentThread().getName());
-    } else {
-      LOGGER.debug(
-          "PipeTaskCoordinator lock (id: {}) released by thread {}",
-          id,
-          Thread.currentThread().getName());
-    }
+    lock.unlock();
+    LOGGER.debug(
+        "PipeTaskCoordinator lock released by thread {}", 
Thread.currentThread().getName());
   }
 
   public boolean isLocked() {
-    return !deque.isEmpty();
+    return lock.isLocked();
   }
 }

Reply via email to