This is an automated email from the ASF dual-hosted git repository.

pengjunzhi pushed a commit to branch procedure-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit df71d1220ee5906cce2f19e41b01a846a5b4c870
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Nov 14 10:30:23 2025 +0800

    refine and fix the whole logic
---
 .../procedure/CompletedProcedureRecycler.java      |  6 ++-
 .../iotdb/confignode/procedure/Procedure.java      |  9 +++-
 .../confignode/procedure/ProcedureExecutor.java    | 47 ++++++++++-------
 .../procedure/TimeoutExecutorThread.java           |  2 +
 .../procedure/store/ConfigProcedureStore.java      |  4 ++
 .../org/apache/iotdb/commons/utils/RetryUtils.java | 59 +++++++++++++++++++++-
 6 files changed, 104 insertions(+), 23 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
index db3a599163a..72e32a5c4c8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
@@ -74,7 +74,9 @@ public class CompletedProcedureRecycler<Env> extends 
InternalProcedure<Env> {
             store.delete(batchIds, 0, batchCount);
           } catch (Exception e) {
             LOG.error("Error deleting completed procedures {}.", proc, e);
-            // Do not remove from the completed map
+            // Do not remove from the completed map. Even if this procedure is 
restored
+            // unexpectedly on a new CN leader, we do not need to do 
anything else since
+            // procedures are idempotent.
             continue;
           } finally {
             batchCount = 0;
@@ -88,6 +90,8 @@ public class CompletedProcedureRecycler<Env> extends 
InternalProcedure<Env> {
       try {
         store.delete(batchIds, 0, batchCount);
       } catch (Exception e) {
+        // Even if this procedure is restored unexpectedly on a new CN 
leader, we do not need
+        // to do anything else since procedures are idempotent.
         LOG.error("Error deleting completed procedures {}.", batchIds, e);
       }
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index b660737bf3f..83c0776397a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.state.ProcedureState;
 import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -304,6 +305,9 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
         locked = true;
         store.update(this);
       } catch (Exception e) {
+        // Nothing else needs to be done. A new leader that restores this 
procedure from a wrong
+        // state will re-execute it and converge to the correct state since 
procedures are
+        // idempotent.
         LOG.warn("pid={} Failed to persist lock state to store.", this.procId, 
e);
       }
     }
@@ -316,7 +320,7 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
    * @param env environment
    * @param store ProcedureStore
    */
-  public final void doReleaseLock(Env env, IProcedureStore store) {
+  public final void doReleaseLock(Env env, IProcedureStore store) throws 
Exception {
     locked = false;
     if (getState() == ProcedureState.ROLLEDBACK) {
       LOG.info("Force write unlock state to raft for pid={}", this.procId);
@@ -325,8 +329,9 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
       store.update(this);
       // do not release lock when consensus layer is not working
       releaseLock(env);
-    } catch (Exception e) {
+    } catch (ConsensusException e) {
       LOG.error("pid={} Failed to persist unlock state to store.", 
this.procId, e);
+      throw e;
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 9e3cb5c4877..8a1afc77581 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -221,7 +222,8 @@ public class ProcedureExecutor<Env> {
 
   private void releaseLock(Procedure<Env> procedure, boolean force) {
     if (force || !procedure.holdLock(this.environment) || 
procedure.isFinished()) {
-      procedure.doReleaseLock(this.environment, store);
+      RetryUtils.executeWithEndlessBackoffRetry(
+          () -> procedure.doReleaseLock(this.environment, store), "procedure 
release lock");
     }
   }
 
@@ -477,17 +479,16 @@ public class ProcedureExecutor<Env> {
     }
     if (parent != null && parent.tryRunnable()) {
       // If success, means all its children have completed, move parent to 
front of the queue.
-      try {
-        store.update(parent);
-        // do not add this procedure when exception occurred
-        scheduler.addFront(parent);
-        LOG.info(
-            "Finished subprocedure pid={}, resume processing ppid={}",
-            proc.getProcId(),
-            parent.getProcId());
-      } catch (Exception e) {
-        LOG.warn("Failed to update parent on countdown", e);
-      }
+      // Must retry endlessly here, since this step is not idempotent and 
cannot be re-executed
+      // correctly on a new CN leader.
+      RetryUtils.executeWithEndlessBackoffRetry(
+          () -> store.update(parent), "count down children procedure");
+      // Reached once the update above has succeeded, or its retry loop was interrupted.
+      scheduler.addFront(parent);
+      LOG.info(
+          "Finished subprocedure pid={}, resume processing ppid={}",
+          proc.getProcId(),
+          parent.getProcId());
     }
   }
 
@@ -514,6 +515,8 @@ public class ProcedureExecutor<Env> {
       try {
         store.update(subprocs);
       } catch (Exception e) {
+        // Do nothing since this step is idempotent. A new CN leader can 
converge to the correct
+        // state when it restores this procedure.
         LOG.warn("Failed to update subprocs on execution", e);
       }
     } else {
@@ -528,6 +531,8 @@ public class ProcedureExecutor<Env> {
               procedures.remove(childProcId);
             }
           } catch (Exception e) {
+            // Do nothing since this step is idempotent. A new CN leader can 
converge to the correct
+            // state when it restores this procedure.
             LOG.warn("Failed to delete subprocedures on execution", e);
           }
         } else {
@@ -541,6 +546,8 @@ public class ProcedureExecutor<Env> {
         try {
           store.update(proc);
         } catch (Exception e) {
+          // Do nothing since this step is idempotent. A new CN leader can 
converge to the correct
+          // state when it restores this procedure.
           LOG.warn("Failed to update procedure on execution", e);
         }
       }
@@ -599,13 +606,9 @@ public class ProcedureExecutor<Env> {
     if (exception == null) {
       exception = procedureStack.getException();
       rootProcedure.setFailure(exception);
-      try {
-        store.update(rootProcedure);
-      } catch (Exception e) {
-        LOG.warn("Failed to update root procedure on rollback", e);
-        // roll back
-        rootProcedure.setFailure(null);
-      }
+      // Retry endlessly since this step is not idempotent.
+      RetryUtils.executeWithEndlessBackoffRetry(
+          () -> store.update(rootProcedure), "root procedure rollback");
     }
     List<Procedure<Env>> subprocStack = procedureStack.getSubproceduresStack();
     int stackTail = subprocStack.size();
@@ -686,6 +689,8 @@ public class ProcedureExecutor<Env> {
           // do not remove this procedure when exception occurred
           procedures.remove(procedure.getProcId());
         } catch (Exception e) {
+          // Do nothing since this step is idempotent. A new CN leader can 
converge to the correct
+          // state when it restores this procedure.
           LOG.warn("Failed to delete procedure on rollback", e);
         }
       } else {
@@ -697,6 +702,8 @@ public class ProcedureExecutor<Env> {
             store.update(procedure);
           }
         } catch (Exception e) {
+          // Do nothing since this step is idempotent. A new CN leader can 
converge to the correct
+          // state when it restores this procedure.
           LOG.warn("Failed to delete procedure on rollback", e);
         }
       }
@@ -704,6 +711,8 @@ public class ProcedureExecutor<Env> {
       try {
         store.update(procedure);
       } catch (Exception e) {
+        // Do nothing since this step is idempotent. A new CN leader can 
converge to the correct
+        // state when it restores this procedure.
         LOG.warn("Failed to update procedure on rollback", e);
       }
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 0a1f5f8248b..1614148abd3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -78,6 +78,8 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
           try {
             executor.getStore().update(procedure);
           } catch (Exception e) {
+            // Do nothing since a new CN leader can converge to the correct 
state when it restores this
+            // procedure.
             LOGGER.warn("Failed to update procedure {}", procedure, e);
           }
           executor.getScheduler().addFront(procedure);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
index 1cd6687b26c..603a9a8a627 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
@@ -99,6 +99,8 @@ public class ConfigProcedureStore implements 
IProcedureStore<ConfigNodeProcedure
           "pid={} Failed in the write update API executing the consensus layer 
due to: ",
           procedure.getProcId(),
           e);
+      // In the consensus-layer API, do nothing except rethrow the exception so the 
upper caller can handle
+      // it.
       throw e;
     }
   }
@@ -121,6 +123,8 @@ public class ConfigProcedureStore implements 
IProcedureStore<ConfigNodeProcedure
           "pid={} Failed in the write delete API executing the consensus layer 
due to: ",
           procId,
           e);
