This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix-procedure-duplicate-scheduling
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d66d51cdeda38934f98848c66893e93c119dec2b
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 10 18:02:29 2026 +0800

    Fix duplicate scheduling in procedure execution
---
 .../coordinator/task/PipeTaskCoordinatorLock.java  |  15 +--
 .../iotdb/confignode/procedure/Procedure.java      |  10 ++
 .../confignode/procedure/ProcedureExecutor.java    | 107 +++++++++++----------
 .../procedure/impl/StateMachineProcedure.java      |   3 +-
 .../procedure/impl/cq/CreateCQProcedure.java       |   2 +-
 .../procedure/impl/node/AbstractNodeProcedure.java |   1 +
 .../pipe/plugin/CreatePipePluginProcedure.java     |   2 +-
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |   2 +-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |   2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |   2 +-
 .../impl/pipe/task/DropPipeProcedureV2.java        |   2 +-
 .../impl/pipe/task/StartPipeProcedureV2.java       |   2 +-
 .../impl/pipe/task/StopPipeProcedureV2.java        |   2 +-
 .../schema/AlterEncodingCompressorProcedure.java   |   2 +-
 .../schema/AlterTimeSeriesDataTypeProcedure.java   |   2 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |   2 +-
 .../impl/schema/DeleteLogicalViewProcedure.java    |   2 +-
 .../impl/schema/DeleteTimeSeriesProcedure.java     |   2 +-
 .../SubscriptionHandleLeaderChangeProcedure.java   |   2 +-
 .../impl/trigger/CreateTriggerProcedure.java       |   2 +-
 .../impl/trigger/DropTriggerProcedure.java         |   2 +-
 .../confignode/procedure/scheduler/LockQueue.java  |   4 +
 .../task/PipeTaskCoordinatorLockTest.java          |  60 ++++++++++++
 .../iotdb/confignode/procedure/TestLockRegime.java |  19 ++++
 .../procedure/TestProcedureExecutor.java           |  52 ++++++++++
 .../PipeHandleLeaderChangeProcedureTest.java       |  47 +++++++++
 26 files changed, 269 insertions(+), 81 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 0f788435394..3a6fe1f698a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -46,17 +46,10 @@ public class PipeTaskCoordinatorLock {
     LOGGER.debug(
         ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD,
         Thread.currentThread().getName());
-    try {
-      semaphore.acquire();
-      LOGGER.debug(
-          ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
-          Thread.currentThread().getName());
-    } catch (final InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOGGER.error(
-          ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
-          Thread.currentThread().getName());
-    }
+    semaphore.acquireUninterruptibly();
+    LOGGER.debug(
+        ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
+        Thread.currentThread().getName());
   }
 
   public boolean tryLock() {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 29a241b16d0..24a8adc713f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -37,6 +37,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -62,6 +63,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
   private volatile long lastUpdate;
 
   private final AtomicReference<byte[]> result = new AtomicReference<>();
+  private final AtomicBoolean executing = new AtomicBoolean(false);
   private volatile boolean locked = false;
   private boolean lockedWhenLoading = false;
 
@@ -256,6 +258,14 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
   }
 
   // -------------------------Internal methods - called by the procedureExecutor------------------
