This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fix-procedure-duplicate-scheduling in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d66d51cdeda38934f98848c66893e93c119dec2b Author: Caideyipi <[email protected]> AuthorDate: Wed Jun 10 18:02:29 2026 +0800 Fix duplicate scheduling in procedure execution --- .../coordinator/task/PipeTaskCoordinatorLock.java | 15 +-- .../iotdb/confignode/procedure/Procedure.java | 10 ++ .../confignode/procedure/ProcedureExecutor.java | 107 +++++++++++---------- .../procedure/impl/StateMachineProcedure.java | 3 +- .../procedure/impl/cq/CreateCQProcedure.java | 2 +- .../procedure/impl/node/AbstractNodeProcedure.java | 1 + .../pipe/plugin/CreatePipePluginProcedure.java | 2 +- .../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +- .../runtime/PipeHandleLeaderChangeProcedure.java | 2 +- .../runtime/PipeHandleMetaChangeProcedure.java | 2 +- .../impl/pipe/task/DropPipeProcedureV2.java | 2 +- .../impl/pipe/task/StartPipeProcedureV2.java | 2 +- .../impl/pipe/task/StopPipeProcedureV2.java | 2 +- .../schema/AlterEncodingCompressorProcedure.java | 2 +- .../schema/AlterTimeSeriesDataTypeProcedure.java | 2 +- .../impl/schema/DeleteDatabaseProcedure.java | 2 +- .../impl/schema/DeleteLogicalViewProcedure.java | 2 +- .../impl/schema/DeleteTimeSeriesProcedure.java | 2 +- .../SubscriptionHandleLeaderChangeProcedure.java | 2 +- .../impl/trigger/CreateTriggerProcedure.java | 2 +- .../impl/trigger/DropTriggerProcedure.java | 2 +- .../confignode/procedure/scheduler/LockQueue.java | 4 + .../task/PipeTaskCoordinatorLockTest.java | 60 ++++++++++++ .../iotdb/confignode/procedure/TestLockRegime.java | 19 ++++ .../procedure/TestProcedureExecutor.java | 52 ++++++++++ .../PipeHandleLeaderChangeProcedureTest.java | 47 +++++++++ 26 files changed, 269 insertions(+), 81 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index 0f788435394..3a6fe1f698a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -46,17 +46,10 @@ public class PipeTaskCoordinatorLock { LOGGER.debug( ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD, Thread.currentThread().getName()); - try { - semaphore.acquire(); - LOGGER.debug( - ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD, - Thread.currentThread().getName()); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.error( - ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD, - Thread.currentThread().getName()); - } + semaphore.acquireUninterruptibly(); + LOGGER.debug( + ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD, + Thread.currentThread().getName()); } public boolean tryLock() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java index 29a241b16d0..24a8adc713f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java @@ -37,6 +37,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -62,6 +63,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> { private volatile long lastUpdate; private final AtomicReference<byte[]> result = new AtomicReference<>(); + private final AtomicBoolean executing = new AtomicBoolean(false); private volatile boolean locked = false; private boolean lockedWhenLoading = false; @@ -256,6 +258,14 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> { } // -------------------------Internal methods - called by the procedureExecutor------------------ + final boolean tryAcquireExecution() { + return executing.compareAndSet(false, true); + } + + final void releaseExecution() { + executing.set(false); + } + /** * Internal method called by the ProcedureExecutor that starts the user-level code execute(). * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 82afea3859f..16a9c72626d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.i18n.ProcedureMessages; -import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; @@ -42,7 +41,6 @@ import java.util.Arrays; import java.util.Deque; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -328,45 +326,44 @@ public class ProcedureExecutor<Env> { LOG.warn(ProcedureMessages.ROLLBACK_STACK_IS_NULL_FOR, proc.getProcId()); return; } - ProcedureLockState lockState = null; - try { - do { - if (!rootProcStack.acquire()) { - if (rootProcStack.setRollback()) { - lockState = executeRootStackRollback(rootProcId, rootProcStack); - switch (lockState) { + ProcedureLockState lockState; + do { + if (!rootProcStack.acquire()) { + if (rootProcStack.setRollback()) { + lockState = executeRootStackRollback(rootProcId, rootProcStack); + switch (lockState) { + case LOCK_ACQUIRED: + break; + case LOCK_EVENT_WAIT: + LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc); + rootProcStack.unsetRollback(); + break; + case LOCK_YIELD_WAIT: + rootProcStack.unsetRollback(); + scheduler.yield(proc); + break; + default: + throw new UnsupportedOperationException(); + } + } else { + if (!proc.wasExecuted()) { + switch (executeRollback(proc)) { case LOCK_ACQUIRED: break; case LOCK_EVENT_WAIT: - LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc); - rootProcStack.unsetRollback(); + LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc); break; case LOCK_YIELD_WAIT: - rootProcStack.unsetRollback(); scheduler.yield(proc); break; default: throw new UnsupportedOperationException(); } - } else { - if (!proc.wasExecuted()) { - switch (executeRollback(proc)) { - case LOCK_ACQUIRED: - break; - case LOCK_EVENT_WAIT: - LOG.info( - ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc); - break; - case LOCK_YIELD_WAIT: - scheduler.yield(proc); - break; - default: - throw new UnsupportedOperationException(); - } - } } - break; } + break; + } + try { lockState = acquireLock(proc); switch (lockState) { case LOCK_ACQUIRED: @@ -379,29 +376,23 @@ public class ProcedureExecutor<Env> { default: throw new UnsupportedOperationException(); } + } finally { rootProcStack.release(); + } - if (proc.isSuccess()) { - // update metrics on finishing the procedure - proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); - LOG.debug(ProcedureMessages.FINISHED_IN_MS_SUCCESSFULLY, proc, proc.elapsedTime()); - if (proc.getProcId() == rootProcId) { - rootProcedureCleanup(proc); - } else { - executeCompletionCleanup(proc); - } - return; + if (proc.isSuccess()) { + // update metrics on finishing the procedure + proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); + LOG.debug(ProcedureMessages.FINISHED_IN_MS_SUCCESSFULLY, proc, proc.elapsedTime()); + if (proc.getProcId() == rootProcId) { + rootProcedureCleanup(proc); + } else { + executeCompletionCleanup(proc); } - - } while (rootProcStack.isFailed()); - } finally { - // Only after procedure has completed execution can it be allowed to be rescheduled to prevent - // data races - if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) { - LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, proc.getProcId()); - ((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc); + return; } - } + + } while (rootProcStack.isFailed()); } /** @@ -414,6 +405,7 @@ public class ProcedureExecutor<Env> { if (proc.getState() != ProcedureState.RUNNABLE) { LOG.error( "The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc); + releaseLock(proc, false); return; } boolean reExecute; @@ -787,6 +779,13 @@ public class ProcedureExecutor<Env> { Thread.sleep(1000); continue; } + boolean executionAcquired = false; + while (isRunning() && !(executionAcquired = procedure.tryAcquireExecution())) { + Thread.sleep(10); + } + if (!executionAcquired) { + continue; + } this.activeProcedure.set(procedure); activeExecutorCount.incrementAndGet(); startTime.set(System.currentTimeMillis()); @@ -795,13 +794,14 @@ public class ProcedureExecutor<Env> { executeProcedure(procedure); } finally { PROCEDURE_EXECUTION_CONTEXT.remove(); + procedure.releaseExecution(); + activeExecutorCount.decrementAndGet(); + LOG.trace( + "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); + lastUpdated = System.currentTimeMillis(); + startTime.set(lastUpdated); } - activeExecutorCount.decrementAndGet(); - LOG.trace( - "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); this.activeProcedure.set(null); - lastUpdated = System.currentTimeMillis(); - startTime.set(lastUpdated); } } catch (Exception e) { @@ -812,6 +812,7 @@ public class ProcedureExecutor<Env> { this.activeProcedure.get(), e); } + this.activeProcedure.set(null); } finally { LOG.info(ProcedureMessages.PROCEDURE_WORKER_TERMINATED, getName()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java index c3c9167e6ef..f7275a152d7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java @@ -195,7 +195,8 @@ public abstract class StateMachineProcedure<Env, TState> extends Procedure<Env> nextState); } } - if (getStateId(getCurrentState()) == stateToBeAdded) { + final TState currentState = getCurrentState(); + if (currentState != null && getStateId(currentState) == stateToBeAdded) { cycles++; } else { cycles = 0; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index 490f723d2e6..240e84f6316 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -299,7 +299,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { } CreateCQProcedure that = (CreateCQProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && isGeneratedByPipe == that.isGeneratedByPipe && firstExecutionTime == that.firstExecutionTime diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java index b1410279173..9a088560a5b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java @@ -50,6 +50,7 @@ public abstract class AbstractNodeProcedure<TState> LOG.info( "procedureId {} acquire lock failed, will wait for lock after finishing execution.", getProcId()); + configNodeProcedureEnv.getNodeLock().waitProcedure(this); return ProcedureLockState.LOCK_EVENT_WAIT; } finally { configNodeProcedureEnv.getSchedulerLock().unlock(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java index ddc658bdf4d..b1583c74c6b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java @@ -327,7 +327,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP if (that instanceof CreatePipePluginProcedure) { CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) that; return thatProcedure.getProcId() == getProcId() - && thatProcedure.getCurrentState().equals(getCurrentState()) + && Objects.equals(thatProcedure.getCurrentState(), getCurrentState()) && thatProcedure.getCycles() == getCycles() && thatProcedure.pipePluginMeta.equals(pipePluginMeta); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index 771ab6230bf..ff1caad7ad4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -296,7 +296,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi if (that instanceof DropPipePluginProcedure) { final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure) that; return thatProcedure.getProcId() == getProcId() - && thatProcedure.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProcedure.getCurrentState(), this.getCurrentState()) && thatProcedure.getCycles() == this.getCycles() && (thatProcedure.pluginName).equals(pluginName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index 2c5b9566fce..b2ef752854f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -185,7 +185,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur } PipeHandleLeaderChangeProcedure that = (PipeHandleLeaderChangeProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && this.regionGroupToOldAndNewLeaderPairMap.equals( that.regionGroupToOldAndNewLeaderPairMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index a0dec367352..76bf432d454 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -190,7 +190,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV } PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && needWriteConsensusOnConfigNodes == that.needWriteConsensusOnConfigNodes && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java index 4ba69067851..530ca4c1956 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java @@ -169,7 +169,7 @@ public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } DropPipeProcedureV2 that = (DropPipeProcedureV2) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && pipeName.equals(that.pipeName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java index 251a4f0e3af..7c9bff96f8f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java @@ -188,7 +188,7 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } StartPipeProcedureV2 that = (StartPipeProcedureV2) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && pipeName.equals(that.pipeName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java index e2ec41f3b83..4410be01d3f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java @@ -191,7 +191,7 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { } StopPipeProcedureV2 that = (StopPipeProcedureV2) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && isStoppedByRuntimeExceptionBeforeStop == that.isStoppedByRuntimeExceptionBeforeStop && pipeName.equals(that.pipeName); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java index 1ad60f6f852..f168f6dd683 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java @@ -311,7 +311,7 @@ public class AlterEncodingCompressorProcedure } final AlterEncodingCompressorProcedure that = (AlterEncodingCompressorProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == getCycles() && Objects.equals(this.queryId, that.queryId) && this.isGeneratedByPipe == that.isGeneratedByPipe diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java index 26ea988f98e..3a4431047c2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java @@ -407,7 +407,7 @@ public class AlterTimeSeriesDataTypeProcedure } final AlterTimeSeriesDataTypeProcedure that = (AlterTimeSeriesDataTypeProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == getCycles() && this.isGeneratedByPipe == that.isGeneratedByPipe && this.measurementPath.equals(that.measurementPath) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 0b3eca82dce..7b8de782173 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -315,7 +315,7 @@ public class DeleteDatabaseProcedure if (that instanceof DeleteDatabaseProcedure) { final DeleteDatabaseProcedure thatProc = (DeleteDatabaseProcedure) that; return thatProc.getProcId() == this.getProcId() - && thatProc.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProc.getCurrentState(), this.getCurrentState()) && thatProc.getCycles() == this.getCycles() && thatProc.isGeneratedByPipe == this.isGeneratedByPipe && thatProc.deleteDatabaseSchema.equals(this.getDeleteDatabaseSchema()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java index 4f63e96840c..5846c79b521 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java @@ -313,7 +313,7 @@ public class DeleteLogicalViewProcedure } final DeleteLogicalViewProcedure that = (DeleteLogicalViewProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == that.getCycles() && isGeneratedByPipe == that.isGeneratedByPipe && patternTree.equals(that.patternTree); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java index 0b5e45b5ca1..1cd5efe639c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java @@ -392,7 +392,7 @@ public class DeleteTimeSeriesProcedure } final DeleteTimeSeriesProcedure that = (DeleteTimeSeriesProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == getCycles() && this.isGeneratedByPipe == that.isGeneratedByPipe && this.patternTree.equals(that.patternTree); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java index 5337b719d20..222b66a944b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java @@ -265,7 +265,7 @@ public class SubscriptionHandleLeaderChangeProcedure extends AbstractOperateSubs final SubscriptionHandleLeaderChangeProcedure that = (SubscriptionHandleLeaderChangeProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && runtimeVersion == that.runtimeVersion && regionGroupToOldAndNewLeaderPairMap.equals(that.regionGroupToOldAndNewLeaderPairMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java index 8406eee97b3..dd297bb4d22 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java @@ -312,7 +312,7 @@ public class CreateTriggerProcedure extends AbstractNodeProcedure<CreateTriggerS if (that instanceof CreateTriggerProcedure) { CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that; return thatProc.getProcId() == this.getProcId() - && thatProc.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProc.getCurrentState(), this.getCurrentState()) && thatProc.getCycles() == this.getCycles() && thatProc.isGeneratedByPipe == this.isGeneratedByPipe && thatProc.triggerInformation.equals(this.triggerInformation); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java index 126197cbfe6..a3f2c51baa2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java @@ -180,7 +180,7 @@ public class DropTriggerProcedure extends AbstractNodeProcedure<DropTriggerState if (that instanceof DropTriggerProcedure) { DropTriggerProcedure thatProc = (DropTriggerProcedure) that; return thatProc.getProcId() == this.getProcId() - && thatProc.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProc.getCurrentState(), this.getCurrentState()) && thatProc.getCycles() == this.getCycles() && thatProc.isGeneratedByPipe == this.isGeneratedByPipe && (thatProc.triggerName).equals(this.triggerName); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java index 832e339c0ae..895c173dbc8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java @@ -46,6 +46,10 @@ public class LockQueue { } public void waitProcedure(Procedure<?> procedure) { + if (deque.stream() + .anyMatch(waitingProcedure -> waitingProcedure.getProcId() == procedure.getProcId())) { + return; + } deque.addLast(procedure); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java new file mode 100644 index 00000000000..74d5d821d76 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.coordinator.task; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class PipeTaskCoordinatorLockTest { + + @Test + public void testInterruptedThreadDoesNotAcquireWithoutPermit() throws Exception { + PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock(); + lock.lock(); + + CountDownLatch waiting = new CountDownLatch(1); + AtomicBoolean acquired = new AtomicBoolean(false); + Thread thread = + new Thread( + () -> { + Thread.currentThread().interrupt(); + waiting.countDown(); + lock.lock(); + acquired.set(true); + lock.unlock(); + }); + thread.start(); + + Assert.assertTrue(waiting.await(3, TimeUnit.SECONDS)); + TimeUnit.MILLISECONDS.sleep(200); + Assert.assertFalse(acquired.get()); + + lock.unlock(); + thread.join(TimeUnit.SECONDS.toMillis(3)); + + Assert.assertFalse(thread.isAlive()); + Assert.assertTrue(acquired.get()); + Assert.assertFalse(lock.isLocked()); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java index 500a51e9e3d..85b0dfb8c9f 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java @@ -19,7 +19,10 @@ package org.apache.iotdb.confignode.procedure; +import org.apache.iotdb.confignode.procedure.entity.NoopProcedure; import org.apache.iotdb.confignode.procedure.entity.SimpleLockProcedure; +import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; +import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil; import org.junit.Assert; @@ -43,4 +46,20 @@ public class TestLockRegime extends TestProcedureBase { this.procExecutor, procIdList.stream().mapToLong(Long::longValue).toArray()); Assert.assertEquals(env.lockAcquireSeq.toString(), env.executeSeq.toString()); } + + @Test + public void testLockQueueDoesNotWakeDuplicateProcedure() { + LockQueue lockQueue = new LockQueue(); + SimpleProcedureScheduler scheduler = new SimpleProcedureScheduler(); + scheduler.start(); + + NoopProcedure procedure = new NoopProcedure(); + procedure.setProcId(1); + lockQueue.waitProcedure(procedure); + lockQueue.waitProcedure(procedure); + + Assert.assertEquals(1, lockQueue.wakeWaitingProcedures(scheduler)); + Assert.assertEquals(1, scheduler.size()); + scheduler.stop(); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java index dce7f2ba5dc..0b4e716dc3b 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java @@ -23,11 +23,13 @@ import org.apache.iotdb.confignode.procedure.entity.IncProcedure; import org.apache.iotdb.confignode.procedure.entity.NoopProcedure; import org.apache.iotdb.confignode.procedure.entity.StuckProcedure; import org.apache.iotdb.confignode.procedure.env.TestProcEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -96,6 +98,23 @@ public class TestProcedureExecutor extends TestProcedureBase { ProcedureTestUtil.waitForProcedure(procExecutor, busyProcId2); } + @Test + public void testDuplicatedSchedulingDoesNotExecuteConcurrently() throws InterruptedException { + BlockingProcedure blockingProcedure = new BlockingProcedure(); + long procId = procExecutor.submitProcedure(blockingProcedure); + + Assert.assertTrue(blockingProcedure.awaitExecution(30, TimeUnit.SECONDS)); + + procExecutor.getScheduler().addFront(blockingProcedure); + boolean duplicated = blockingProcedure.awaitExecution(3, TimeUnit.SECONDS); + + blockingProcedure.releaseExecutions(duplicated ? 2 : 1); + ProcedureTestUtil.waitForProcedure(procExecutor, procId); + + Assert.assertFalse(duplicated); + Assert.assertEquals(1, blockingProcedure.getExecutionCount()); + } + private int waitThreadCount(final int expectedThreads) { long startTime = System.currentTimeMillis(); while (procExecutor.isRunning() @@ -107,4 +126,37 @@ public class TestProcedureExecutor extends TestProcedureBase { } return procExecutor.getWorkerThreadCount(); } + + private static class BlockingProcedure extends Procedure<TestProcEnv> { + + private final Semaphore entered = new Semaphore(0); + private final Semaphore finish = new Semaphore(0); + private final AtomicInteger executionCount = new AtomicInteger(); + + @Override + protected Procedure<TestProcEnv>[] execute(TestProcEnv env) throws InterruptedException { + executionCount.incrementAndGet(); + entered.release(); + finish.acquire(); + return null; + } + + @Override + protected void rollback(TestProcEnv env) + throws IOException, InterruptedException, ProcedureException { + // No state to roll back. + } + + private boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException { + return entered.tryAcquire(timeout, unit); + } + + private void releaseExecutions(int permits) { + finish.release(permits); + } + + private int getExecutionCount() { + return executionCount.get(); + } + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java index 75c0963a27f..b2ec615fbcd 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java @@ -106,4 +106,51 @@ public class PipeHandleLeaderChangeProcedureTest { fail(); } } + + @Test + public void completedProcedureEqualsTest() { + Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>(); + leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), new Pair<>(1, 2)); + + try { + PipeHandleLeaderChangeProcedure proc = deserializeCompletedProcedure(leaderMap); + PipeHandleLeaderChangeProcedure proc2 = deserializeCompletedProcedure(leaderMap); + + assertEquals(proc, proc2); + assertEquals(proc.hashCode(), proc2.hashCode()); + } catch (Exception e) { + fail(); + } + } + + private PipeHandleLeaderChangeProcedure deserializeCompletedProcedure( + Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap) throws Exception { + PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode()); + outputStream.writeLong(Procedure.NO_PROC_ID); + outputStream.writeInt(ProcedureState.SUCCESS.ordinal()); + outputStream.writeLong(0L); + outputStream.writeLong(0L); + outputStream.writeLong(Procedure.NO_PROC_ID); + outputStream.writeLong(Procedure.NO_TIMEOUT); + outputStream.writeInt(-1); + outputStream.write((byte) 0); + outputStream.writeInt(-1); + outputStream.write((byte) 0); + outputStream.writeInt(1); + outputStream.writeInt(Integer.MIN_VALUE); + outputStream.write((byte) 0); + outputStream.writeInt(leaderMap.size()); + for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry : leaderMap.entrySet()) { + outputStream.writeInt(entry.getKey().getId()); + outputStream.writeInt(entry.getValue().getLeft()); + outputStream.writeInt(entry.getValue().getRight()); + } + + ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + return (PipeHandleLeaderChangeProcedure) ProcedureFactory.getInstance().create(buffer); + } }