+      // In the consensus-layer API, do nothing except rethrow the exception so the 
upper caller can handle
+      // it.
       throw e;
     }
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index cff60601edb..9b297cae053 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -22,10 +22,15 @@ package org.apache.iotdb.commons.utils;
 import 
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.ConnectException;
 
 public class RetryUtils {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RetryUtils.class);
+
   public interface CallableWithException<T, E extends Exception> {
     T call() throws E;
   }
@@ -46,7 +51,7 @@ public class RetryUtils {
     return statusCode == 
TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST.getStatusCode();
   }
 
-  public static final int MAX_RETRIES = 3;
+  public static final int MAX_RETRIES = 5;
 
   public static <T, E extends Exception> T retryOnException(
       final CallableWithException<T, E> callable) throws E {
@@ -63,6 +68,58 @@ public class RetryUtils {
     }
   }
 
+  private static final long INITIAL_BACKOFF_MS = 100;
+  private static final long MAX_BACKOFF_MS = 60000;
+
+  @FunctionalInterface
+  public interface OperationWithException {
+    void run() throws Exception;
+  }
+
+  /**
+   * Retries the operation with exponential backoff until it succeeds or the thread is interrupted.
+   *
+   * @param operation The operation to execute.
+   * @param operationName A description of the operation (for logging).
+   */
+  public static void executeWithEndlessBackoffRetry(
+      OperationWithException operation, String operationName) {
+    long currentBackoff = INITIAL_BACKOFF_MS;
+    int attempt = 0;
+
+    // Retry until success; the only other exit is an interrupt during the backoff sleep below.
+    while (true) {
+      attempt++;
+      try {
+        operation.run();
+        if (attempt > 1) {
+          LOGGER.info("Operation '{}' succeeded after {} attempts", 
operationName, attempt);
+        }
+        return;
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Operation '{}' failed (attempt {}). Retrying in {}ms...",
+            operationName,
+            attempt,
+            currentBackoff,
+            e);
+        try {
+          Thread.sleep(currentBackoff);
+        } catch (InterruptedException ie) {
+          LOGGER.warn(
+              "Retry wait for operation '{}' was interrupted, stopping 
retries.",
+              operationName,
+              ie);
+          Thread.currentThread().interrupt();
+          return;
+        }
+
+        // Double the backoff, but cap it at the max to prevent overflow
+        currentBackoff = Math.min(currentBackoff * 2, MAX_BACKOFF_MS);
+      }
+    }
+  }
+
   private RetryUtils() {
     // utility class
   }

Reply via email to