+  final boolean tryAcquireExecution() {
+    return executing.compareAndSet(false, true);
+  }
+
+  final void releaseExecution() {
+    executing.set(false);
+  }
+
   /**
    * Internal method called by the ProcedureExecutor that starts the user-level code execute().
    *
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 82afea3859f..16a9c72626d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
@@ -42,7 +41,6 @@ import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -328,45 +326,44 @@ public class ProcedureExecutor<Env> {
       LOG.warn(ProcedureMessages.ROLLBACK_STACK_IS_NULL_FOR, proc.getProcId());
       return;
     }
-    ProcedureLockState lockState = null;
-    try {
-      do {
-        if (!rootProcStack.acquire()) {
-          if (rootProcStack.setRollback()) {
-            lockState = executeRootStackRollback(rootProcId, rootProcStack);
-            switch (lockState) {
+    ProcedureLockState lockState;
+    do {
+      if (!rootProcStack.acquire()) {
+        if (rootProcStack.setRollback()) {
+          lockState = executeRootStackRollback(rootProcId, rootProcStack);
+          switch (lockState) {
+            case LOCK_ACQUIRED:
+              break;
+            case LOCK_EVENT_WAIT:
+              LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc);
+              rootProcStack.unsetRollback();
+              break;
+            case LOCK_YIELD_WAIT:
+              rootProcStack.unsetRollback();
+              scheduler.yield(proc);
+              break;
+            default:
+              throw new UnsupportedOperationException();
+          }
+        } else {
+          if (!proc.wasExecuted()) {
+            switch (executeRollback(proc)) {
               case LOCK_ACQUIRED:
                 break;
               case LOCK_EVENT_WAIT:
-                LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc);
-                rootProcStack.unsetRollback();
+                LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc);
                 break;
               case LOCK_YIELD_WAIT:
-                rootProcStack.unsetRollback();
                 scheduler.yield(proc);
                 break;
               default:
                 throw new UnsupportedOperationException();
             }
-          } else {
-            if (!proc.wasExecuted()) {
-              switch (executeRollback(proc)) {
-                case LOCK_ACQUIRED:
-                  break;
-                case LOCK_EVENT_WAIT:
-                  LOG.info(
-                      ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc);
-                  break;
-                case LOCK_YIELD_WAIT:
-                  scheduler.yield(proc);
-                  break;
-                default:
-                  throw new UnsupportedOperationException();
-              }
-            }
           }
-          break;
         }
+        break;
+      }
+      try {
         lockState = acquireLock(proc);
         switch (lockState) {
           case LOCK_ACQUIRED:
@@ -379,29 +376,23 @@ public class ProcedureExecutor<Env> {
           default:
             throw new UnsupportedOperationException();
         }
+      } finally {
         rootProcStack.release();
+      }
 
-        if (proc.isSuccess()) {
-          // update metrics on finishing the procedure
-          proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
-          LOG.debug(ProcedureMessages.FINISHED_IN_MS_SUCCESSFULLY, proc, proc.elapsedTime());
-          if (proc.getProcId() == rootProcId) {
-            rootProcedureCleanup(proc);
-          } else {
-            executeCompletionCleanup(proc);
-          }
-          return;
+      if (proc.isSuccess()) {
+        // update metrics on finishing the procedure
+        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
+        LOG.debug(ProcedureMessages.FINISHED_IN_MS_SUCCESSFULLY, proc, proc.elapsedTime());
+        if (proc.getProcId() == rootProcId) {
+          rootProcedureCleanup(proc);
+        } else {
+          executeCompletionCleanup(proc);
         }
-
-      } while (rootProcStack.isFailed());
-    } finally {
-      // Only after procedure has completed execution can it be allowed to be rescheduled to prevent
-      // data races
-      if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
-        LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, proc.getProcId());
-        ((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc);
+        return;
       }
-    }
+
+    } while (rootProcStack.isFailed());
   }
 
   /**
@@ -414,6 +405,7 @@ public class ProcedureExecutor<Env> {
     if (proc.getState() != ProcedureState.RUNNABLE) {
       LOG.error(
           "The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc);
+      releaseLock(proc, false);
       return;
     }
     boolean reExecute;
@@ -787,6 +779,13 @@ public class ProcedureExecutor<Env> {
             Thread.sleep(1000);
             continue;
           }
+          boolean executionAcquired = false;
+          while (isRunning() && !(executionAcquired = procedure.tryAcquireExecution())) {
+            Thread.sleep(10);
+          }
+          if (!executionAcquired) {
+            continue;
+          }
           this.activeProcedure.set(procedure);
           activeExecutorCount.incrementAndGet();
           startTime.set(System.currentTimeMillis());
@@ -795,13 +794,14 @@ public class ProcedureExecutor<Env> {
             executeProcedure(procedure);
           } finally {
             PROCEDURE_EXECUTION_CONTEXT.remove();
+            procedure.releaseExecution();
+            activeExecutorCount.decrementAndGet();
+            LOG.trace(
+                "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
+            lastUpdated = System.currentTimeMillis();
+            startTime.set(lastUpdated);
           }
-          activeExecutorCount.decrementAndGet();
-          LOG.trace(
-              "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
           this.activeProcedure.set(null);
-          lastUpdated = System.currentTimeMillis();
-          startTime.set(lastUpdated);
         }
 
       } catch (Exception e) {
@@ -812,6 +812,7 @@ public class ProcedureExecutor<Env> {
               this.activeProcedure.get(),
               e);
         }
+        this.activeProcedure.set(null);
       } finally {
         LOG.info(ProcedureMessages.PROCEDURE_WORKER_TERMINATED, getName());
       }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
index c3c9167e6ef..f7275a152d7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
@@ -195,7 +195,8 @@ public abstract class StateMachineProcedure<Env, TState> extends Procedure<Env>
             nextState);
       }
     }
-    if (getStateId(getCurrentState()) == stateToBeAdded) {
+    final TState currentState = getCurrentState();
+    if (currentState != null && getStateId(currentState) == stateToBeAdded) {
       cycles++;
     } else {
       cycles = 0;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index 490f723d2e6..240e84f6316 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -299,7 +299,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> {
     }
     CreateCQProcedure that = (CreateCQProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && isGeneratedByPipe == that.isGeneratedByPipe
         && firstExecutionTime == that.firstExecutionTime
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
index b1410279173..9a088560a5b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
@@ -50,6 +50,7 @@ public abstract class AbstractNodeProcedure<TState>
       LOG.info(
           "procedureId {} acquire lock failed, will wait for lock after finishing execution.",
           getProcId());
+      configNodeProcedureEnv.getNodeLock().waitProcedure(this);
       return ProcedureLockState.LOCK_EVENT_WAIT;
     } finally {
       configNodeProcedureEnv.getSchedulerLock().unlock();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index ddc658bdf4d..b1583c74c6b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -327,7 +327,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP
     if (that instanceof CreatePipePluginProcedure) {
       CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) that;
       return thatProcedure.getProcId() == getProcId()
-          && thatProcedure.getCurrentState().equals(getCurrentState())
+          && Objects.equals(thatProcedure.getCurrentState(), getCurrentState())
           && thatProcedure.getCycles() == getCycles()
           && thatProcedure.pipePluginMeta.equals(pipePluginMeta);
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 771ab6230bf..ff1caad7ad4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -296,7 +296,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
     if (that instanceof DropPipePluginProcedure) {
       final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure) that;
       return thatProcedure.getProcId() == getProcId()
-          && thatProcedure.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProcedure.getCurrentState(), this.getCurrentState())
           && thatProcedure.getCycles() == this.getCycles()
           && (thatProcedure.pluginName).equals(pluginName);
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 2c5b9566fce..b2ef752854f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -185,7 +185,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
     }
     PipeHandleLeaderChangeProcedure that = (PipeHandleLeaderChangeProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && this.regionGroupToOldAndNewLeaderPairMap.equals(
             that.regionGroupToOldAndNewLeaderPairMap);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index a0dec367352..76bf432d454 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -190,7 +190,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
     }
     PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && needWriteConsensusOnConfigNodes == that.needWriteConsensusOnConfigNodes
         && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 4ba69067851..530ca4c1956 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -169,7 +169,7 @@ public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
     }
     DropPipeProcedureV2 that = (DropPipeProcedureV2) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && pipeName.equals(that.pipeName);
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 251a4f0e3af..7c9bff96f8f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -188,7 +188,7 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
     }
     StartPipeProcedureV2 that = (StartPipeProcedureV2) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && pipeName.equals(that.pipeName);
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index e2ec41f3b83..4410be01d3f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -191,7 +191,7 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
     }
     StopPipeProcedureV2 that = (StopPipeProcedureV2) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && isStoppedByRuntimeExceptionBeforeStop == that.isStoppedByRuntimeExceptionBeforeStop
         && pipeName.equals(that.pipeName);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
index 1ad60f6f852..f168f6dd683 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
@@ -311,7 +311,7 @@ public class AlterEncodingCompressorProcedure
     }
     final AlterEncodingCompressorProcedure that = (AlterEncodingCompressorProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == getCycles()
         && Objects.equals(this.queryId, that.queryId)
         && this.isGeneratedByPipe == that.isGeneratedByPipe
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
index 26ea988f98e..3a4431047c2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
@@ -407,7 +407,7 @@ public class AlterTimeSeriesDataTypeProcedure
     }
     final AlterTimeSeriesDataTypeProcedure that = (AlterTimeSeriesDataTypeProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == getCycles()
         && this.isGeneratedByPipe == that.isGeneratedByPipe
         && this.measurementPath.equals(that.measurementPath)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 0b3eca82dce..7b8de782173 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -315,7 +315,7 @@ public class DeleteDatabaseProcedure
     if (that instanceof DeleteDatabaseProcedure) {
       final DeleteDatabaseProcedure thatProc = (DeleteDatabaseProcedure) that;
       return thatProc.getProcId() == this.getProcId()
-          && thatProc.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
           && thatProc.getCycles() == this.getCycles()
           && thatProc.isGeneratedByPipe == this.isGeneratedByPipe
           && thatProc.deleteDatabaseSchema.equals(this.getDeleteDatabaseSchema());
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
index 4f63e96840c..5846c79b521 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
@@ -313,7 +313,7 @@ public class DeleteLogicalViewProcedure
     }
     final DeleteLogicalViewProcedure that = (DeleteLogicalViewProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == that.getCycles()
         && isGeneratedByPipe == that.isGeneratedByPipe
         && patternTree.equals(that.patternTree);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
index 0b5e45b5ca1..1cd5efe639c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
@@ -392,7 +392,7 @@ public class DeleteTimeSeriesProcedure
     }
     final DeleteTimeSeriesProcedure that = (DeleteTimeSeriesProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == getCycles()
         && this.isGeneratedByPipe == that.isGeneratedByPipe
         && this.patternTree.equals(that.patternTree);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
index 5337b719d20..222b66a944b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
@@ -265,7 +265,7 @@ public class SubscriptionHandleLeaderChangeProcedure extends AbstractOperateSubs
     final SubscriptionHandleLeaderChangeProcedure that =
         (SubscriptionHandleLeaderChangeProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && runtimeVersion == that.runtimeVersion
         && regionGroupToOldAndNewLeaderPairMap.equals(that.regionGroupToOldAndNewLeaderPairMap);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
index 8406eee97b3..dd297bb4d22 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
@@ -312,7 +312,7 @@ public class CreateTriggerProcedure extends AbstractNodeProcedure<CreateTriggerS
     if (that instanceof CreateTriggerProcedure) {
       CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that;
       return thatProc.getProcId() == this.getProcId()
-          && thatProc.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
           && thatProc.getCycles() == this.getCycles()
           && thatProc.isGeneratedByPipe == this.isGeneratedByPipe
           && thatProc.triggerInformation.equals(this.triggerInformation);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
index 126197cbfe6..a3f2c51baa2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
@@ -180,7 +180,7 @@ public class DropTriggerProcedure extends AbstractNodeProcedure<DropTriggerState
     if (that instanceof DropTriggerProcedure) {
       DropTriggerProcedure thatProc = (DropTriggerProcedure) that;
       return thatProc.getProcId() == this.getProcId()
-          && thatProc.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
           && thatProc.getCycles() == this.getCycles()
           && thatProc.isGeneratedByPipe == this.isGeneratedByPipe
           && (thatProc.triggerName).equals(this.triggerName);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
index 832e339c0ae..895c173dbc8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
@@ -46,6 +46,10 @@ public class LockQueue {
   }
 
   public void waitProcedure(Procedure<?> procedure) {
+    if (deque.stream()
+        .anyMatch(waitingProcedure -> waitingProcedure.getProcId() == procedure.getProcId())) {
+      return;
+    }
     deque.addLast(procedure);
   }
 
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
new file mode 100644
index 00000000000..74d5d821d76
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PipeTaskCoordinatorLockTest {
+
+  @Test
+  public void testInterruptedThreadDoesNotAcquireWithoutPermit() throws Exception {
+    PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock();
+    lock.lock();
+
+    CountDownLatch waiting = new CountDownLatch(1);
+    AtomicBoolean acquired = new AtomicBoolean(false);
+    Thread thread =
+        new Thread(
+            () -> {
+              Thread.currentThread().interrupt();
+              waiting.countDown();
+              lock.lock();
+              acquired.set(true);
+              lock.unlock();
+            });
+    thread.start();
+
+    Assert.assertTrue(waiting.await(3, TimeUnit.SECONDS));
+    TimeUnit.MILLISECONDS.sleep(200);
+    Assert.assertFalse(acquired.get());
+
+    lock.unlock();
+    thread.join(TimeUnit.SECONDS.toMillis(3));
+
+    Assert.assertFalse(thread.isAlive());
+    Assert.assertTrue(acquired.get());
+    Assert.assertFalse(lock.isLocked());
+  }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
index 500a51e9e3d..85b0dfb8c9f 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
@@ -19,7 +19,10 @@
 
 package org.apache.iotdb.confignode.procedure;
 
+import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
 import org.apache.iotdb.confignode.procedure.entity.SimpleLockProcedure;
+import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
 
 import org.junit.Assert;
@@ -43,4 +46,20 @@ public class TestLockRegime extends TestProcedureBase {
         this.procExecutor, procIdList.stream().mapToLong(Long::longValue).toArray());
     Assert.assertEquals(env.lockAcquireSeq.toString(), env.executeSeq.toString());
   }
+
+  @Test
+  public void testLockQueueDoesNotWakeDuplicateProcedure() {
+    LockQueue lockQueue = new LockQueue();
+    SimpleProcedureScheduler scheduler = new SimpleProcedureScheduler();
+    scheduler.start();
+
+    NoopProcedure procedure = new NoopProcedure();
+    procedure.setProcId(1);
+    lockQueue.waitProcedure(procedure);
+    lockQueue.waitProcedure(procedure);
+
+    Assert.assertEquals(1, lockQueue.wakeWaitingProcedures(scheduler));
+    Assert.assertEquals(1, scheduler.size());
+    scheduler.stop();
+  }
 }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index dce7f2ba5dc..0b4e716dc3b 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -23,11 +23,13 @@ import org.apache.iotdb.confignode.procedure.entity.IncProcedure;
 import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
 import org.apache.iotdb.confignode.procedure.entity.StuckProcedure;
 import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -96,6 +98,23 @@ public class TestProcedureExecutor extends TestProcedureBase {
     ProcedureTestUtil.waitForProcedure(procExecutor, busyProcId2);
   }
 
+  @Test
+  public void testDuplicatedSchedulingDoesNotExecuteConcurrently() throws InterruptedException {
+    BlockingProcedure blockingProcedure = new BlockingProcedure();
+    long procId = procExecutor.submitProcedure(blockingProcedure);
+
+    Assert.assertTrue(blockingProcedure.awaitExecution(30, TimeUnit.SECONDS));
+
+    procExecutor.getScheduler().addFront(blockingProcedure);
+    boolean duplicated = blockingProcedure.awaitExecution(3, TimeUnit.SECONDS);
+
+    blockingProcedure.releaseExecutions(duplicated ? 2 : 1);
+    ProcedureTestUtil.waitForProcedure(procExecutor, procId);
+
+    Assert.assertFalse(duplicated);
+    Assert.assertEquals(1, blockingProcedure.getExecutionCount());
+  }
+
   private int waitThreadCount(final int expectedThreads) {
     long startTime = System.currentTimeMillis();
     while (procExecutor.isRunning()
@@ -107,4 +126,37 @@ public class TestProcedureExecutor extends TestProcedureBase {
     }
     return procExecutor.getWorkerThreadCount();
   }
+
+  private static class BlockingProcedure extends Procedure<TestProcEnv> {
+
+    private final Semaphore entered = new Semaphore(0);
+    private final Semaphore finish = new Semaphore(0);
+    private final AtomicInteger executionCount = new AtomicInteger();
+
+    @Override
+    protected Procedure<TestProcEnv>[] execute(TestProcEnv env) throws InterruptedException {
+      executionCount.incrementAndGet();
+      entered.release();
+      finish.acquire();
+      return null;
+    }
+
+    @Override
+    protected void rollback(TestProcEnv env)
+        throws IOException, InterruptedException, ProcedureException {
+      // No state to roll back.
+    }
+
+    private boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException {
+      return entered.tryAcquire(timeout, unit);
+    }
+
+    private void releaseExecutions(int permits) {
+      finish.release(permits);
+    }
+
+    private int getExecutionCount() {
+      return executionCount.get();
+    }
+  }
 }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
index 75c0963a27f..b2ec615fbcd 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
@@ -106,4 +106,51 @@ public class PipeHandleLeaderChangeProcedureTest {
       fail();
     }
   }
+
+  @Test
+  public void completedProcedureEqualsTest() {
+    Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>();
+    leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), new Pair<>(1, 2));
+
+    try {
+      PipeHandleLeaderChangeProcedure proc = deserializeCompletedProcedure(leaderMap);
+      PipeHandleLeaderChangeProcedure proc2 = deserializeCompletedProcedure(leaderMap);
+
+      assertEquals(proc, proc2);
+      assertEquals(proc.hashCode(), proc2.hashCode());
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  private PipeHandleLeaderChangeProcedure deserializeCompletedProcedure(
+      Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap) throws Exception {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
+    outputStream.writeLong(Procedure.NO_PROC_ID);
+    outputStream.writeInt(ProcedureState.SUCCESS.ordinal());
+    outputStream.writeLong(0L);
+    outputStream.writeLong(0L);
+    outputStream.writeLong(Procedure.NO_PROC_ID);
+    outputStream.writeLong(Procedure.NO_TIMEOUT);
+    outputStream.writeInt(-1);
+    outputStream.write((byte) 0);
+    outputStream.writeInt(-1);
+    outputStream.write((byte) 0);
+    outputStream.writeInt(1);
+    outputStream.writeInt(Integer.MIN_VALUE);
+    outputStream.write((byte) 0);
+    outputStream.writeInt(leaderMap.size());
+    for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry : leaderMap.entrySet()) {
+      outputStream.writeInt(entry.getKey().getId());
+      outputStream.writeInt(entry.getValue().getLeft());
+      outputStream.writeInt(entry.getValue().getRight());
+    }
+
+    ByteBuffer buffer =
+        ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    return (PipeHandleLeaderChangeProcedure) ProcedureFactory.getInstance().create(buffer);
+  }
 }


Reply via email to