This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch revert-16324-pipe-procedure-lock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit caf2b70cf0b48620c859f4d8a4acf1beacc25992
Author: Jiang Tian <jt2594...@163.com>
AuthorDate: Wed Sep 10 16:19:06 2025 +0800

    Revert "Fix CN pipe procedures restore dead lock (#16324)"
    
    This reverts commit b422e9a5c59391a2aa17234c3cf5a230bbce9830.
---
 .../pipe/coordinator/runtime/PipeMetaSyncer.java   |  4 +--
 .../runtime/heartbeat/PipeHeartbeatParser.java     |  2 +-
 .../pipe/coordinator/task/PipeTaskCoordinator.java | 14 ++++----
 .../coordinator/task/PipeTaskCoordinatorLock.java  | 13 +++-----
 .../subscription/SubscriptionCoordinator.java      |  2 +-
 .../confignode/procedure/ProcedureExecutor.java    | 38 +---------------------
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  | 19 +++--------
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |  2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     | 14 +++-----
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   | 14 +++-----
 ...bstractOperateSubscriptionAndPipeProcedure.java |  7 +---
 11 files changed, 33 insertions(+), 96 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index aaafcda346d..1df5ac021ca 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -146,7 +146,7 @@ public class PipeMetaSyncer {
 
   private boolean autoRestartWithLock() {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-        configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+        configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
     if (pipeTaskInfo == null) {
       LOGGER.warn("Failed to acquire pipe lock for auto restart pipe task.");
       return false;
@@ -160,7 +160,7 @@ public class PipeMetaSyncer {
 
   private boolean handleSuccessfulRestartWithLock() {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-        configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+        configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
     if (pipeTaskInfo == null) {
       LOGGER.warn("Failed to acquire pipe lock for handling successful 
restart.");
       return false;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 8e13a0460e4..ace07f5e2d3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -99,7 +99,7 @@ public class PipeHeartbeatParser {
         .submit(
             () -> {
               final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-                  
configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+                  
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
               if (pipeTaskInfo == null) {
                 LOGGER.warn(
                     "Failed to acquire lock when parseHeartbeat from node 
(id={}).", nodeId);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 25f8e0c0cc5..4aaf3ab46c3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -37,7 +37,6 @@ import 
org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,11 +68,10 @@ public class PipeTaskCoordinator {
    * @return the pipe task info holder, which can be used to get the pipe task 
info. The holder is
    *     null if the lock is not acquired.
    */
-  public Pair<AtomicReference<PipeTaskInfo>, Long> tryLock() {
-    long lockSeqId = pipeTaskCoordinatorLock.tryLock();
-    if (lockSeqId != -1) {
+  public AtomicReference<PipeTaskInfo> tryLock() {
+    if (pipeTaskCoordinatorLock.tryLock()) {
       pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
-      return new Pair<>(pipeTaskInfoHolder, lockSeqId);
+      return pipeTaskInfoHolder;
     }
 
     return null;
@@ -85,10 +83,10 @@ public class PipeTaskCoordinator {
    * @return the {@link PipeTaskInfo} holder, which can be used to get the 
{@link PipeTaskInfo}.
    *     Wait until lock is acquired
    */
-  public Pair<AtomicReference<PipeTaskInfo>, Long> lock() {
-    long lockSeqId = pipeTaskCoordinatorLock.lock();
+  public AtomicReference<PipeTaskInfo> lock() {
+    pipeTaskCoordinatorLock.lock();
     pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
-    return new Pair<>(pipeTaskInfoHolder, lockSeqId);
+    return pipeTaskInfoHolder;
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 0caf620e7f2..e57add9a001 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -37,9 +37,8 @@ public class PipeTaskCoordinatorLock {
 
   private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
   private final AtomicLong idGenerator = new AtomicLong(0);
-  private final AtomicLong lockSeqIdGenerator = new AtomicLong(0);
 
-  public long lock() {
+  public void lock() {
     try {
       final long id = idGenerator.incrementAndGet();
       LOGGER.debug(
@@ -51,17 +50,15 @@ public class PipeTaskCoordinatorLock {
           "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
           id,
           Thread.currentThread().getName());
-      return lockSeqIdGenerator.incrementAndGet();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.error(
           "Interrupted while waiting for PipeTaskCoordinator lock, current 
thread: {}",
           Thread.currentThread().getName());
-      return -1;
     }
   }
 
-  public long tryLock() {
+  public boolean tryLock() {
     try {
       final long id = idGenerator.incrementAndGet();
       LOGGER.debug(
@@ -73,20 +70,20 @@ public class PipeTaskCoordinatorLock {
             "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
             id,
             Thread.currentThread().getName());
-        return lockSeqIdGenerator.incrementAndGet();
+        return true;
       } else {
         LOGGER.info(
             "PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} 
because of timeout",
             id,
             Thread.currentThread().getName());
-        return -1;
+        return false;
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.error(
           "Interrupted while waiting for PipeTaskCoordinator lock, current 
thread: {}",
           Thread.currentThread().getName());
-      return -1;
+      return false;
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index d90e1bbaa30..b52f958d30a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -79,7 +79,7 @@ public class SubscriptionCoordinator {
   /////////////////////////////// Lock ///////////////////////////////
 
   public AtomicReference<SubscriptionInfo> tryLock() {
-    if (coordinatorLock.tryLock() != -1) {
+    if (coordinatorLock.tryLock()) {
       subscriptionInfoHolder = new AtomicReference<>(subscriptionInfo);
       return subscriptionInfoHolder;
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index f0f28ec2741..0d8368583b4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import 
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
@@ -38,12 +37,10 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,7 +49,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID;
 
@@ -120,41 +116,9 @@ public class ProcedureExecutor<Env> {
     recover();
   }
 
-  /***
-   * Filter out pipe procedures that do not need to re-acquire lock and 
re-execute when there are multiple locked pipe procedures during restore.
-   * @return non pipe procedures and one pipe procedure with max lock seq id 
(if there is.)
-   */
-  private List<Procedure<Env>> filteredProcedureList(final 
List<Procedure<Env>> procedures) {
-    List<Procedure<Env>> nonPipeOrLockedProcedures =
-        procedures.stream()
-            .filter(p -> !(p instanceof AbstractOperatePipeProcedureV2) || 
!p.isLockedWhenLoading())
-            .collect(Collectors.toList());
-
-    List<AbstractOperatePipeProcedureV2> lockedPipeProcedures =
-        procedures.stream()
-            .filter(p -> p instanceof AbstractOperatePipeProcedureV2 && 
p.isLockedWhenLoading())
-            .map(AbstractOperatePipeProcedureV2.class::cast)
-            .collect(Collectors.toList());
-    Optional<Procedure<Env>> maxPipeProcedure =
-        lockedPipeProcedures.stream()
-            
.max(Comparator.comparingLong(AbstractOperatePipeProcedureV2::getLockSeqId))
-            .map(p -> (Procedure<Env>) p);
-
-    if (lockedPipeProcedures.size() > 1) {
-      LOG.warn(
-          "[Procedure restore]Detected multiple locked pipe procedures in 
procedure executor {}, only keep last one {}",
-          lockedPipeProcedures,
-          maxPipeProcedure.get());
-    }
-
-    maxPipeProcedure.ifPresent(nonPipeOrLockedProcedures::add);
-    return nonPipeOrLockedProcedures;
-  }
-
   private void recover() {
     // 1.Build rollback stack
-    List<Procedure<Env>> procedureList =
-        filteredProcedureList(getProcedureListFromDifferentVersion());
+    List<Procedure<Env>> procedureList = 
getProcedureListFromDifferentVersion();
     // Load procedure wal file
     for (Procedure<Env> proc : procedureList) {
       if (proc.isFinished()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 0dde52ab5dd..25466d33983 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,17 +94,17 @@ public abstract class AbstractOperatePipeProcedureV2
   // This variable should not be serialized into procedure store,
   // putting it here is just for convenience
   protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
-  protected long lockSeqId;
 
   private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
       "Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
 
   protected AtomicReference<PipeTaskInfo> acquireLockInternal(
       ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
-        
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
-    lockSeqId = lockRes.right;
-    return lockRes.left;
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getPipeManager()
+        .getPipeTaskCoordinator()
+        .lock();
   }
 
   @Override
@@ -189,10 +188,6 @@ public abstract class AbstractOperatePipeProcedureV2
     }
   }
 
-  public long getLockSeqId() {
-    return lockSeqId;
-  }
-
   protected abstract PipeTaskOperation getOperation();
 
   /**
@@ -617,15 +612,11 @@ public abstract class AbstractOperatePipeProcedureV2
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
     ReadWriteIOUtils.write(isRollbackFromOperateOnDataNodesSuccessful, stream);
-    ReadWriteIOUtils.write(lockSeqId, stream);
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     isRollbackFromOperateOnDataNodesSuccessful = 
ReadWriteIOUtils.readBool(byteBuffer);
-    if (byteBuffer.remaining() >= Long.BYTES) {
-      lockSeqId = ReadWriteIOUtils.readLong(byteBuffer);
-    }
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 28c59933cb0..665a3782a91 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -122,7 +122,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
     final SubscriptionCoordinator subscriptionCoordinator =
         
env.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator();
 
-    final AtomicReference<PipeTaskInfo> pipeTaskInfo = 
pipeTaskCoordinator.lock().left;
+    final AtomicReference<PipeTaskInfo> pipeTaskInfo = 
pipeTaskCoordinator.lock();
     pipePluginCoordinator.lock();
     SubscriptionInfo subscriptionInfo = 
subscriptionCoordinator.getSubscriptionInfo();
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 6b2fec6f381..401859f0a7e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -31,7 +31,6 @@ import 
org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,14 +64,11 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
   @Override
   protected AtomicReference<PipeTaskInfo> acquireLockInternal(
       ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
-        configNodeProcedureEnv
-            .getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .tryLock();
-    lockSeqId = lockRes.right;
-    return lockRes.left;
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getPipeManager()
+        .getPipeTaskCoordinator()
+        .tryLock();
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 81ed75b7db2..393a8bd5ab8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,14 +73,11 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
   @Override
   protected AtomicReference<PipeTaskInfo> acquireLockInternal(
       ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
-        configNodeProcedureEnv
-            .getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .tryLock();
-    lockSeqId = lockRes.right;
-    return lockRes.left;
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getPipeManager()
+        .getPipeTaskCoordinator()
+        .tryLock();
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index 16391a9ef58..de90951262c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -50,12 +50,7 @@ public abstract class 
AbstractOperateSubscriptionAndPipeProcedure
     LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.", 
getProcId());
 
     pipeTaskInfo =
-        configNodeProcedureEnv
-            .getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .lock()
-            .left;
+        
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
     if (pipeTaskInfo == null) {
       LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
     } else {

Reply via email to