This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e209e73c21 Refine CN consensus layer API for procedure robustness (#16303)
0e209e73c21 is described below

commit 0e209e73c21137be44cebe3926c7f1fed62a0422
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Nov 20 05:37:40 2025 -0600

    Refine CN consensus layer API for procedure robustness (#16303)
    
    * refine cn consensus layer API for procedure
    
    * refine and fix the whole logic
    
    * fix comment
    
    * fix comment
    
    ---------
    
    Co-authored-by: 彭俊植 <[email protected]>
---
 .../procedure/CompletedProcedureRecycler.java      | 21 +++++-
 .../iotdb/confignode/procedure/Procedure.java      | 27 +++++--
 .../confignode/procedure/ProcedureExecutor.java    | 88 +++++++++++++++++-----
 .../procedure/TimeoutExecutorThread.java           | 12 ++-
 .../procedure/store/ConfigProcedureStore.java      | 26 +++++--
 .../procedure/store/IProcedureStore.java           | 10 +--
 .../org/apache/iotdb/commons/utils/RetryUtils.java | 59 ++++++++++++++-
 7 files changed, 203 insertions(+), 40 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
index cea15a1bfb6..179563cc3b3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
@@ -70,15 +70,30 @@ public class CompletedProcedureRecycler<Env> extends 
InternalProcedure<Env> {
         // Failed procedures aren't persisted in WAL.
         batchIds[batchCount++] = entry.getKey();
         if (batchCount == batchIds.length) {
-          store.delete(batchIds, 0, batchCount);
-          batchCount = 0;
+          try {
+            store.delete(batchIds, 0, batchCount);
+          } catch (Exception e) {
+            LOG.error("Error deleting completed procedures {}.", proc, e);
+            // Do not remove from the completed map. Even this procedure may 
be restored
+            // unexpectedly in another new CN leader, we do not need to do 
anything else since
+            // procedures are idempotent.
+            continue;
+          } finally {
+            batchCount = 0;
+          }
         }
         it.remove();
         LOG.trace("Evict completed {}", proc);
       }
     }
     if (batchCount > 0) {
-      store.delete(batchIds, 0, batchCount);
+      try {
+        store.delete(batchIds, 0, batchCount);
+      } catch (Exception e) {
+        // Even this procedure may be restored unexpectedly in another new CN 
leader, we do not need
+        // to do anything else since procedures are idempotent.
+        LOG.error("Error deleting completed procedures {}.", batchIds, e);
+      }
     }
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 91af03d3971..89e6e37e431 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.state.ProcedureState;
 import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -196,7 +197,7 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
       byteBuffer.get(resultArr);
     }
     //  has lock
-    if (byteBuffer.get() == 1) {
+    if (byteBuffer.get() == 1 && this.state != ProcedureState.ROLLEDBACK) {
       this.lockedWhenLoading();
     }
   }
@@ -300,8 +301,15 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
     }
     ProcedureLockState state = acquireLock(env);
     if (state == ProcedureLockState.LOCK_ACQUIRED) {
-      locked = true;
-      store.update(this);
+      try {
+        locked = true;
+        store.update(this);
+      } catch (Exception e) {
+        // Do not need to do anything else. New leader which restore this 
procedure from a wrong
+        // state will reexecute it and converge to the correct state since 
procedures are
+        // idempotent.
+        LOG.warn("pid={} Failed to persist lock state to store.", this.procId, 
e);
+      }
     }
     return state;
   }
