This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch revert-16324-pipe-procedure-lock in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit caf2b70cf0b48620c859f4d8a4acf1beacc25992 Author: Jiang Tian <jt2594...@163.com> AuthorDate: Wed Sep 10 16:19:06 2025 +0800 Revert "Fix CN pipe procedures restore dead lock (#16324)" This reverts commit b422e9a5c59391a2aa17234c3cf5a230bbce9830. --- .../pipe/coordinator/runtime/PipeMetaSyncer.java | 4 +-- .../runtime/heartbeat/PipeHeartbeatParser.java | 2 +- .../pipe/coordinator/task/PipeTaskCoordinator.java | 14 ++++---- .../coordinator/task/PipeTaskCoordinatorLock.java | 13 +++----- .../subscription/SubscriptionCoordinator.java | 2 +- .../confignode/procedure/ProcedureExecutor.java | 38 +--------------------- .../impl/pipe/AbstractOperatePipeProcedureV2.java | 19 +++-------- .../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +- .../runtime/PipeHandleMetaChangeProcedure.java | 14 +++----- .../impl/pipe/runtime/PipeMetaSyncProcedure.java | 14 +++----- ...bstractOperateSubscriptionAndPipeProcedure.java | 7 +--- 11 files changed, 33 insertions(+), 96 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java index aaafcda346d..1df5ac021ca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java @@ -146,7 +146,7 @@ public class PipeMetaSyncer { private boolean autoRestartWithLock() { final AtomicReference<PipeTaskInfo> pipeTaskInfo = - configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left; + configManager.getPipeManager().getPipeTaskCoordinator().tryLock(); if (pipeTaskInfo == null) { LOGGER.warn("Failed to acquire pipe lock for auto restart pipe task."); return false; @@ -160,7 +160,7 @@ public class PipeMetaSyncer { private boolean handleSuccessfulRestartWithLock() { final AtomicReference<PipeTaskInfo> pipeTaskInfo = - configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left; + configManager.getPipeManager().getPipeTaskCoordinator().tryLock(); if (pipeTaskInfo == null) { LOGGER.warn("Failed to acquire pipe lock for handling successful restart."); return false; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 8e13a0460e4..ace07f5e2d3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -99,7 +99,7 @@ public class PipeHeartbeatParser { .submit( () -> { final AtomicReference<PipeTaskInfo> pipeTaskInfo = - configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left; + configManager.getPipeManager().getPipeTaskCoordinator().tryLock(); if (pipeTaskInfo == null) { LOGGER.warn( "Failed to acquire lock when parseHeartbeat from node (id={}).", nodeId); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index 25f8e0c0cc5..4aaf3ab46c3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -37,7 +37,6 @@ import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,11 +68,10 @@ public class PipeTaskCoordinator { * @return the pipe task info holder, which can be used to get the pipe task info. The holder is * null if the lock is not acquired. */ - public Pair<AtomicReference<PipeTaskInfo>, Long> tryLock() { - long lockSeqId = pipeTaskCoordinatorLock.tryLock(); - if (lockSeqId != -1) { + public AtomicReference<PipeTaskInfo> tryLock() { + if (pipeTaskCoordinatorLock.tryLock()) { pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo); - return new Pair<>(pipeTaskInfoHolder, lockSeqId); + return pipeTaskInfoHolder; } return null; @@ -85,10 +83,10 @@ public class PipeTaskCoordinator { * @return the {@link PipeTaskInfo} holder, which can be used to get the {@link PipeTaskInfo}. * Wait until lock is acquired */ - public Pair<AtomicReference<PipeTaskInfo>, Long> lock() { - long lockSeqId = pipeTaskCoordinatorLock.lock(); + public AtomicReference<PipeTaskInfo> lock() { + pipeTaskCoordinatorLock.lock(); pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo); - return new Pair<>(pipeTaskInfoHolder, lockSeqId); + return pipeTaskInfoHolder; } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index 0caf620e7f2..e57add9a001 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -37,9 +37,8 @@ public class PipeTaskCoordinatorLock { private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1); private final AtomicLong idGenerator = new AtomicLong(0); - private final AtomicLong lockSeqIdGenerator = new AtomicLong(0); - public long lock() { + public void lock() { try { final long id = idGenerator.incrementAndGet(); LOGGER.debug( @@ -51,17 +50,15 @@ public class PipeTaskCoordinatorLock { "PipeTaskCoordinator lock (id: {}) acquired by thread {}", id, Thread.currentThread().getName()); - return lockSeqIdGenerator.incrementAndGet(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.error( "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}", Thread.currentThread().getName()); - return -1; } } - public long tryLock() { + public boolean tryLock() { try { final long id = idGenerator.incrementAndGet(); LOGGER.debug( @@ -73,20 +70,20 @@ public class PipeTaskCoordinatorLock { "PipeTaskCoordinator lock (id: {}) acquired by thread {}", id, Thread.currentThread().getName()); - return lockSeqIdGenerator.incrementAndGet(); + return true; } else { LOGGER.info( "PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} because of timeout", id, Thread.currentThread().getName()); - return -1; + return false; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.error( "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}", Thread.currentThread().getName()); - return -1; + return false; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java index d90e1bbaa30..b52f958d30a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java @@ -79,7 +79,7 @@ public class SubscriptionCoordinator { /////////////////////////////// Lock /////////////////////////////// public AtomicReference<SubscriptionInfo> tryLock() { - if (coordinatorLock.tryLock() != -1) { + if (coordinatorLock.tryLock()) { subscriptionInfoHolder = new AtomicReference<>(subscriptionInfo); return subscriptionInfoHolder; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index f0f28ec2741..0d8368583b4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; -import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; import org.apache.iotdb.confignode.procedure.state.ProcedureLockState; @@ -38,12 +37,10 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.Deque; import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -52,7 +49,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import static org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID; @@ -120,41 +116,9 @@ public class ProcedureExecutor<Env> { recover(); } - /*** - * Filter out pipe procedures that do not need to re-acquire lock and re-execute when there are multiple locked pipe procedures during restore. - * @return non pipe procedures and one pipe procedure with max lock seq id (if there is.) - */ - private List<Procedure<Env>> filteredProcedureList(final List<Procedure<Env>> procedures) { - List<Procedure<Env>> nonPipeOrLockedProcedures = - procedures.stream() - .filter(p -> !(p instanceof AbstractOperatePipeProcedureV2) || !p.isLockedWhenLoading()) - .collect(Collectors.toList()); - - List<AbstractOperatePipeProcedureV2> lockedPipeProcedures = - procedures.stream() - .filter(p -> p instanceof AbstractOperatePipeProcedureV2 && p.isLockedWhenLoading()) - .map(AbstractOperatePipeProcedureV2.class::cast) - .collect(Collectors.toList()); - Optional<Procedure<Env>> maxPipeProcedure = - lockedPipeProcedures.stream() - .max(Comparator.comparingLong(AbstractOperatePipeProcedureV2::getLockSeqId)) - .map(p -> (Procedure<Env>) p); - - if (lockedPipeProcedures.size() > 1) { - LOG.warn( - "[Procedure restore]Detected multiple locked pipe procedures in procedure executor {}, only keep last one {}", - lockedPipeProcedures, - maxPipeProcedure.get()); - } - - maxPipeProcedure.ifPresent(nonPipeOrLockedProcedures::add); - return nonPipeOrLockedProcedures; - } - private void recover() { // 1.Build rollback stack - List<Procedure<Env>> procedureList = - filteredProcedureList(getProcedureListFromDifferentVersion()); + List<Procedure<Env>> procedureList = getProcedureListFromDifferentVersion(); // Load procedure wal file for (Procedure<Env> proc : procedureList) { if (proc.isFinished()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index 0dde52ab5dd..25466d33983 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -37,7 +37,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,17 +94,17 @@ public abstract class AbstractOperatePipeProcedureV2 // This variable should not be serialized into procedure store, // putting it here is just for convenience protected AtomicReference<PipeTaskInfo> pipeTaskInfo; - protected long lockSeqId; private static final String SKIP_PIPE_PROCEDURE_MESSAGE = "Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing."; protected AtomicReference<PipeTaskInfo> acquireLockInternal( ConfigNodeProcedureEnv configNodeProcedureEnv) { - Pair<AtomicReference<PipeTaskInfo>, Long> lockRes = - configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock(); - lockSeqId = lockRes.right; - return lockRes.left; + return configNodeProcedureEnv + .getConfigManager() + .getPipeManager() + .getPipeTaskCoordinator() + .lock(); } @Override @@ -189,10 +188,6 @@ public abstract class AbstractOperatePipeProcedureV2 } } - public long getLockSeqId() { - return lockSeqId; - } - protected abstract PipeTaskOperation getOperation(); /** @@ -617,15 +612,11 @@ public abstract class AbstractOperatePipeProcedureV2 public void serialize(DataOutputStream stream) throws IOException { super.serialize(stream); ReadWriteIOUtils.write(isRollbackFromOperateOnDataNodesSuccessful, stream); - ReadWriteIOUtils.write(lockSeqId, stream); } @Override public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); isRollbackFromOperateOnDataNodesSuccessful = ReadWriteIOUtils.readBool(byteBuffer); - if (byteBuffer.remaining() >= Long.BYTES) { - lockSeqId = ReadWriteIOUtils.readLong(byteBuffer); - } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index 28c59933cb0..665a3782a91 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -122,7 +122,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi final SubscriptionCoordinator subscriptionCoordinator = env.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator(); - final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock().left; + final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock(); pipePluginCoordinator.lock(); SubscriptionInfo subscriptionInfo = subscriptionCoordinator.getSubscriptionInfo(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index 6b2fec6f381..401859f0a7e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -31,7 +31,6 @@ import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,14 +64,11 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV @Override protected AtomicReference<PipeTaskInfo> acquireLockInternal( ConfigNodeProcedureEnv configNodeProcedureEnv) { - Pair<AtomicReference<PipeTaskInfo>, Long> lockRes = - configNodeProcedureEnv - .getConfigManager() - .getPipeManager() - .getPipeTaskCoordinator() - .tryLock(); - lockSeqId = lockRes.right; - return lockRes.left; + return configNodeProcedureEnv + .getConfigManager() + .getPipeManager() + .getPipeTaskCoordinator() + .tryLock(); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java index 81ed75b7db2..393a8bd5ab8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java @@ -39,7 +39,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,14 +73,11 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 { @Override protected AtomicReference<PipeTaskInfo> acquireLockInternal( ConfigNodeProcedureEnv configNodeProcedureEnv) { - Pair<AtomicReference<PipeTaskInfo>, Long> lockRes = - configNodeProcedureEnv - .getConfigManager() - .getPipeManager() - .getPipeTaskCoordinator() - .tryLock(); - lockSeqId = lockRes.right; - return lockRes.left; + return configNodeProcedureEnv + .getConfigManager() + .getPipeManager() + .getPipeTaskCoordinator() + .tryLock(); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java index 16391a9ef58..de90951262c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java @@ -50,12 +50,7 @@ public abstract class AbstractOperateSubscriptionAndPipeProcedure LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.", getProcId()); pipeTaskInfo = - configNodeProcedureEnv - .getConfigManager() - .getPipeManager() - .getPipeTaskCoordinator() - .lock() - .left; + configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock(); if (pipeTaskInfo == null) { LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId()); } else {