This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new c7b4204f746 Fix procedure framework data race (#14309)
c7b4204f746 is described below
commit c7b4204f746e6151accd0bcc1ea6593f1572471c
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Dec 4 17:17:31 2024 +0800
Fix procedure framework data race (#14309)
(cherry picked from commit 01f0a8fa30c891d598d8b7f1034df968964cfe92)
---
.../confignode/procedure/ProcedureExecutor.java | 109 ++++++++++++---------
.../procedure/impl/node/AbstractNodeProcedure.java | 5 +-
2 files changed, 64 insertions(+), 50 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 4cf768027c6..0643eb8a75c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
@@ -41,6 +42,7 @@ import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -329,68 +331,79 @@ public class ProcedureExecutor<Env> {
LOG.warn("Rollback stack is null for {}", proc.getProcId());
return;
}
- do {
- if (!rootProcStack.acquire()) {
- if (rootProcStack.setRollback()) {
- switch (executeRootStackRollback(rootProcId, rootProcStack)) {
- case LOCK_ACQUIRED:
- break;
- case LOCK_EVENT_WAIT:
- LOG.info("LOCK_EVENT_WAIT rollback " + proc);
- rootProcStack.unsetRollback();
- break;
- case LOCK_YIELD_WAIT:
- rootProcStack.unsetRollback();
- scheduler.yield(proc);
- break;
- default:
- throw new UnsupportedOperationException();
- }
- } else {
- if (!proc.wasExecuted()) {
- switch (executeRollback(proc)) {
+ ProcedureLockState lockState = null;
+ try {
+ do {
+ if (!rootProcStack.acquire()) {
+ if (rootProcStack.setRollback()) {
+ lockState = executeRootStackRollback(rootProcId, rootProcStack);
+ switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
- LOG.info("LOCK_EVENT_WAIT can't rollback child running for
{}", proc);
+ LOG.info("LOCK_EVENT_WAIT rollback {}", proc);
+ rootProcStack.unsetRollback();
break;
case LOCK_YIELD_WAIT:
+ rootProcStack.unsetRollback();
scheduler.yield(proc);
break;
default:
throw new UnsupportedOperationException();
}
+ } else {
+ if (!proc.wasExecuted()) {
+ switch (executeRollback(proc)) {
+ case LOCK_ACQUIRED:
+ break;
+ case LOCK_EVENT_WAIT:
+ LOG.info("LOCK_EVENT_WAIT can't rollback child running for
{}", proc);
+ break;
+ case LOCK_YIELD_WAIT:
+ scheduler.yield(proc);
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
}
- }
- break;
- }
- ProcedureLockState lockState = acquireLock(proc);
- switch (lockState) {
- case LOCK_ACQUIRED:
- executeProcedure(rootProcStack, proc);
break;
- case LOCK_YIELD_WAIT:
- case LOCK_EVENT_WAIT:
- LOG.info("{} lockstate is {}", proc, lockState);
- break;
- default:
- throw new UnsupportedOperationException();
- }
- rootProcStack.release();
-
- if (proc.isSuccess()) {
- // update metrics on finishing the procedure
- proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
- LOG.debug("{} finished in {}ms successfully.", proc,
proc.elapsedTime());
- if (proc.getProcId() == rootProcId) {
- rootProcedureCleanup(proc);
- } else {
- executeCompletionCleanup(proc);
}
- return;
- }
+ lockState = acquireLock(proc);
+ switch (lockState) {
+ case LOCK_ACQUIRED:
+ executeProcedure(rootProcStack, proc);
+ break;
+ case LOCK_YIELD_WAIT:
+ case LOCK_EVENT_WAIT:
+ LOG.info("{} lockstate is {}", proc, lockState);
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ rootProcStack.release();
+
+ if (proc.isSuccess()) {
+ // update metrics on finishing the procedure
+ proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(),
true);
+ LOG.debug("{} finished in {}ms successfully.", proc,
proc.elapsedTime());
+ if (proc.getProcId() == rootProcId) {
+ rootProcedureCleanup(proc);
+ } else {
+ executeCompletionCleanup(proc);
+ }
+ return;
+ }
- } while (rootProcStack.isFailed());
+ } while (rootProcStack.isFailed());
+ } finally {
+ // Only after procedure has completed execution can it be allowed to be
rescheduled to prevent
+ // data races
+ if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
+ LOG.info("procedureId {} wait for lock.", proc.getProcId());
+ ((ConfigNodeProcedureEnv)
this.environment).getNodeLock().waitProcedure(proc);
+ }
+ }
}
/**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
index e2d29e1a1d2..b1410279173 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
@@ -47,8 +47,9 @@ public abstract class AbstractNodeProcedure<TState>
LOG.info("procedureId {} acquire lock.", getProcId());
return ProcedureLockState.LOCK_ACQUIRED;
}
- configNodeProcedureEnv.getNodeLock().waitProcedure(this);
- LOG.info("procedureId {} wait for lock.", getProcId());
+ LOG.info(
+ "procedureId {} acquire lock failed, will wait for lock after
finishing execution.",
+ getProcId());
return ProcedureLockState.LOCK_EVENT_WAIT;
} finally {
configNodeProcedureEnv.getSchedulerLock().unlock();