This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new c7b4204f746 Fix procedure framework data race (#14309)
c7b4204f746 is described below

commit c7b4204f746e6151accd0bcc1ea6593f1572471c
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Dec 4 17:17:31 2024 +0800

    Fix procedure framework data race (#14309)
    
    (cherry picked from commit 01f0a8fa30c891d598d8b7f1034df968964cfe92)
---
 .../confignode/procedure/ProcedureExecutor.java    | 109 ++++++++++++---------
 .../procedure/impl/node/AbstractNodeProcedure.java |   5 +-
 2 files changed, 64 insertions(+), 50 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 4cf768027c6..0643eb8a75c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
@@ -41,6 +42,7 @@ import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -329,68 +331,79 @@ public class ProcedureExecutor<Env> {
       LOG.warn("Rollback stack is null for {}", proc.getProcId());
       return;
     }
-    do {
-      if (!rootProcStack.acquire()) {
-        if (rootProcStack.setRollback()) {
-          switch (executeRootStackRollback(rootProcId, rootProcStack)) {
-            case LOCK_ACQUIRED:
-              break;
-            case LOCK_EVENT_WAIT:
-              LOG.info("LOCK_EVENT_WAIT rollback " + proc);
-              rootProcStack.unsetRollback();
-              break;
-            case LOCK_YIELD_WAIT:
-              rootProcStack.unsetRollback();
-              scheduler.yield(proc);
-              break;
-            default:
-              throw new UnsupportedOperationException();
-          }
-        } else {
-          if (!proc.wasExecuted()) {
-            switch (executeRollback(proc)) {
+    ProcedureLockState lockState = null;
+    try {
+      do {
+        if (!rootProcStack.acquire()) {
+          if (rootProcStack.setRollback()) {
+            lockState = executeRootStackRollback(rootProcId, rootProcStack);
+            switch (lockState) {
               case LOCK_ACQUIRED:
                 break;
               case LOCK_EVENT_WAIT:
-                LOG.info("LOCK_EVENT_WAIT can't rollback child running for 
{}", proc);
+                LOG.info("LOCK_EVENT_WAIT rollback {}", proc);
+                rootProcStack.unsetRollback();
                 break;
               case LOCK_YIELD_WAIT:
+                rootProcStack.unsetRollback();
                 scheduler.yield(proc);
                 break;
               default:
                 throw new UnsupportedOperationException();
             }
+          } else {
+            if (!proc.wasExecuted()) {
+              switch (executeRollback(proc)) {
+                case LOCK_ACQUIRED:
+                  break;
+                case LOCK_EVENT_WAIT:
+                  LOG.info("LOCK_EVENT_WAIT can't rollback child running for 
{}", proc);
+                  break;
+                case LOCK_YIELD_WAIT:
+                  scheduler.yield(proc);
+                  break;
+                default:
+                  throw new UnsupportedOperationException();
+              }
+            }
           }
-        }
-        break;
-      }
-      ProcedureLockState lockState = acquireLock(proc);
-      switch (lockState) {
-        case LOCK_ACQUIRED:
-          executeProcedure(rootProcStack, proc);
           break;
-        case LOCK_YIELD_WAIT:
-        case LOCK_EVENT_WAIT:
-          LOG.info("{} lockstate is {}", proc, lockState);
-          break;
-        default:
-          throw new UnsupportedOperationException();
-      }
-      rootProcStack.release();
-
-      if (proc.isSuccess()) {
-        // update metrics on finishing the procedure
-        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
-        LOG.debug("{} finished in {}ms successfully.", proc, 
proc.elapsedTime());
-        if (proc.getProcId() == rootProcId) {
-          rootProcedureCleanup(proc);
-        } else {
-          executeCompletionCleanup(proc);
         }
-        return;
-      }
+        lockState = acquireLock(proc);
+        switch (lockState) {
+          case LOCK_ACQUIRED:
+            executeProcedure(rootProcStack, proc);
+            break;
+          case LOCK_YIELD_WAIT:
+          case LOCK_EVENT_WAIT:
+            LOG.info("{} lockstate is {}", proc, lockState);
+            break;
+          default:
+            throw new UnsupportedOperationException();
+        }
+        rootProcStack.release();
+
+        if (proc.isSuccess()) {
+          // update metrics on finishing the procedure
+          proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), 
true);
+          LOG.debug("{} finished in {}ms successfully.", proc, 
proc.elapsedTime());
+          if (proc.getProcId() == rootProcId) {
+            rootProcedureCleanup(proc);
+          } else {
+            executeCompletionCleanup(proc);
+          }
+          return;
+        }
 
-    } while (rootProcStack.isFailed());
+      } while (rootProcStack.isFailed());
+    } finally {
+      // Only after procedure has completed execution can it be allowed to be 
rescheduled to prevent
+      // data races
+      if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
+        LOG.info("procedureId {} wait for lock.", proc.getProcId());
+        ((ConfigNodeProcedureEnv) 
this.environment).getNodeLock().waitProcedure(proc);
+      }
+    }
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
index e2d29e1a1d2..b1410279173 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java
@@ -47,8 +47,9 @@ public abstract class AbstractNodeProcedure<TState>
         LOG.info("procedureId {} acquire lock.", getProcId());
         return ProcedureLockState.LOCK_ACQUIRED;
       }
-      configNodeProcedureEnv.getNodeLock().waitProcedure(this);
-      LOG.info("procedureId {} wait for lock.", getProcId());
+      LOG.info(
+          "procedureId {} acquire lock failed, will wait for lock after 
finishing execution.",
+          getProcId());
       return ProcedureLockState.LOCK_EVENT_WAIT;
     } finally {
       configNodeProcedureEnv.getSchedulerLock().unlock();

Reply via email to