@@ -312,12 +320,19 @@ public abstract class Procedure<Env> implements 
Comparable<Procedure<Env>> {
    * @param env environment
    * @param store ProcedureStore
    */
-  public final void doReleaseLock(Env env, IProcedureStore store) {
+  public final void doReleaseLock(Env env, IProcedureStore store) throws 
Exception {
     locked = false;
-    if (getState() != ProcedureState.ROLLEDBACK) {
+    if (getState() == ProcedureState.ROLLEDBACK) {
+      LOG.info("Force write unlock state to raft for pid={}", this.procId);
+    }
+    try {
       store.update(this);
+      // do not release lock when consensus layer is not working
+      releaseLock(env);
+    } catch (ConsensusException e) {
+      LOG.error("pid={} Failed to persist unlock state to store.", 
this.procId, e);
+      throw e;
     }
-    releaseLock(env);
   }
 
   public final void restoreLock(Env env) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 0d8368583b4..efd37778efa 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -221,7 +222,8 @@ public class ProcedureExecutor<Env> {
 
   private void releaseLock(Procedure<Env> procedure, boolean force) {
     if (force || !procedure.holdLock(this.environment) || 
procedure.isFinished()) {
-      procedure.doReleaseLock(this.environment, store);
+      RetryUtils.executeWithEndlessBackoffRetry(
+          () -> procedure.doReleaseLock(this.environment, store), "procedure 
release lock");
     }
   }
 
@@ -477,7 +479,11 @@ public class ProcedureExecutor<Env> {
     }
     if (parent != null && parent.tryRunnable()) {
       // If success, means all its children have completed, move parent to 
front of the queue.
-      store.update(parent);
+      // Must endless retry here, since this step is not idempotent and can 
not be re-execute
+      // correctly in new CN leader.
+      RetryUtils.executeWithEndlessBackoffRetry(
+          () -> store.update(parent), "count down children procedure");
+      // do not add this procedure when exception occurred
       scheduler.addFront(parent);
       LOG.info(
           "Finished subprocedure pid={}, resume processing ppid={}",
@@ -506,21 +512,44 @@ public class ProcedureExecutor<Env> {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Stored {}, children {}", proc, Arrays.toString(subprocs));
       }
-      store.update(subprocs);
+      try {
+        store.update(subprocs);
+      } catch (Exception e) {
+        // Do nothing since this step is idempotent. New CN leader can 
converge to the correct
+        // state when restore this procedure.
+        LOG.warn("Failed to update subprocs on execution", e);
+      }
     } else {
       LOG.debug("Store update {}", proc);
       if (proc.isFinished() && !proc.hasParent()) {
         final long[] childProcIds = rootProcStack.getSubprocedureIds();
         if (childProcIds != null) {
-          store.delete(childProcIds);
-          for (long childProcId : childProcIds) {
-            procedures.remove(childProcId);
+          try {
+            store.delete(childProcIds);
+            // do not remove these procedures when exception occurred
+            for (long childProcId : childProcIds) {
+              procedures.remove(childProcId);
+            }
+          } catch (Exception e) {
+            // Do nothing since this step is idempotent. New CN leader can 
converge to the correct
+            // state when restore this procedure.
+            LOG.warn("Failed to delete subprocedures on execution", e);
           }
         } else {
-          store.update(proc);
+          try {
+            store.update(proc);
+          } catch (Exception e) {
+            LOG.warn("Failed to update procedure on execution", e);
+          }
         }
       } else {
-        store.update(proc);
+        try {
+          store.update(proc);
+        } catch (Exception e) {
+          // Do nothing since this step is idempotent. New CN leader can 
converge to the correct
+          // state when restore this procedure.
+          LOG.warn("Failed to update procedure on execution", e);
+        }
       }
     }
   }
@@ -577,7 +606,9 @@ public class ProcedureExecutor<Env> {
     if (exception == null) {
       exception = procedureStack.getException();
       rootProcedure.setFailure(exception);
-      store.update(rootProcedure);
+      // Endless retry since this step is not idempotent.
+      RetryUtils.executeWithEndlessBackoffRetry(
+          () -> store.update(rootProcedure), "root procedure rollback");
     }
     List<Procedure<Env>> subprocStack = procedureStack.getSubproceduresStack();
     int stackTail = subprocStack.size();
@@ -653,18 +684,37 @@ public class ProcedureExecutor<Env> {
       procedure.updateMetricsOnFinish(getEnvironment(), 
procedure.elapsedTime(), false);
 
       if (procedure.hasParent()) {
-        store.delete(procedure.getProcId());
-        procedures.remove(procedure.getProcId());
+        try {
+          store.delete(procedure.getProcId());
+          // do not remove this procedure when exception occurred
+          procedures.remove(procedure.getProcId());
+        } catch (Exception e) {
+          // Do nothing since this step is idempotent. New CN leader can 
converge to the correct
+          // state when restore this procedure.
+          LOG.warn("Failed to delete procedure on rollback", e);
+        }
       } else {
         final long[] childProcIds = 
rollbackStack.get(procedure.getProcId()).getSubprocedureIds();
-        if (childProcIds != null) {
-          store.delete(childProcIds);
-        } else {
-          store.update(procedure);
+        try {
+          if (childProcIds != null) {
+            store.delete(childProcIds);
+          } else {
+            store.update(procedure);
+          }
+        } catch (Exception e) {
+          // Do nothing since this step is idempotent. New CN leader can 
converge to the correct
+          // state when restore this procedure.
+          LOG.warn("Failed to delete procedure on rollback", e);
         }
       }
     } else {
-      store.update(procedure);
+      try {
+        store.update(procedure);
+      } catch (Exception e) {
+        // Do nothing since this step is idempotent. New CN leader can 
converge to the correct
+        // state when restore this procedure.
+        LOG.warn("Failed to update procedure on rollback", e);
+      }
     }
   }
 
@@ -916,7 +966,11 @@ public class ProcedureExecutor<Env> {
     procedure.setProcId(store.getNextProcId());
     procedure.setProcRunnable();
     // Commit the transaction
-    store.update(procedure);
+    try {
+      store.update(procedure);
+    } catch (Exception e) {
+      LOG.error("Failed to update store procedure {}", procedure, e);
+    }
     LOG.debug("{} is stored.", procedure);
     // Add the procedure to the executor
     return pushProcedure(procedure);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 5aaf9a623f5..1614148abd3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -19,12 +19,16 @@
 
 package org.apache.iotdb.confignode.procedure;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
 public class TimeoutExecutorThread<Env> extends StoppableThread {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TimeoutExecutorThread.class);
   private static final int DELAY_QUEUE_TIMEOUT = 20;
   private final ProcedureExecutor<Env> executor;
   private final DelayQueue<ProcedureDelayContainer<Env>> queue = new 
DelayQueue<>();
@@ -71,7 +75,13 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
           long rootProcId = executor.getRootProcedureId(procedure);
           RootProcedureStack<Env> rollbackStack = 
executor.getRollbackStack(rootProcId);
           rollbackStack.abort();
-          executor.getStore().update(procedure);
+          try {
+            executor.getStore().update(procedure);
+          } catch (Exception e) {
+            // Do nothing since new CN leader can converge to the correct 
state when restore this
+            // procedure.
+            LOGGER.warn("Failed to update procedure {}", procedure, e);
+          }
           executor.getScheduler().addFront(procedure);
         }
       }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
index 393c1e93740..603a9a8a627 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
@@ -89,43 +89,55 @@ public class ConfigProcedureStore implements 
IProcedureStore<ConfigNodeProcedure
   }
 
   @Override
-  public void update(Procedure<ConfigNodeProcedureEnv> procedure) {
+  public void update(Procedure<ConfigNodeProcedureEnv> procedure) throws 
Exception {
     Objects.requireNonNull(ProcedureFactory.getProcedureType(procedure), 
"Procedure type is null");
     final UpdateProcedurePlan updateProcedurePlan = new 
UpdateProcedurePlan(procedure);
     try {
       configManager.getConsensusManager().write(updateProcedurePlan);
     } catch (ConsensusException e) {
-      LOG.warn("Failed in the write API executing the consensus layer due to: 
", e);
+      LOG.warn(
+          "pid={} Failed in the write update API executing the consensus layer 
due to: ",
+          procedure.getProcId(),
+          e);
+      // In consensus layer API, do nothing but just throw an exception to let 
upper caller handle
+      // it.
+      throw e;
     }
   }
 
   @Override
-  public void update(Procedure[] subprocs) {
+  public void update(Procedure[] subprocs) throws Exception {
     for (Procedure subproc : subprocs) {
       update(subproc);
     }
   }
 
   @Override
-  public void delete(long procId) {
+  public void delete(long procId) throws Exception {
     DeleteProcedurePlan deleteProcedurePlan = new DeleteProcedurePlan();
     deleteProcedurePlan.setProcId(procId);
     try {
       configManager.getConsensusManager().write(deleteProcedurePlan);
     } catch (ConsensusException e) {
-      LOG.warn("Failed in the write API executing the consensus layer due to: 
", e);
+      LOG.warn(
+          "pid={} Failed in the write delete API executing the consensus layer 
due to: ",
+          procId,
+          e);
+      // In consensus layer API, do nothing but just throw an exception to let 
upper caller handle
+      // it.
+      throw e;
     }
   }
 
   @Override
-  public void delete(long[] childProcIds) {
+  public void delete(long[] childProcIds) throws Exception {
     for (long childProcId : childProcIds) {
       delete(childProcId);
     }
   }
 
   @Override
-  public void delete(long[] batchIds, int startIndex, int batchCount) {
+  public void delete(long[] batchIds, int startIndex, int batchCount) throws 
Exception {
     for (int i = startIndex; i < batchCount; i++) {
       delete(batchIds[i]);
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
index 8e8e715fd84..3dba6d29288 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
@@ -38,15 +38,15 @@ public interface IProcedureStore<Env> {
 
   long getNextProcId();
 
-  void update(Procedure<Env> procedure);
+  void update(Procedure<Env> procedure) throws Exception;
 
-  void update(Procedure<Env>[] subprocs);
+  void update(Procedure<Env>[] subprocs) throws Exception;
 
-  void delete(long procId);
+  void delete(long procId) throws Exception;
 
-  void delete(long[] childProcIds);
+  void delete(long[] childProcIds) throws Exception;
 
-  void delete(long[] batchIds, int startIndex, int batchCount);
+  void delete(long[] batchIds, int startIndex, int batchCount) throws 
Exception;
 
   void cleanup();
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index cff60601edb..9b297cae053 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -22,10 +22,15 @@ package org.apache.iotdb.commons.utils;
 import 
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.ConnectException;
 
 public class RetryUtils {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RetryUtils.class);
+
   public interface CallableWithException<T, E extends Exception> {
     T call() throws E;
   }
@@ -46,7 +51,7 @@ public class RetryUtils {
     return statusCode == 
TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST.getStatusCode();
   }
 
-  public static final int MAX_RETRIES = 3;
+  public static final int MAX_RETRIES = 5;
 
   public static <T, E extends Exception> T retryOnException(
       final CallableWithException<T, E> callable) throws E {
@@ -63,6 +68,58 @@ public class RetryUtils {
     }
   }
 
+  private static final long INITIAL_BACKOFF_MS = 100;
+  private static final long MAX_BACKOFF_MS = 60000;
+
+  @FunctionalInterface
+  public interface OperationWithException {
+    void run() throws Exception;
+  }
+
+  /**
+   * Exponential backoff retry helper method.
+   *
+   * @param operation The operation to execute.
+   * @param operationName A description of the operation (for logging).
+   */
+  public static void executeWithEndlessBackoffRetry(
+      OperationWithException operation, String operationName) {
+    long currentBackoff = INITIAL_BACKOFF_MS;
+    int attempt = 0;
+
+    // Endless retry
+    while (true) {
+      attempt++;
+      try {
+        operation.run();
+        if (attempt > 1) {
+          LOGGER.info("Operation '{}' succeeded after {} attempts", 
operationName, attempt);
+        }
+        return;
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Operation '{}' failed (attempt {}). Retrying in {}ms...",
+            operationName,
+            attempt,
+            currentBackoff,
+            e);
+        try {
+          Thread.sleep(currentBackoff);
+        } catch (InterruptedException ie) {
+          LOGGER.warn(
+              "Retry wait for operation '{}' was interrupted, stopping 
retries.",
+              operationName,
+              ie);
+          Thread.currentThread().interrupt();
+          return;
+        }
+
+        // Double the backoff, but cap it at the max to prevent overflow
+        currentBackoff = Math.min(currentBackoff * 2, MAX_BACKOFF_MS);
+      }
+    }
+  }
+
   private RetryUtils() {
     // utility class
   }

Reply via email to