This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a45a3b7b70 Fix duplicate scheduling in procedure execution (#17902)
0a45a3b7b70 is described below

commit 0a45a3b7b708c3b436e6b9b4c6aa9b65bede7076
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 18:18:59 2026 +0800

    Fix duplicate scheduling in procedure execution (#17902)
    
    * Fix duplicate scheduling in procedure execution
    
    * Fix delayed procedure deduplication and semaphore release
    
    * Fix SQL parser error handler traversal
    
    * Fix pipe procedure lock release race
    
    * Fix procedure lock wait scheduling
    
    (cherry picked from commit c25849a093b466966353ecd7b63f722535b8d5b3)
---
 .../coordinator/task/PipeTaskCoordinatorLock.java  |  16 +--
 .../iotdb/confignode/procedure/Procedure.java      |  20 ++++
 .../confignode/procedure/ProcedureExecutor.java    | 119 ++++++++++++++-------
 .../procedure/TimeoutExecutorThread.java           |  32 +++++-
 .../procedure/impl/StateMachineProcedure.java      |   3 +-
 .../procedure/impl/cq/CreateCQProcedure.java       |   2 +-
 .../procedure/impl/node/AbstractNodeProcedure.java |  12 +++
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  23 ++--
 .../pipe/plugin/CreatePipePluginProcedure.java     |   2 +-
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |   2 +-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |   2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |   2 +-
 .../impl/pipe/task/DropPipeProcedureV2.java        |   2 +-
 .../impl/pipe/task/StartPipeProcedureV2.java       |   2 +-
 .../impl/pipe/task/StopPipeProcedureV2.java        |   2 +-
 .../schema/AlterEncodingCompressorProcedure.java   |   2 +-
 .../schema/AlterTimeSeriesDataTypeProcedure.java   |   2 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |   2 +-
 .../impl/schema/DeleteLogicalViewProcedure.java    |   2 +-
 .../impl/schema/DeleteTimeSeriesProcedure.java     |   2 +-
 .../SubscriptionHandleLeaderChangeProcedure.java   |   2 +-
 .../impl/trigger/CreateTriggerProcedure.java       |   2 +-
 .../impl/trigger/DropTriggerProcedure.java         |   2 +-
 .../confignode/procedure/scheduler/LockQueue.java  |  10 +-
 .../scheduler/SimpleProcedureScheduler.java        |  37 ++++++-
 .../task/PipeTaskCoordinatorLockTest.java          |  60 +++++++++++
 .../iotdb/confignode/procedure/TestLockRegime.java |  23 ++++
 .../procedure/TestProcedureExecutor.java           |  92 ++++++++++++++++
 .../procedure/entity/SimpleLockProcedure.java      |  11 +-
 .../PipeHandleLeaderChangeProcedureTest.java       |  47 ++++++++
 .../plan/relational/sql/parser/ErrorHandler.java   |   6 +-
 .../db/schemaengine/table/DataNodeTableCache.java  |  10 +-
 .../schemaengine/table/DataNodeTableCacheTest.java |  58 ++++++++++
 33 files changed, 509 insertions(+), 102 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 15991cfc90e..a99d7dd18c7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -47,18 +47,10 @@ public class PipeTaskCoordinatorLock {
     LOGGER.debug(
         ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD,
         Thread.currentThread().getName());
-    try {
-      semaphore.acquire();
-      LOGGER.debug(
-          ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
-          Thread.currentThread().getName());
-    } catch (final InterruptedException e) {
-      Thread.currentThread().interrupt();
-      PipeLogger.log(
-          LOGGER::error,
-          
ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
-          Thread.currentThread().getName());
-    }
+    semaphore.acquireUninterruptibly();
+    LOGGER.debug(
+        ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
+        Thread.currentThread().getName());
   }
 
   public boolean tryLock() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 29a241b16d0..f3e88b708d4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -37,6 +37,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -62,6 +63,7 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
   private volatile long lastUpdate;
 
   private final AtomicReference<byte[]> result = new AtomicReference<>();
+  private final AtomicBoolean executing = new AtomicBoolean(false);
   private volatile boolean locked = false;
   private boolean lockedWhenLoading = false;
 
@@ -235,6 +237,16 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
     // no op
   }
 
+  /**
+   * Called after an execution attempt returns {@link 
ProcedureLockState#LOCK_EVENT_WAIT}. Override
+   * it to put the procedure into the corresponding lock wait queue.
+   *
+   * @param env env
+   */
+  protected void waitForLock(Env env) {
+    // no op
+  }
+
   /**
    * Used to keep procedure lock even when the procedure is yielded or 
suspended.
    *
@@ -256,6 +268,14 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
   }
 
   // -------------------------Internal methods - called by the 
procedureExecutor------------------
+  final boolean tryAcquireExecution() {
+    return executing.compareAndSet(false, true);
+  }
+
+  final void releaseExecution() {
+    executing.set(false);
+  }
+
   /**
    * Internal method called by the ProcedureExecutor that starts the 
user-level code execute().
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index b0e5d737835..f3652dab1d0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import 
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
@@ -42,7 +41,6 @@ import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -86,6 +84,16 @@ public class ProcedureExecutor<Env> {
   private final Env environment;
   private final IProcedureStore<Env> store;
 
+  private static final class LockStateResult<Env> {
+    private final ProcedureLockState lockState;
+    private final Procedure<Env> procedure;
+
+    private LockStateResult(ProcedureLockState lockState, Procedure<Env> 
procedure) {
+      this.lockState = lockState;
+      this.procedure = procedure;
+    }
+  }
+
   public ProcedureExecutor(
       final Env environment, final IProcedureStore<Env> store, final 
ProcedureScheduler scheduler) {
     this.environment = environment;
@@ -329,33 +337,39 @@ public class ProcedureExecutor<Env> {
       return;
     }
     ProcedureLockState lockState = null;
+    Procedure<Env> lockEventWaitProcedure = null;
     try {
       do {
         if (!rootProcStack.acquire()) {
           if (rootProcStack.setRollback()) {
-            lockState = executeRootStackRollback(rootProcId, rootProcStack);
+            LockStateResult<Env> lockStateResult =
+                executeRootStackRollback(rootProcId, rootProcStack);
+            lockState = lockStateResult.lockState;
             switch (lockState) {
               case LOCK_ACQUIRED:
                 break;
               case LOCK_EVENT_WAIT:
-                LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc);
+                LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, 
lockStateResult.procedure);
                 rootProcStack.unsetRollback();
+                lockEventWaitProcedure = lockStateResult.procedure;
                 break;
               case LOCK_YIELD_WAIT:
                 rootProcStack.unsetRollback();
-                scheduler.yield(proc);
+                scheduler.yield(lockStateResult.procedure);
                 break;
               default:
                 throw new UnsupportedOperationException();
             }
           } else {
             if (!proc.wasExecuted()) {
-              switch (executeRollback(proc)) {
+              lockState = executeRollback(proc);
+              switch (lockState) {
                 case LOCK_ACQUIRED:
                   break;
                 case LOCK_EVENT_WAIT:
                   LOG.info(
                       
ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc);
+                  lockEventWaitProcedure = proc;
                   break;
                 case LOCK_YIELD_WAIT:
                   scheduler.yield(proc);
@@ -367,19 +381,25 @@ public class ProcedureExecutor<Env> {
           }
           break;
         }
-        lockState = acquireLock(proc);
-        switch (lockState) {
-          case LOCK_ACQUIRED:
-            executeProcedure(rootProcStack, proc);
-            break;
-          case LOCK_YIELD_WAIT:
-          case LOCK_EVENT_WAIT:
-            LOG.info(ProcedureMessages.LOCKSTATE_IS, proc, lockState);
-            break;
-          default:
-            throw new UnsupportedOperationException();
+        try {
+          lockState = acquireLock(proc);
+          switch (lockState) {
+            case LOCK_ACQUIRED:
+              executeProcedure(rootProcStack, proc);
+              break;
+            case LOCK_YIELD_WAIT:
+            case LOCK_EVENT_WAIT:
+              LOG.info(ProcedureMessages.LOCKSTATE_IS, proc, lockState);
+              if (lockState == ProcedureLockState.LOCK_EVENT_WAIT) {
+                lockEventWaitProcedure = proc;
+              }
+              break;
+            default:
+              throw new UnsupportedOperationException();
+          }
+        } finally {
+          rootProcStack.release();
         }
-        rootProcStack.release();
 
         if (proc.isSuccess()) {
           // update metrics on finishing the procedure
@@ -397,9 +417,9 @@ public class ProcedureExecutor<Env> {
     } finally {
       // Only after procedure has completed execution can it be allowed to be 
rescheduled to prevent
       // data races
-      if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
-        LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, 
proc.getProcId());
-        ((ConfigNodeProcedureEnv) 
this.environment).getNodeLock().waitProcedure(proc);
+      if (lockEventWaitProcedure != null) {
+        LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, 
lockEventWaitProcedure.getProcId());
+        lockEventWaitProcedure.waitForLock(this.environment);
       }
     }
   }
@@ -414,6 +434,7 @@ public class ProcedureExecutor<Env> {
     if (proc.getState() != ProcedureState.RUNNABLE) {
       LOG.error(
           "The executing procedure should in RUNNABLE state, but it's not. 
Procedure is {}", proc);
+      releaseLock(proc, false);
       return;
     }
     boolean reExecute;
@@ -615,8 +636,8 @@ public class ProcedureExecutor<Env> {
    * @param procedureStack root procedure stack
    * @return lock state
    */
-  private ProcedureLockState executeRootStackRollback(
-      Long rootProcId, RootProcedureStack procedureStack) {
+  private LockStateResult<Env> executeRootStackRollback(
+      Long rootProcId, RootProcedureStack<Env> procedureStack) {
     Procedure<Env> rootProcedure = procedures.get(rootProcId);
     ProcedureException exception = rootProcedure.getException();
     if (exception == null) {
@@ -637,7 +658,7 @@ public class ProcedureExecutor<Env> {
       }
       ProcedureLockState lockState = acquireLock(procedure);
       if (lockState != ProcedureLockState.LOCK_ACQUIRED) {
-        return lockState;
+        return new LockStateResult<>(lockState, procedure);
       }
       lockState = executeRollback(procedure);
       releaseLock(procedure, false);
@@ -645,11 +666,11 @@ public class ProcedureExecutor<Env> {
       boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED;
       abortRollback |= !isRunning() || !store.isRunning();
       if (abortRollback) {
-        return lockState;
+        return new LockStateResult<>(lockState, procedure);
       }
 
       if (!procedure.isFinished() && 
procedure.isYieldAfterExecution(this.environment)) {
-        return ProcedureLockState.LOCK_YIELD_WAIT;
+        return new LockStateResult<>(ProcedureLockState.LOCK_YIELD_WAIT, 
procedure);
       }
 
       if (procedure != rootProcedure) {
@@ -660,7 +681,7 @@ public class ProcedureExecutor<Env> {
     LOG.info(
         ProcedureMessages.ROLLED_BACK_TIME_DURATION_IS, rootProcedure, 
rootProcedure.elapsedTime());
     rootProcedureCleanup(rootProcedure);
-    return ProcedureLockState.LOCK_ACQUIRED;
+    return new LockStateResult<>(ProcedureLockState.LOCK_ACQUIRED, 
rootProcedure);
   }
 
   private ProcedureLockState acquireLock(Procedure<Env> proc) {
@@ -795,21 +816,38 @@ public class ProcedureExecutor<Env> {
             Thread.sleep(1000);
             continue;
           }
-          this.activeProcedure.set(procedure);
-          activeExecutorCount.incrementAndGet();
-          startTime.set(System.currentTimeMillis());
-          PROCEDURE_EXECUTION_CONTEXT.set(true);
+          boolean executionAcquired = false;
+          while (isRunning() && !(executionAcquired = 
procedure.tryAcquireExecution())) {
+            Thread.sleep(10);
+          }
+          if (!executionAcquired) {
+            continue;
+          }
           try {
-            executeProcedure(procedure);
-          } finally {
-            PROCEDURE_EXECUTION_CONTEXT.remove();
+            this.activeProcedure.set(procedure);
+            activeExecutorCount.incrementAndGet();
+            startTime.set(System.currentTimeMillis());
+            try {
+              PROCEDURE_EXECUTION_CONTEXT.set(true);
+              try {
+                executeProcedure(procedure);
+              } finally {
+                PROCEDURE_EXECUTION_CONTEXT.remove();
+              }
+            } finally {
+              procedure.releaseExecution();
+              activeExecutorCount.decrementAndGet();
+              LOG.trace(
+                  "Halt pid={}, activeCount={}", procedure.getProcId(), 
activeExecutorCount.get());
+              this.activeProcedure.set(null);
+              lastUpdated = System.currentTimeMillis();
+              startTime.set(lastUpdated);
+            }
+          } catch (Exception e) {
+            LOG.warn(
+                "Exception happened when worker {} execute procedure {}", 
getName(), procedure, e);
+            throw e;
           }
-          activeExecutorCount.decrementAndGet();
-          LOG.trace(
-              "Halt pid={}, activeCount={}", procedure.getProcId(), 
activeExecutorCount.get());
-          this.activeProcedure.set(null);
-          lastUpdated = System.currentTimeMillis();
-          startTime.set(lastUpdated);
         }
 
       } catch (Exception e) {
@@ -820,6 +858,7 @@ public class ProcedureExecutor<Env> {
               this.activeProcedure.get(),
               e);
         }
+        this.activeProcedure.set(null);
       } finally {
         LOG.info(ProcedureMessages.PROCEDURE_WORKER_TERMINATED, getName());
       }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 3e62cdc1d4f..cfd7bf023b0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -43,11 +43,13 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
   }
 
   public void add(Procedure<Env> procedure) {
-    queue.add(new ProcedureDelayContainer<>(procedure));
+    ProcedureDelayContainer<Env> delayTask = new 
ProcedureDelayContainer<>(procedure);
+    queue.remove(delayTask);
+    queue.add(delayTask);
   }
 
   public boolean remove(Procedure<Env> procedure) {
-    return queue.remove(new ProcedureDelayContainer<>(procedure));
+    return queue.remove(new ProcedureDelayContainer<>(procedure)) || 
procedure.isFinished();
   }
 
   private ProcedureDelayContainer<Env> takeQuietly() {
@@ -68,10 +70,15 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
       }
       Procedure<Env> procedure = delayTask.getProcedure();
       if (procedure instanceof InternalProcedure) {
+        if (procedure.isFinished()) {
+          continue;
+        }
         InternalProcedure internal = (InternalProcedure) procedure;
         internal.periodicExecute(executor.getEnvironment());
-        procedure.updateTimestamp();
-        queue.add(delayTask);
+        if (!procedure.isFinished()) {
+          procedure.updateTimestamp();
+          queue.add(delayTask);
+        }
       } else {
         if (procedure.setTimeoutFailure(executor.getEnvironment())) {
           long rootProcId = executor.getRootProcedureId(procedure);
@@ -104,6 +111,23 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
       return procedure;
     }
 
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof ProcedureDelayContainer)) {
+        return false;
+      }
+      ProcedureDelayContainer<?> that = (ProcedureDelayContainer<?>) o;
+      return procedure == that.procedure;
+    }
+
+    @Override
+    public int hashCode() {
+      return System.identityHashCode(procedure);
+    }
+
     @Override
     public long getDelay(TimeUnit unit) {
       long delay = procedure.getTimeoutTimestamp() - 
System.currentTimeMillis();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
index c3c9167e6ef..f7275a152d7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
@@ -195,7 +195,8 @@ public abstract class StateMachineProcedure<Env, TState> 
extends Procedure<Env>
             nextState);
       }
     }
