This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0a45a3b7b70 Fix duplicate scheduling in procedure execution (#17902)
0a45a3b7b70 is described below
commit 0a45a3b7b708c3b436e6b9b4c6aa9b65bede7076
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 18:18:59 2026 +0800
Fix duplicate scheduling in procedure execution (#17902)
* Fix duplicate scheduling in procedure execution
* Fix delayed procedure deduplication and semaphore release
* Fix SQL parser error handler traversal
* Fix pipe procedure lock release race
* Fix procedure lock wait scheduling
(cherry picked from commit c25849a093b466966353ecd7b63f722535b8d5b3)
---
.../coordinator/task/PipeTaskCoordinatorLock.java | 16 +--
.../iotdb/confignode/procedure/Procedure.java | 20 ++++
.../confignode/procedure/ProcedureExecutor.java | 119 ++++++++++++++-------
.../procedure/TimeoutExecutorThread.java | 32 +++++-
.../procedure/impl/StateMachineProcedure.java | 3 +-
.../procedure/impl/cq/CreateCQProcedure.java | 2 +-
.../procedure/impl/node/AbstractNodeProcedure.java | 12 +++
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 23 ++--
.../pipe/plugin/CreatePipePluginProcedure.java | 2 +-
.../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +-
.../runtime/PipeHandleLeaderChangeProcedure.java | 2 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 2 +-
.../impl/pipe/task/DropPipeProcedureV2.java | 2 +-
.../impl/pipe/task/StartPipeProcedureV2.java | 2 +-
.../impl/pipe/task/StopPipeProcedureV2.java | 2 +-
.../schema/AlterEncodingCompressorProcedure.java | 2 +-
.../schema/AlterTimeSeriesDataTypeProcedure.java | 2 +-
.../impl/schema/DeleteDatabaseProcedure.java | 2 +-
.../impl/schema/DeleteLogicalViewProcedure.java | 2 +-
.../impl/schema/DeleteTimeSeriesProcedure.java | 2 +-
.../SubscriptionHandleLeaderChangeProcedure.java | 2 +-
.../impl/trigger/CreateTriggerProcedure.java | 2 +-
.../impl/trigger/DropTriggerProcedure.java | 2 +-
.../confignode/procedure/scheduler/LockQueue.java | 10 +-
.../scheduler/SimpleProcedureScheduler.java | 37 ++++++-
.../task/PipeTaskCoordinatorLockTest.java | 60 +++++++++++
.../iotdb/confignode/procedure/TestLockRegime.java | 23 ++++
.../procedure/TestProcedureExecutor.java | 92 ++++++++++++++++
.../procedure/entity/SimpleLockProcedure.java | 11 +-
.../PipeHandleLeaderChangeProcedureTest.java | 47 ++++++++
.../plan/relational/sql/parser/ErrorHandler.java | 6 +-
.../db/schemaengine/table/DataNodeTableCache.java | 10 +-
.../schemaengine/table/DataNodeTableCacheTest.java | 58 ++++++++++
33 files changed, 509 insertions(+), 102 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 15991cfc90e..a99d7dd18c7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -47,18 +47,10 @@ public class PipeTaskCoordinatorLock {
LOGGER.debug(
ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD,
Thread.currentThread().getName());
- try {
- semaphore.acquire();
- LOGGER.debug(
- ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
- Thread.currentThread().getName());
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- PipeLogger.log(
- LOGGER::error,
-
ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
- Thread.currentThread().getName());
- }
+ semaphore.acquireUninterruptibly();
+ LOGGER.debug(
+ ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
+ Thread.currentThread().getName());
}
public boolean tryLock() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 29a241b16d0..f3e88b708d4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -37,6 +37,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -62,6 +63,7 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
private volatile long lastUpdate;
private final AtomicReference<byte[]> result = new AtomicReference<>();
+ private final AtomicBoolean executing = new AtomicBoolean(false);
private volatile boolean locked = false;
private boolean lockedWhenLoading = false;
@@ -235,6 +237,16 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
// no op
}
+ /**
+ * Called after an execution attempt returns {@link
ProcedureLockState#LOCK_EVENT_WAIT}. Override
+ * it to put the procedure into the corresponding lock wait queue.
+ *
+ * @param env env
+ */
+ protected void waitForLock(Env env) {
+ // no op
+ }
+
/**
* Used to keep procedure lock even when the procedure is yielded or
suspended.
*
@@ -256,6 +268,14 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
}
// -------------------------Internal methods - called by the
procedureExecutor------------------
+ final boolean tryAcquireExecution() {
+ return executing.compareAndSet(false, true);
+ }
+
+ final void releaseExecution() {
+ executing.set(false);
+ }
+
/**
* Internal method called by the ProcedureExecutor that starts the
user-level code execute().
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index b0e5d737835..f3652dab1d0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
@@ -42,7 +41,6 @@ import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -86,6 +84,16 @@ public class ProcedureExecutor<Env> {
private final Env environment;
private final IProcedureStore<Env> store;
+ private static final class LockStateResult<Env> {
+ private final ProcedureLockState lockState;
+ private final Procedure<Env> procedure;
+
+ private LockStateResult(ProcedureLockState lockState, Procedure<Env>
procedure) {
+ this.lockState = lockState;
+ this.procedure = procedure;
+ }
+ }
+
public ProcedureExecutor(
final Env environment, final IProcedureStore<Env> store, final
ProcedureScheduler scheduler) {
this.environment = environment;
@@ -329,33 +337,39 @@ public class ProcedureExecutor<Env> {
return;
}
ProcedureLockState lockState = null;
+ Procedure<Env> lockEventWaitProcedure = null;
try {
do {
if (!rootProcStack.acquire()) {
if (rootProcStack.setRollback()) {
- lockState = executeRootStackRollback(rootProcId, rootProcStack);
+ LockStateResult<Env> lockStateResult =
+ executeRootStackRollback(rootProcId, rootProcStack);
+ lockState = lockStateResult.lockState;
switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
- LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc);
+ LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK,
lockStateResult.procedure);
rootProcStack.unsetRollback();
+ lockEventWaitProcedure = lockStateResult.procedure;
break;
case LOCK_YIELD_WAIT:
rootProcStack.unsetRollback();
- scheduler.yield(proc);
+ scheduler.yield(lockStateResult.procedure);
break;
default:
throw new UnsupportedOperationException();
}
} else {
if (!proc.wasExecuted()) {
- switch (executeRollback(proc)) {
+ lockState = executeRollback(proc);
+ switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
LOG.info(
ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc);
+ lockEventWaitProcedure = proc;
break;
case LOCK_YIELD_WAIT:
scheduler.yield(proc);
@@ -367,19 +381,25 @@ public class ProcedureExecutor<Env> {
}
break;
}
- lockState = acquireLock(proc);
- switch (lockState) {
- case LOCK_ACQUIRED:
- executeProcedure(rootProcStack, proc);
- break;
- case LOCK_YIELD_WAIT:
- case LOCK_EVENT_WAIT:
- LOG.info(ProcedureMessages.LOCKSTATE_IS, proc, lockState);
- break;
- default:
- throw new UnsupportedOperationException();
+ try {
+ lockState = acquireLock(proc);
+ switch (lockState) {
+ case LOCK_ACQUIRED:
+ executeProcedure(rootProcStack, proc);
+ break;
+ case LOCK_YIELD_WAIT:
+ case LOCK_EVENT_WAIT:
+ LOG.info(ProcedureMessages.LOCKSTATE_IS, proc, lockState);
+ if (lockState == ProcedureLockState.LOCK_EVENT_WAIT) {
+ lockEventWaitProcedure = proc;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ } finally {
+ rootProcStack.release();
}
- rootProcStack.release();
if (proc.isSuccess()) {
// update metrics on finishing the procedure
@@ -397,9 +417,9 @@ public class ProcedureExecutor<Env> {
} finally {
// Only after procedure has completed execution can it be allowed to be
rescheduled to prevent
// data races
- if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
- LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK,
proc.getProcId());
- ((ConfigNodeProcedureEnv)
this.environment).getNodeLock().waitProcedure(proc);
+ if (lockEventWaitProcedure != null) {
+ LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK,
lockEventWaitProcedure.getProcId());
+ lockEventWaitProcedure.waitForLock(this.environment);
}
}
}
@@ -414,6 +434,7 @@ public class ProcedureExecutor<Env> {
if (proc.getState() != ProcedureState.RUNNABLE) {
LOG.error(
"The executing procedure should in RUNNABLE state, but it's not.
Procedure is {}", proc);
+ releaseLock(proc, false);
return;
}
boolean reExecute;
@@ -615,8 +636,8 @@ public class ProcedureExecutor<Env> {
* @param procedureStack root procedure stack
* @return lock state
*/
- private ProcedureLockState executeRootStackRollback(
- Long rootProcId, RootProcedureStack procedureStack) {
+ private LockStateResult<Env> executeRootStackRollback(
+ Long rootProcId, RootProcedureStack<Env> procedureStack) {
Procedure<Env> rootProcedure = procedures.get(rootProcId);
ProcedureException exception = rootProcedure.getException();
if (exception == null) {
@@ -637,7 +658,7 @@ public class ProcedureExecutor<Env> {
}
ProcedureLockState lockState = acquireLock(procedure);
if (lockState != ProcedureLockState.LOCK_ACQUIRED) {
- return lockState;
+ return new LockStateResult<>(lockState, procedure);
}
lockState = executeRollback(procedure);
releaseLock(procedure, false);
@@ -645,11 +666,11 @@ public class ProcedureExecutor<Env> {
boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED;
abortRollback |= !isRunning() || !store.isRunning();
if (abortRollback) {
- return lockState;
+ return new LockStateResult<>(lockState, procedure);
}
if (!procedure.isFinished() &&
procedure.isYieldAfterExecution(this.environment)) {
- return ProcedureLockState.LOCK_YIELD_WAIT;
+ return new LockStateResult<>(ProcedureLockState.LOCK_YIELD_WAIT,
procedure);
}
if (procedure != rootProcedure) {
@@ -660,7 +681,7 @@ public class ProcedureExecutor<Env> {
LOG.info(
ProcedureMessages.ROLLED_BACK_TIME_DURATION_IS, rootProcedure,
rootProcedure.elapsedTime());
rootProcedureCleanup(rootProcedure);
- return ProcedureLockState.LOCK_ACQUIRED;
+ return new LockStateResult<>(ProcedureLockState.LOCK_ACQUIRED,
rootProcedure);
}
private ProcedureLockState acquireLock(Procedure<Env> proc) {
@@ -795,21 +816,38 @@ public class ProcedureExecutor<Env> {
Thread.sleep(1000);
continue;
}
- this.activeProcedure.set(procedure);
- activeExecutorCount.incrementAndGet();
- startTime.set(System.currentTimeMillis());
- PROCEDURE_EXECUTION_CONTEXT.set(true);
+ boolean executionAcquired = false;
+ while (isRunning() && !(executionAcquired =
procedure.tryAcquireExecution())) {
+ Thread.sleep(10);
+ }
+ if (!executionAcquired) {
+ continue;
+ }
try {
- executeProcedure(procedure);
- } finally {
- PROCEDURE_EXECUTION_CONTEXT.remove();
+ this.activeProcedure.set(procedure);
+ activeExecutorCount.incrementAndGet();
+ startTime.set(System.currentTimeMillis());
+ try {
+ PROCEDURE_EXECUTION_CONTEXT.set(true);
+ try {
+ executeProcedure(procedure);
+ } finally {
+ PROCEDURE_EXECUTION_CONTEXT.remove();
+ }
+ } finally {
+ procedure.releaseExecution();
+ activeExecutorCount.decrementAndGet();
+ LOG.trace(
+ "Halt pid={}, activeCount={}", procedure.getProcId(),
activeExecutorCount.get());
+ this.activeProcedure.set(null);
+ lastUpdated = System.currentTimeMillis();
+ startTime.set(lastUpdated);
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Exception happened when worker {} execute procedure {}",
getName(), procedure, e);
+ throw e;
}
- activeExecutorCount.decrementAndGet();
- LOG.trace(
- "Halt pid={}, activeCount={}", procedure.getProcId(),
activeExecutorCount.get());
- this.activeProcedure.set(null);
- lastUpdated = System.currentTimeMillis();
- startTime.set(lastUpdated);
}
} catch (Exception e) {
@@ -820,6 +858,7 @@ public class ProcedureExecutor<Env> {
this.activeProcedure.get(),
e);
}
+ this.activeProcedure.set(null);
} finally {
LOG.info(ProcedureMessages.PROCEDURE_WORKER_TERMINATED, getName());
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 3e62cdc1d4f..cfd7bf023b0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -43,11 +43,13 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
}
public void add(Procedure<Env> procedure) {
- queue.add(new ProcedureDelayContainer<>(procedure));
+ ProcedureDelayContainer<Env> delayTask = new
ProcedureDelayContainer<>(procedure);
+ queue.remove(delayTask);
+ queue.add(delayTask);
}
public boolean remove(Procedure<Env> procedure) {
- return queue.remove(new ProcedureDelayContainer<>(procedure));
+ return queue.remove(new ProcedureDelayContainer<>(procedure)) ||
procedure.isFinished();
}
private ProcedureDelayContainer<Env> takeQuietly() {
@@ -68,10 +70,15 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
}
Procedure<Env> procedure = delayTask.getProcedure();
if (procedure instanceof InternalProcedure) {
+ if (procedure.isFinished()) {
+ continue;
+ }
InternalProcedure internal = (InternalProcedure) procedure;
internal.periodicExecute(executor.getEnvironment());
- procedure.updateTimestamp();
- queue.add(delayTask);
+ if (!procedure.isFinished()) {
+ procedure.updateTimestamp();
+ queue.add(delayTask);
+ }
} else {
if (procedure.setTimeoutFailure(executor.getEnvironment())) {
long rootProcId = executor.getRootProcedureId(procedure);
@@ -104,6 +111,23 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
return procedure;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ProcedureDelayContainer)) {
+ return false;
+ }
+ ProcedureDelayContainer<?> that = (ProcedureDelayContainer<?>) o;
+ return procedure == that.procedure;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(procedure);
+ }
+
@Override
public long getDelay(TimeUnit unit) {
long delay = procedure.getTimeoutTimestamp() -
System.currentTimeMillis();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
index c3c9167e6ef..f7275a152d7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
@@ -195,7 +195,8 @@ public abstract class StateMachineProcedure<Env, TState>
extends Procedure<Env>
nextState);
}
}
- if (getStateId(getCurrentState()) == stateToBeAdded) {
+ final TState currentState = getCurrentState();
+ if (currentState != null && getStateId(currentState) == stateToBeAdded) {
cycles++;
} else {
cycles = 0;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index 490f723d2e6..240e84f6316 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -299,7 +299,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
}
CreateCQProcedure that = (CreateCQProcedure) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& isGeneratedByPipe == that.isGeneratedByPipe
&& firstExecutionTime == that.firstExecutionTime
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
index b1410279173..6cade537f1f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
@@ -56,6 +56,18 @@ public abstract class AbstractNodeProcedure<TState>
}
}
+ @Override
+ protected void waitForLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ configNodeProcedureEnv.getSchedulerLock().lock();
+ try {
+ configNodeProcedureEnv
+ .getNodeLock()
+ .waitProcedure(this, configNodeProcedureEnv.getScheduler());
+ } finally {
+ configNodeProcedureEnv.getSchedulerLock().unlock();
+ }
+ }
+
@Override
protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
configNodeProcedureEnv.getSchedulerLock().lock();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 0e72737f265..24a38b156fe 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -145,12 +145,7 @@ public abstract class AbstractOperatePipeProcedureV2
LOGGER.debug(
ProcedureMessages.PROCEDUREID_LOCK_EVENT_WAIT_PIPE_LOCK_WILL_BE_RELEASED,
getProcId());
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .unlock();
- pipeTaskInfo = null;
+ releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
}
break;
default:
@@ -164,12 +159,7 @@ public abstract class AbstractOperatePipeProcedureV2
ProcedureMessages.PROCEDUREID_INVALID_LOCK_STATE_PIPE_LOCK_WILL_BE_RELEASED,
getProcId(),
procedureLockState);
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .unlock();
- pipeTaskInfo = null;
+ releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
}
break;
}
@@ -195,11 +185,16 @@ public abstract class AbstractOperatePipeProcedureV2
}
PipeProcedureMetrics.getInstance()
.updateTimer(this.getOperation().getName(), this.elapsedTime());
-
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
- pipeTaskInfo = null;
+ releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
}
}
+ private void releasePipeTaskCoordinatorLock(ConfigNodeProcedureEnv
configNodeProcedureEnv) {
+ // Clear before releasing the semaphore to avoid clobbering a re-scheduled
execution's marker.
+ pipeTaskInfo = null;
+
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+ }
+
protected abstract PipeTaskOperation getOperation();
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index ddc658bdf4d..b1583c74c6b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -327,7 +327,7 @@ public class CreatePipePluginProcedure extends
AbstractNodeProcedure<CreatePipeP
if (that instanceof CreatePipePluginProcedure) {
CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure)
that;
return thatProcedure.getProcId() == getProcId()
- && thatProcedure.getCurrentState().equals(getCurrentState())
+ && Objects.equals(thatProcedure.getCurrentState(), getCurrentState())
&& thatProcedure.getCycles() == getCycles()
&& thatProcedure.pipePluginMeta.equals(pipePluginMeta);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 771ab6230bf..ff1caad7ad4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -296,7 +296,7 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
if (that instanceof DropPipePluginProcedure) {
final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure)
that;
return thatProcedure.getProcId() == getProcId()
- && thatProcedure.getCurrentState().equals(this.getCurrentState())
+ && Objects.equals(thatProcedure.getCurrentState(),
this.getCurrentState())
&& thatProcedure.getCycles() == this.getCycles()
&& (thatProcedure.pluginName).equals(pluginName);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index a8399b281b4..4f4feb58017 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -203,7 +203,7 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
}
PipeHandleLeaderChangeProcedure that = (PipeHandleLeaderChangeProcedure) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& this.regionGroupToOldAndNewLeaderPairMap.equals(
that.regionGroupToOldAndNewLeaderPairMap);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 08468f3d04d..a4bbbe81019 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -205,7 +205,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
}
PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& needWriteConsensusOnConfigNodes ==
that.needWriteConsensusOnConfigNodes
&& needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 4ba69067851..530ca4c1956 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -169,7 +169,7 @@ public class DropPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
DropPipeProcedureV2 that = (DropPipeProcedureV2) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& pipeName.equals(that.pipeName);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 251a4f0e3af..7c9bff96f8f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -188,7 +188,7 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
StartPipeProcedureV2 that = (StartPipeProcedureV2) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& pipeName.equals(that.pipeName);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index e2ec41f3b83..4410be01d3f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -191,7 +191,7 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
StopPipeProcedureV2 that = (StopPipeProcedureV2) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& isStoppedByRuntimeExceptionBeforeStop ==
that.isStoppedByRuntimeExceptionBeforeStop
&& pipeName.equals(that.pipeName);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
index 1ad60f6f852..f168f6dd683 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java
@@ -311,7 +311,7 @@ public class AlterEncodingCompressorProcedure
}
final AlterEncodingCompressorProcedure that =
(AlterEncodingCompressorProcedure) o;
return this.getProcId() == that.getProcId()
- && this.getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(this.getCurrentState(), that.getCurrentState())
&& this.getCycles() == getCycles()
&& Objects.equals(this.queryId, that.queryId)
&& this.isGeneratedByPipe == that.isGeneratedByPipe
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
index 26ea988f98e..3a4431047c2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java
@@ -407,7 +407,7 @@ public class AlterTimeSeriesDataTypeProcedure
}
final AlterTimeSeriesDataTypeProcedure that =
(AlterTimeSeriesDataTypeProcedure) o;
return this.getProcId() == that.getProcId()
- && this.getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(this.getCurrentState(), that.getCurrentState())
&& this.getCycles() == getCycles()
&& this.isGeneratedByPipe == that.isGeneratedByPipe
&& this.measurementPath.equals(that.measurementPath)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 0b3eca82dce..7b8de782173 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -315,7 +315,7 @@ public class DeleteDatabaseProcedure
if (that instanceof DeleteDatabaseProcedure) {
final DeleteDatabaseProcedure thatProc = (DeleteDatabaseProcedure) that;
return thatProc.getProcId() == this.getProcId()
- && thatProc.getCurrentState().equals(this.getCurrentState())
+ && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
&& thatProc.getCycles() == this.getCycles()
&& thatProc.isGeneratedByPipe == this.isGeneratedByPipe
&&
thatProc.deleteDatabaseSchema.equals(this.getDeleteDatabaseSchema());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
index 4f63e96840c..5846c79b521 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
@@ -313,7 +313,7 @@ public class DeleteLogicalViewProcedure
}
final DeleteLogicalViewProcedure that = (DeleteLogicalViewProcedure) o;
return this.getProcId() == that.getProcId()
- && this.getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(this.getCurrentState(), that.getCurrentState())
&& this.getCycles() == that.getCycles()
&& isGeneratedByPipe == that.isGeneratedByPipe
&& patternTree.equals(that.patternTree);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
index 0b5e45b5ca1..1cd5efe639c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
@@ -392,7 +392,7 @@ public class DeleteTimeSeriesProcedure
}
final DeleteTimeSeriesProcedure that = (DeleteTimeSeriesProcedure) o;
return this.getProcId() == that.getProcId()
- && this.getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(this.getCurrentState(), that.getCurrentState())
&& this.getCycles() == getCycles()
&& this.isGeneratedByPipe == that.isGeneratedByPipe
&& this.patternTree.equals(that.patternTree);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
index 09414b0e3a8..f02b19b1618 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
@@ -266,7 +266,7 @@ public class SubscriptionHandleLeaderChangeProcedure
extends AbstractOperateSubs
final SubscriptionHandleLeaderChangeProcedure that =
(SubscriptionHandleLeaderChangeProcedure) o;
return getProcId() == that.getProcId()
- && getCurrentState().equals(that.getCurrentState())
+ && Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& runtimeVersion == that.runtimeVersion
&&
regionGroupToOldAndNewLeaderPairMap.equals(that.regionGroupToOldAndNewLeaderPairMap);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
index 8406eee97b3..dd297bb4d22 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
@@ -312,7 +312,7 @@ public class CreateTriggerProcedure extends
AbstractNodeProcedure<CreateTriggerS
if (that instanceof CreateTriggerProcedure) {
CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that;
return thatProc.getProcId() == this.getProcId()
- && thatProc.getCurrentState().equals(this.getCurrentState())
+ && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
&& thatProc.getCycles() == this.getCycles()
&& thatProc.isGeneratedByPipe == this.isGeneratedByPipe
&& thatProc.triggerInformation.equals(this.triggerInformation);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
index 126197cbfe6..a3f2c51baa2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java
@@ -180,7 +180,7 @@ public class DropTriggerProcedure extends
AbstractNodeProcedure<DropTriggerState
if (that instanceof DropTriggerProcedure) {
DropTriggerProcedure thatProc = (DropTriggerProcedure) that;
return thatProc.getProcId() == this.getProcId()
- && thatProc.getCurrentState().equals(this.getCurrentState())
+ && Objects.equals(thatProc.getCurrentState(), this.getCurrentState())
&& thatProc.getCycles() == this.getCycles()
&& thatProc.isGeneratedByPipe == this.isGeneratedByPipe
&& (thatProc.triggerName).equals(this.triggerName);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
index 832e339c0ae..e2f5935a909 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
@@ -45,7 +45,15 @@ public class LockQueue {
return true;
}
- public void waitProcedure(Procedure<?> procedure) {
+ public void waitProcedure(Procedure<?> procedure, ProcedureScheduler
procedureScheduler) {
+ if (lockOwnerProcedure == null) {
+ procedureScheduler.addFront(procedure);
+ return;
+ }
+ if (deque.stream()
+ .anyMatch(waitingProcedure -> waitingProcedure.getProcId() ==
procedure.getProcId())) {
+ return;
+ }
deque.addLast(procedure);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
index 3cd5ceacf4b..94b6f311930 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.scheduler;
import org.apache.iotdb.confignode.procedure.Procedure;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.locks.ReentrantLock;
/** Simple scheduler for procedures */
public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
@@ -48,6 +49,7 @@ public class SimpleProcedureScheduler extends
AbstractProcedureScheduler {
schedLock();
try {
runnables.clear();
+ waitings.clear();
} finally {
schedUnlock();
}
@@ -68,12 +70,37 @@ public class SimpleProcedureScheduler extends
AbstractProcedureScheduler {
return runnables.size();
}
- public void addWaiting(Procedure proc) {
- waitings.add(proc);
+ public void waitProcedure(Procedure proc, ReentrantLock lock) {
+ boolean signal = false;
+ schedLock();
+ try {
+ if (lock.isLocked()) {
+ waitings.add(proc);
+ } else {
+ runnables.addFirst(proc);
+ signal = true;
+ }
+ } finally {
+ schedUnlock();
+ }
+ if (signal) {
+ signalAll();
+ }
}
- public void releaseWaiting() {
- runnables.addAll(waitings);
- waitings.clear();
+ public void releaseWaiting(ReentrantLock lock) {
+ boolean signal;
+ schedLock();
+ try {
+ lock.unlock();
+ signal = !waitings.isEmpty();
+ runnables.addAll(waitings);
+ waitings.clear();
+ } finally {
+ schedUnlock();
+ }
+ if (signal) {
+ signalAll();
+ }
}
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
new file mode 100644
index 00000000000..74d5d821d76
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PipeTaskCoordinatorLockTest {
+
+ @Test
+ public void testInterruptedThreadDoesNotAcquireWithoutPermit() throws
Exception {
+ PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock();
+ lock.lock();
+
+ CountDownLatch waiting = new CountDownLatch(1);
+ AtomicBoolean acquired = new AtomicBoolean(false);
+ Thread thread =
+ new Thread(
+ () -> {
+ Thread.currentThread().interrupt();
+ waiting.countDown();
+ lock.lock();
+ acquired.set(true);
+ lock.unlock();
+ });
+ thread.start();
+
+ Assert.assertTrue(waiting.await(3, TimeUnit.SECONDS));
+ TimeUnit.MILLISECONDS.sleep(200);
+ Assert.assertFalse(acquired.get());
+
+ lock.unlock();
+ thread.join(TimeUnit.SECONDS.toMillis(3));
+
+ Assert.assertFalse(thread.isAlive());
+ Assert.assertTrue(acquired.get());
+ Assert.assertFalse(lock.isLocked());
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
index 500a51e9e3d..967611ca8ee 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.confignode.procedure;
+import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
import org.apache.iotdb.confignode.procedure.entity.SimpleLockProcedure;
+import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
import org.junit.Assert;
@@ -43,4 +46,24 @@ public class TestLockRegime extends TestProcedureBase {
this.procExecutor,
procIdList.stream().mapToLong(Long::longValue).toArray());
Assert.assertEquals(env.lockAcquireSeq.toString(),
env.executeSeq.toString());
}
+
+ @Test
+ public void testLockQueueDoesNotWakeDuplicateProcedure() {
+ LockQueue lockQueue = new LockQueue();
+ SimpleProcedureScheduler scheduler = new SimpleProcedureScheduler();
+ scheduler.start();
+
+ NoopProcedure lockOwner = new NoopProcedure();
+ lockOwner.setProcId(0);
+ Assert.assertTrue(lockQueue.tryLock(lockOwner));
+
+ NoopProcedure procedure = new NoopProcedure();
+ procedure.setProcId(1);
+ lockQueue.waitProcedure(procedure, scheduler);
+ lockQueue.waitProcedure(procedure, scheduler);
+
+ Assert.assertEquals(1, lockQueue.wakeWaitingProcedures(scheduler));
+ Assert.assertEquals(1, scheduler.size());
+ scheduler.stop();
+ }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index dce7f2ba5dc..ba5f635507a 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -23,11 +23,14 @@ import
org.apache.iotdb.confignode.procedure.entity.IncProcedure;
import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
import org.apache.iotdb.confignode.procedure.entity.StuckProcedure;
import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -96,6 +99,37 @@ public class TestProcedureExecutor extends TestProcedureBase
{
ProcedureTestUtil.waitForProcedure(procExecutor, busyProcId2);
}
+ @Test
+ public void testDuplicatedSchedulingDoesNotExecuteConcurrently() throws
InterruptedException {
+ BlockingProcedure blockingProcedure = new BlockingProcedure();
+ long procId = procExecutor.submitProcedure(blockingProcedure);
+
+ Assert.assertTrue(blockingProcedure.awaitExecution(30, TimeUnit.SECONDS));
+
+ procExecutor.getScheduler().addFront(blockingProcedure);
+ boolean duplicated = blockingProcedure.awaitExecution(3, TimeUnit.SECONDS);
+
+ blockingProcedure.releaseExecutions(duplicated ? 2 : 1);
+ ProcedureTestUtil.waitForProcedure(procExecutor, procId);
+
+ Assert.assertFalse(duplicated);
+ Assert.assertEquals(1, blockingProcedure.getExecutionCount());
+ }
+
+ @Test
+ public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws
InterruptedException {
+ CompletingInternalProcedure internalProcedure = new
CompletingInternalProcedure();
+
+ procExecutor.addInternalProcedure(internalProcedure);
+ procExecutor.addInternalProcedure(internalProcedure);
+
+ Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS));
+ Assert.assertFalse(internalProcedure.awaitExecution(300,
TimeUnit.MILLISECONDS));
+ Assert.assertEquals(1, internalProcedure.getExecutionCount());
+
+ Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
+ }
+
private int waitThreadCount(final int expectedThreads) {
long startTime = System.currentTimeMillis();
while (procExecutor.isRunning()
@@ -107,4 +141,62 @@ public class TestProcedureExecutor extends
TestProcedureBase {
}
return procExecutor.getWorkerThreadCount();
}
+
+ private static class BlockingProcedure extends Procedure<TestProcEnv> {
+
+ private final Semaphore entered = new Semaphore(0);
+ private final Semaphore finish = new Semaphore(0);
+ private final AtomicInteger executionCount = new AtomicInteger();
+
+ @Override
+ protected Procedure<TestProcEnv>[] execute(TestProcEnv env) throws
InterruptedException {
+ executionCount.incrementAndGet();
+ entered.release();
+ finish.acquire();
+ return null;
+ }
+
+ @Override
+ protected void rollback(TestProcEnv env)
+ throws IOException, InterruptedException, ProcedureException {
+ // No state to roll back.
+ }
+
+ private boolean awaitExecution(long timeout, TimeUnit unit) throws
InterruptedException {
+ return entered.tryAcquire(timeout, unit);
+ }
+
+ private void releaseExecutions(int permits) {
+ finish.release(permits);
+ }
+
+ private int getExecutionCount() {
+ return executionCount.get();
+ }
+ }
+
+ private static class CompletingInternalProcedure extends
InternalProcedure<TestProcEnv> {
+
+ private final Semaphore entered = new Semaphore(0);
+ private final AtomicInteger executionCount = new AtomicInteger();
+
+ private CompletingInternalProcedure() {
+ super(0);
+ }
+
+ @Override
+ protected void periodicExecute(TestProcEnv env) {
+ executionCount.incrementAndGet();
+ entered.release();
+ setState(ProcedureState.SUCCESS);
+ }
+
+ private boolean awaitExecution(long timeout, TimeUnit unit) throws
InterruptedException {
+ return entered.tryAcquire(timeout, unit);
+ }
+
+ private int getExecutionCount() {
+ return executionCount.get();
+ }
+ }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
index ce9fea39d55..42badd70079 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
@@ -53,18 +53,21 @@ public class SimpleLockProcedure extends
Procedure<TestProcEnv> {
return ProcedureLockState.LOCK_ACQUIRED;
}
- SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler)
testProcEnv.getScheduler();
- scheduler.addWaiting(this);
System.out.println(procName + " wait for lock.");
return ProcedureLockState.LOCK_EVENT_WAIT;
}
+ @Override
+ protected void waitForLock(TestProcEnv testProcEnv) {
+ SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler)
testProcEnv.getScheduler();
+ scheduler.waitProcedure(this, testProcEnv.getEnvLock());
+ }
+
@Override
protected void releaseLock(TestProcEnv testProcEnv) {
System.out.println(procName + " release lock.");
- testProcEnv.getEnvLock().unlock();
SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler)
testProcEnv.getScheduler();
- scheduler.releaseWaiting();
+ scheduler.releaseWaiting(testProcEnv.getEnvLock());
}
@Override
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
index 75c0963a27f..b2ec615fbcd 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
@@ -106,4 +106,51 @@ public class PipeHandleLeaderChangeProcedureTest {
fail();
}
}
+
+ @Test
+ public void completedProcedureEqualsTest() {
+ Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>();
+ leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
new Pair<>(1, 2));
+
+ try {
+ PipeHandleLeaderChangeProcedure proc =
deserializeCompletedProcedure(leaderMap);
+ PipeHandleLeaderChangeProcedure proc2 =
deserializeCompletedProcedure(leaderMap);
+
+ assertEquals(proc, proc2);
+ assertEquals(proc.hashCode(), proc2.hashCode());
+ } catch (Exception e) {
+ fail();
+ }
+ }
+
+ private PipeHandleLeaderChangeProcedure deserializeCompletedProcedure(
+ Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap) throws
Exception {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+
+
outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
+ outputStream.writeLong(Procedure.NO_PROC_ID);
+ outputStream.writeInt(ProcedureState.SUCCESS.ordinal());
+ outputStream.writeLong(0L);
+ outputStream.writeLong(0L);
+ outputStream.writeLong(Procedure.NO_PROC_ID);
+ outputStream.writeLong(Procedure.NO_TIMEOUT);
+ outputStream.writeInt(-1);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(-1);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(1);
+ outputStream.writeInt(Integer.MIN_VALUE);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(leaderMap.size());
+ for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry :
leaderMap.entrySet()) {
+ outputStream.writeInt(entry.getKey().getId());
+ outputStream.writeInt(entry.getValue().getLeft());
+ outputStream.writeInt(entry.getValue().getRight());
+ }
+
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ return (PipeHandleLeaderChangeProcedure)
ProcedureFactory.getInstance().create(buffer);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/ErrorHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/ErrorHandler.java
index f94eb0a2f99..bc0d740bc15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/ErrorHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/ErrorHandler.java
@@ -282,9 +282,13 @@ public class ErrorHandler extends BaseErrorListener {
// The ATN can be in multiple states (similar to an NFA)
Deque<ParsingState> activeStates = new ArrayDeque<>();
activeStates.add(start);
+ Set<ParsingState> processedStates = new HashSet<>();
while (!activeStates.isEmpty()) {
ParsingState current = activeStates.pop();
+ if (!processedStates.add(current)) {
+ continue;
+ }
ATNState state = current.state;
int tokenIndex = current.tokenIndex;
@@ -328,7 +332,7 @@ public class ErrorHandler extends BaseErrorListener {
new ParsingState(
ruleTransition.followState,
endToken,
- suppressed && endToken == currentToken,
+ suppressed && endToken == tokenIndex,
parser));
}
} else if (transition instanceof PrecedencePredicateTransition) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
index f545a9bda23..d8d8f4fe19b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
@@ -374,8 +374,10 @@ public class DataNodeTableCache implements ITableCache {
private Map<String, Map<String, TsTable>> getTablesInConfigNode(
final Map<String, Map<String, Long>> tableInput) {
Map<String, Map<String, TsTable>> result = Collections.emptyMap();
+ boolean acquired = false;
try {
fetchTableSemaphore.acquire();
+ acquired = true;
final TFetchTableResp resp =
ClusterConfigTaskExecutor.getInstance()
.fetchTables(
@@ -388,11 +390,11 @@ public class DataNodeTableCache implements ITableCache {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(DataNodeSchemaMessages.INTERRUPTED_ACQUIRE_SEMAPHORE_GET_TABLES);
- } catch (final Exception e) {
- fetchTableSemaphore.release();
- throw e;
+ } finally {
+ if (acquired) {
+ fetchTableSemaphore.release();
+ }
}
- fetchTableSemaphore.release();
return result;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
new file mode 100644
index 00000000000..2d6114363a5
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.schemaengine.table;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.Semaphore;
+
+public class DataNodeTableCacheTest {
+
+ private static final String DATABASE = "interrupted_fetch_database";
+
+ @Test
+ public void interruptedFetchDoesNotLeakSemaphorePermit() throws Exception {
+ final DataNodeTableCache cache = DataNodeTableCache.getInstance();
+ cache.invalid(DATABASE);
+ try {
+ final Semaphore fetchTableSemaphore = getFetchTableSemaphore(cache);
+ final int permitsBeforeFetch = fetchTableSemaphore.availablePermits();
+
+ Thread.currentThread().interrupt();
+ try {
+ Assert.assertFalse(cache.isDatabaseExist(DATABASE));
+ } finally {
+ Thread.interrupted();
+ }
+
+ Assert.assertEquals(permitsBeforeFetch,
fetchTableSemaphore.availablePermits());
+ } finally {
+ cache.invalid(DATABASE);
+ }
+ }
+
+ private Semaphore getFetchTableSemaphore(final DataNodeTableCache cache)
throws Exception {
+ final Field field =
DataNodeTableCache.class.getDeclaredField("fetchTableSemaphore");
+ field.setAccessible(true);
+ return (Semaphore) field.get(cache);
+ }
+}