-    if (getStateId(getCurrentState()) == stateToBeAdded) {
+    final TState currentState = getCurrentState();
+    if (currentState != null && getStateId(currentState) == stateToBeAdded) {
       cycles++;
     } else {
       cycles = 0;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index 490f723d2e6..240e84f6316 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -299,7 +299,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
     }
     CreateCQProcedure that = (CreateCQProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && isGeneratedByPipe == that.isGeneratedByPipe
         && firstExecutionTime == that.firstExecutionTime
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
index b1410279173..6cade537f1f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
@@ -56,6 +56,18 @@ public abstract class AbstractNodeProcedure<TState>
     }
   }
 
+  @Override
+  protected void waitForLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    configNodeProcedureEnv.getSchedulerLock().lock();
+    try {
+      configNodeProcedureEnv
+          .getNodeLock()
+          .waitProcedure(this, configNodeProcedureEnv.getScheduler());
+    } finally {
+      configNodeProcedureEnv.getSchedulerLock().unlock();
+    }
+  }
+
   @Override
   protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
     configNodeProcedureEnv.getSchedulerLock().lock();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 0e72737f265..24a38b156fe 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -145,12 +145,7 @@ public abstract class AbstractOperatePipeProcedureV2
           LOGGER.debug(
               
ProcedureMessages.PROCEDUREID_LOCK_EVENT_WAIT_PIPE_LOCK_WILL_BE_RELEASED,
               getProcId());
-          configNodeProcedureEnv
-              .getConfigManager()
-              .getPipeManager()
-              .getPipeTaskCoordinator()
-              .unlock();
-          pipeTaskInfo = null;
+          releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
         }
         break;
       default:
@@ -164,12 +159,7 @@ public abstract class AbstractOperatePipeProcedureV2
               
ProcedureMessages.PROCEDUREID_INVALID_LOCK_STATE_PIPE_LOCK_WILL_BE_RELEASED,
               getProcId(),
               procedureLockState);
-          configNodeProcedureEnv
-              .getConfigManager()
-              .getPipeManager()
-              .getPipeTaskCoordinator()
-              .unlock();
-          pipeTaskInfo = null;
+          releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
         }
         break;
     }
@@ -195,11 +185,16 @@ public abstract class AbstractOperatePipeProcedureV2
       }
       PipeProcedureMetrics.getInstance()
           .updateTimer(this.getOperation().getName(), this.elapsedTime());
-      
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
-      pipeTaskInfo = null;
+      releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
     }
   }
 
+  private void releasePipeTaskCoordinatorLock(ConfigNodeProcedureEnv 
configNodeProcedureEnv) {
+    // Clear before releasing the semaphore to avoid clobbering a re-scheduled 
execution's marker.
+    pipeTaskInfo = null;
+    
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+  }
+
   protected abstract PipeTaskOperation getOperation();
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index ddc658bdf4d..b1583c74c6b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -327,7 +327,7 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
     if (that instanceof CreatePipePluginProcedure) {
       CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) 
that;
       return thatProcedure.getProcId() == getProcId()
-          && thatProcedure.getCurrentState().equals(getCurrentState())
+          && Objects.equals(thatProcedure.getCurrentState(), getCurrentState())
           && thatProcedure.getCycles() == getCycles()
           && thatProcedure.pipePluginMeta.equals(pipePluginMeta);
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 771ab6230bf..ff1caad7ad4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -296,7 +296,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
     if (that instanceof DropPipePluginProcedure) {
       final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure) 
that;
       return thatProcedure.getProcId() == getProcId()
-          && thatProcedure.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProcedure.getCurrentState(), 
this.getCurrentState())
           && thatProcedure.getCycles() == this.getCycles()
           && (thatProcedure.pluginName).equals(pluginName);
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index a8399b281b4..4f4feb58017 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -203,7 +203,7 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
     }
     PipeHandleLeaderChangeProcedure that = (PipeHandleLeaderChangeProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && this.regionGroupToOldAndNewLeaderPairMap.equals(
             that.regionGroupToOldAndNewLeaderPairMap);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 08468f3d04d..a4bbbe81019 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -205,7 +205,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
     }
     PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && needWriteConsensusOnConfigNodes == 
that.needWriteConsensusOnConfigNodes
         && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 4ba69067851..530ca4c1956 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -169,7 +169,7 @@ public class DropPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     }
     DropPipeProcedureV2 that = (DropPipeProcedureV2) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && pipeName.equals(that.pipeName);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 251a4f0e3af..7c9bff96f8f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -188,7 +188,7 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     }
     StartPipeProcedureV2 that = (StartPipeProcedureV2) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && pipeName.equals(that.pipeName);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index e2ec41f3b83..4410be01d3f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -191,7 +191,7 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     }
     StopPipeProcedureV2 that = (StopPipeProcedureV2) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && isStoppedByRuntimeExceptionBeforeStop == 
that.isStoppedByRuntimeExceptionBeforeStop
         && pipeName.equals(that.pipeName);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
index 1ad60f6f852..f168f6dd683 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
@@ -311,7 +311,7 @@ public class AlterEncodingCompressorProcedure
     }
     final AlterEncodingCompressorProcedure that = 
(AlterEncodingCompressorProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == getCycles()
         && Objects.equals(this.queryId, that.queryId)
         && this.isGeneratedByPipe == that.isGeneratedByPipe
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
index 26ea988f98e..3a4431047c2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
@@ -407,7 +407,7 @@ public class AlterTimeSeriesDataTypeProcedure
     }
     final AlterTimeSeriesDataTypeProcedure that = 
(AlterTimeSeriesDataTypeProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == getCycles()
         && this.isGeneratedByPipe == that.isGeneratedByPipe
         && this.measurementPath.equals(that.measurementPath)
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 0b3eca82dce..7b8de782173 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -315,7 +315,7 @@ public class DeleteDatabaseProcedure
     if (that instanceof DeleteDatabaseProcedure) {
       final DeleteDatabaseProcedure thatProc = (DeleteDatabaseProcedure) that;
       return thatProc.getProcId() == this.getProcId()
-          && thatProc.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
           && thatProc.getCycles() == this.getCycles()
           && thatProc.isGeneratedByPipe == this.isGeneratedByPipe
           && 
thatProc.deleteDatabaseSchema.equals(this.getDeleteDatabaseSchema());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
index 4f63e96840c..5846c79b521 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
@@ -313,7 +313,7 @@ public class DeleteLogicalViewProcedure
     }
     final DeleteLogicalViewProcedure that = (DeleteLogicalViewProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == that.getCycles()
         && isGeneratedByPipe == that.isGeneratedByPipe
         && patternTree.equals(that.patternTree);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
index 0b5e45b5ca1..1cd5efe639c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
@@ -392,7 +392,7 @@ public class DeleteTimeSeriesProcedure
     }
     final DeleteTimeSeriesProcedure that = (DeleteTimeSeriesProcedure) o;
     return this.getProcId() == that.getProcId()
-        && this.getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(this.getCurrentState(), that.getCurrentState())
         && this.getCycles() == getCycles()
         && this.isGeneratedByPipe == that.isGeneratedByPipe
         && this.patternTree.equals(that.patternTree);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
index 09414b0e3a8..f02b19b1618 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
@@ -266,7 +266,7 @@ public class SubscriptionHandleLeaderChangeProcedure 
extends AbstractOperateSubs
     final SubscriptionHandleLeaderChangeProcedure that =
         (SubscriptionHandleLeaderChangeProcedure) o;
     return getProcId() == that.getProcId()
-        && getCurrentState().equals(that.getCurrentState())
+        && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
         && runtimeVersion == that.runtimeVersion
         && 
regionGroupToOldAndNewLeaderPairMap.equals(that.regionGroupToOldAndNewLeaderPairMap);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
index 8406eee97b3..dd297bb4d22 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
@@ -312,7 +312,7 @@ public class CreateTriggerProcedure extends 
AbstractNodeProcedure<CreateTriggerS
     if (that instanceof CreateTriggerProcedure) {
       CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that;
       return thatProc.getProcId() == this.getProcId()
-          && thatProc.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
           && thatProc.getCycles() == this.getCycles()
           && thatProc.isGeneratedByPipe == this.isGeneratedByPipe
           && thatProc.triggerInformation.equals(this.triggerInformation);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
index 126197cbfe6..a3f2c51baa2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
@@ -180,7 +180,7 @@ public class DropTriggerProcedure extends 
AbstractNodeProcedure<DropTriggerState
     if (that instanceof DropTriggerProcedure) {
       DropTriggerProcedure thatProc = (DropTriggerProcedure) that;
       return thatProc.getProcId() == this.getProcId()
-          && thatProc.getCurrentState().equals(this.getCurrentState())
+          && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
           && thatProc.getCycles() == this.getCycles()
           && thatProc.isGeneratedByPipe == this.isGeneratedByPipe
           && (thatProc.triggerName).equals(this.triggerName);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
index 832e339c0ae..e2f5935a909 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
@@ -45,7 +45,15 @@ public class LockQueue {
     return true;
   }
 
-  public void waitProcedure(Procedure<?> procedure) {
+  public void waitProcedure(Procedure<?> procedure, ProcedureScheduler 
procedureScheduler) {
+    if (lockOwnerProcedure == null) {
+      procedureScheduler.addFront(procedure);
+      return;
+    }
+    if (deque.stream()
+        .anyMatch(waitingProcedure -> waitingProcedure.getProcId() == 
procedure.getProcId())) {
+      return;
+    }
     deque.addLast(procedure);
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
index 3cd5ceacf4b..94b6f311930 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.scheduler;
 import org.apache.iotdb.confignode.procedure.Procedure;
 
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.locks.ReentrantLock;
 
 /** Simple scheduler for procedures */
 public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
@@ -48,6 +49,7 @@ public class SimpleProcedureScheduler extends 
AbstractProcedureScheduler {
     schedLock();
     try {
       runnables.clear();
+      waitings.clear();
     } finally {
       schedUnlock();
     }
@@ -68,12 +70,37 @@ public class SimpleProcedureScheduler extends 
AbstractProcedureScheduler {
     return runnables.size();
   }
 
-  public void addWaiting(Procedure proc) {
-    waitings.add(proc);
+  public void waitProcedure(Procedure proc, ReentrantLock lock) {
+    boolean signal = false;
+    schedLock();
+    try {
+      if (lock.isLocked()) {
+        waitings.add(proc);
+      } else {
+        runnables.addFirst(proc);
+        signal = true;
+      }
+    } finally {
+      schedUnlock();
+    }
+    if (signal) {
+      signalAll();
+    }
   }
 
-  public void releaseWaiting() {
-    runnables.addAll(waitings);
-    waitings.clear();
+  public void releaseWaiting(ReentrantLock lock) {
+    boolean signal;
+    schedLock();
+    try {
+      lock.unlock();
+      signal = !waitings.isEmpty();
+      runnables.addAll(waitings);
+      waitings.clear();
+    } finally {
+      schedUnlock();
+    }
+    if (signal) {
+      signalAll();
+    }
   }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
new file mode 100644
index 00000000000..74d5d821d76
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PipeTaskCoordinatorLockTest {
+
+  @Test
+  public void testInterruptedThreadDoesNotAcquireWithoutPermit() throws 
Exception {
+    PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock();
+    lock.lock();
+
+    CountDownLatch waiting = new CountDownLatch(1);
+    AtomicBoolean acquired = new AtomicBoolean(false);
+    Thread thread =
+        new Thread(
+            () -> {
+              Thread.currentThread().interrupt();
+              waiting.countDown();
+              lock.lock();
+              acquired.set(true);
+              lock.unlock();
+            });
+    thread.start();
+
+    Assert.assertTrue(waiting.await(3, TimeUnit.SECONDS));
+    TimeUnit.MILLISECONDS.sleep(200);
+    Assert.assertFalse(acquired.get());
+
+    lock.unlock();
+    thread.join(TimeUnit.SECONDS.toMillis(3));
+
+    Assert.assertFalse(thread.isAlive());
+    Assert.assertTrue(acquired.get());
+    Assert.assertFalse(lock.isLocked());
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
index 500a51e9e3d..967611ca8ee 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
@@ -19,7 +19,10 @@
 
 package org.apache.iotdb.confignode.procedure;
 
+import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
 import org.apache.iotdb.confignode.procedure.entity.SimpleLockProcedure;
+import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import 
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
 
 import org.junit.Assert;
@@ -43,4 +46,24 @@ public class TestLockRegime extends TestProcedureBase {
         this.procExecutor, 
procIdList.stream().mapToLong(Long::longValue).toArray());
     Assert.assertEquals(env.lockAcquireSeq.toString(), 
env.executeSeq.toString());
   }
+
+  @Test
+  public void testLockQueueDoesNotWakeDuplicateProcedure() {
+    LockQueue lockQueue = new LockQueue();
+    SimpleProcedureScheduler scheduler = new SimpleProcedureScheduler();
+    scheduler.start();
+
+    NoopProcedure lockOwner = new NoopProcedure();
+    lockOwner.setProcId(0);
+    Assert.assertTrue(lockQueue.tryLock(lockOwner));
+
+    NoopProcedure procedure = new NoopProcedure();
+    procedure.setProcId(1);
+    lockQueue.waitProcedure(procedure, scheduler);
+    lockQueue.waitProcedure(procedure, scheduler);
+
+    Assert.assertEquals(1, lockQueue.wakeWaitingProcedures(scheduler));
+    Assert.assertEquals(1, scheduler.size());
+    scheduler.stop();
+  }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index dce7f2ba5dc..ba5f635507a 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -23,11 +23,14 @@ import 
org.apache.iotdb.confignode.procedure.entity.IncProcedure;
 import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
 import org.apache.iotdb.confignode.procedure.entity.StuckProcedure;
 import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
 import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -96,6 +99,37 @@ public class TestProcedureExecutor extends TestProcedureBase 
{
     ProcedureTestUtil.waitForProcedure(procExecutor, busyProcId2);
   }
 
+  @Test
+  public void testDuplicatedSchedulingDoesNotExecuteConcurrently() throws 
InterruptedException {
+    BlockingProcedure blockingProcedure = new BlockingProcedure();
+    long procId = procExecutor.submitProcedure(blockingProcedure);
+
+    Assert.assertTrue(blockingProcedure.awaitExecution(30, TimeUnit.SECONDS));
+
+    procExecutor.getScheduler().addFront(blockingProcedure);
+    boolean duplicated = blockingProcedure.awaitExecution(3, TimeUnit.SECONDS);
+
+    blockingProcedure.releaseExecutions(duplicated ? 2 : 1);
+    ProcedureTestUtil.waitForProcedure(procExecutor, procId);
+
+    Assert.assertFalse(duplicated);
+    Assert.assertEquals(1, blockingProcedure.getExecutionCount());
+  }
+
+  @Test
+  public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws 
InterruptedException {
+    CompletingInternalProcedure internalProcedure = new 
CompletingInternalProcedure();
+
+    procExecutor.addInternalProcedure(internalProcedure);
+    procExecutor.addInternalProcedure(internalProcedure);
+
+    Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS));
+    Assert.assertFalse(internalProcedure.awaitExecution(300, 
TimeUnit.MILLISECONDS));
+    Assert.assertEquals(1, internalProcedure.getExecutionCount());
+
+    Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
+  }
+
   private int waitThreadCount(final int expectedThreads) {
     long startTime = System.currentTimeMillis();
     while (procExecutor.isRunning()
@@ -107,4 +141,62 @@ public class TestProcedureExecutor extends 
TestProcedureBase {
     }
     return procExecutor.getWorkerThreadCount();
   }
+
+  private static class BlockingProcedure extends Procedure<TestProcEnv> {
+
+    private final Semaphore entered = new Semaphore(0);
+    private final Semaphore finish = new Semaphore(0);
+    private final AtomicInteger executionCount = new AtomicInteger();
+
+    @Override
+    protected Procedure<TestProcEnv>[] execute(TestProcEnv env) throws 
InterruptedException {
+      executionCount.incrementAndGet();
+      entered.release();
+      finish.acquire();
+      return null;
+    }
+
+    @Override
+    protected void rollback(TestProcEnv env)
+        throws IOException, InterruptedException, ProcedureException {
+      // No state to roll back.
+    }
+
+    private boolean awaitExecution(long timeout, TimeUnit unit) throws 
InterruptedException {
+      return entered.tryAcquire(timeout, unit);
+    }
+
+    private void releaseExecutions(int permits) {
+      finish.release(permits);
+    }
+
+    private int getExecutionCount() {
+      return executionCount.get();
+    }
+  }
+
+  private static class CompletingInternalProcedure extends 
InternalProcedure<TestProcEnv> {
+
+    private final Semaphore entered = new Semaphore(0);
+    private final AtomicInteger executionCount = new AtomicInteger();
+
+    private CompletingInternalProcedure() {
+      super(0);
+    }
+
+    @Override
+    protected void periodicExecute(TestProcEnv env) {
+      executionCount.incrementAndGet();
+      entered.release();
+      setState(ProcedureState.SUCCESS);
+    }
+
+    private boolean awaitExecution(long timeout, TimeUnit unit) throws 
InterruptedException {
+      return entered.tryAcquire(timeout, unit);
+    }
+
+    private int getExecutionCount() {
+      return executionCount.get();
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
index ce9fea39d55..42badd70079 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
@@ -53,18 +53,21 @@ public class SimpleLockProcedure extends 
Procedure<TestProcEnv> {
 
       return ProcedureLockState.LOCK_ACQUIRED;
     }
-    SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) 
testProcEnv.getScheduler();
-    scheduler.addWaiting(this);
     System.out.println(procName + " wait for lock.");
     return ProcedureLockState.LOCK_EVENT_WAIT;
   }
 
+  @Override
+  protected void waitForLock(TestProcEnv testProcEnv) {
+    SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) 
testProcEnv.getScheduler();
+    scheduler.waitProcedure(this, testProcEnv.getEnvLock());
+  }
+
   @Override
   protected void releaseLock(TestProcEnv testProcEnv) {
     System.out.println(procName + " release lock.");
-    testProcEnv.getEnvLock().unlock();
     SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) 
testProcEnv.getScheduler();
-    scheduler.releaseWaiting();
+    scheduler.releaseWaiting(testProcEnv.getEnvLock());
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
index 75c0963a27f..b2ec615fbcd 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
@@ -106,4 +106,51 @@ public class PipeHandleLeaderChangeProcedureTest {
       fail();
     }
   }
+
+  @Test
+  public void completedProcedureEqualsTest() {
+    Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>();
+    leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), 
new Pair<>(1, 2));
+
+    try {
+      PipeHandleLeaderChangeProcedure proc = 
deserializeCompletedProcedure(leaderMap);
+      PipeHandleLeaderChangeProcedure proc2 = 
deserializeCompletedProcedure(leaderMap);
+
+      assertEquals(proc, proc2);
+      assertEquals(proc.hashCode(), proc2.hashCode());
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  private PipeHandleLeaderChangeProcedure deserializeCompletedProcedure(
+      Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap) throws 
Exception {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+
+    
outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
+    outputStream.writeLong(Procedure.NO_PROC_ID);
+    outputStream.writeInt(ProcedureState.SUCCESS.ordinal());
+    outputStream.writeLong(0L);
+    outputStream.writeLong(0L);
+    outputStream.writeLong(Procedure.NO_PROC_ID);
+    outputStream.writeLong(Procedure.NO_TIMEOUT);
+    outputStream.writeInt(-1);
+    outputStream.write((byte) 0);
+    outputStream.writeInt(-1);
+    outputStream.write((byte) 0);
+    outputStream.writeInt(1);
+    outputStream.writeInt(Integer.MIN_VALUE);
+    outputStream.write((byte) 0);
+    outputStream.writeInt(leaderMap.size());
+    for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry : 
leaderMap.entrySet()) {
+      outputStream.writeInt(entry.getKey().getId());
+      outputStream.writeInt(entry.getValue().getLeft());
+      outputStream.writeInt(entry.getValue().getRight());
+    }
+
+    ByteBuffer buffer =
+        ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    return (PipeHandleLeaderChangeProcedure) 
ProcedureFactory.getInstance().create(buffer);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/ErrorHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/ErrorHandler.java
index f94eb0a2f99..bc0d740bc15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/ErrorHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/ErrorHandler.java
@@ -282,9 +282,13 @@ public class ErrorHandler extends BaseErrorListener {
       // The ATN can be in multiple states (similar to an NFA)
       Deque<ParsingState> activeStates = new ArrayDeque<>();
       activeStates.add(start);
+      Set<ParsingState> processedStates = new HashSet<>();
 
       while (!activeStates.isEmpty()) {
         ParsingState current = activeStates.pop();
+        if (!processedStates.add(current)) {
+          continue;
+        }
 
         ATNState state = current.state;
         int tokenIndex = current.tokenIndex;
@@ -328,7 +332,7 @@ public class ErrorHandler extends BaseErrorListener {
                   new ParsingState(
                       ruleTransition.followState,
                       endToken,
-                      suppressed && endToken == currentToken,
+                      suppressed && endToken == tokenIndex,
                       parser));
             }
           } else if (transition instanceof PrecedencePredicateTransition) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
index f545a9bda23..d8d8f4fe19b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
@@ -374,8 +374,10 @@ public class DataNodeTableCache implements ITableCache {
   private Map<String, Map<String, TsTable>> getTablesInConfigNode(
       final Map<String, Map<String, Long>> tableInput) {
     Map<String, Map<String, TsTable>> result = Collections.emptyMap();
+    boolean acquired = false;
     try {
       fetchTableSemaphore.acquire();
+      acquired = true;
       final TFetchTableResp resp =
           ClusterConfigTaskExecutor.getInstance()
               .fetchTables(
@@ -388,11 +390,11 @@ public class DataNodeTableCache implements ITableCache {
     } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
       
LOGGER.warn(DataNodeSchemaMessages.INTERRUPTED_ACQUIRE_SEMAPHORE_GET_TABLES);
-    } catch (final Exception e) {
-      fetchTableSemaphore.release();
-      throw e;
+    } finally {
+      if (acquired) {
+        fetchTableSemaphore.release();
+      }
     }
-    fetchTableSemaphore.release();
     return result;
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
new file mode 100644
index 00000000000..2d6114363a5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.schemaengine.table;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.Semaphore;
+
+public class DataNodeTableCacheTest {
+
+  private static final String DATABASE = "interrupted_fetch_database";
+
+  @Test
+  public void interruptedFetchDoesNotLeakSemaphorePermit() throws Exception {
+    final DataNodeTableCache cache = DataNodeTableCache.getInstance();
+    cache.invalid(DATABASE);
+    try {
+      final Semaphore fetchTableSemaphore = getFetchTableSemaphore(cache);
+      final int permitsBeforeFetch = fetchTableSemaphore.availablePermits();
+
+      Thread.currentThread().interrupt();
+      try {
+        Assert.assertFalse(cache.isDatabaseExist(DATABASE));
+      } finally {
+        Thread.interrupted();
+      }
+
+      Assert.assertEquals(permitsBeforeFetch, 
fetchTableSemaphore.availablePermits());
+    } finally {
+      cache.invalid(DATABASE);
+    }
+  }
+
+  private Semaphore getFetchTableSemaphore(final DataNodeTableCache cache) 
throws Exception {
+    final Field field = 
DataNodeTableCache.class.getDeclaredField("fetchTableSemaphore");
+    field.setAccessible(true);
+    return (Semaphore) field.get(cache);
+  }
+}

Reply via email to