This is an automated email from the ASF dual-hosted git repository. pengjunzhi pushed a commit to branch procedure-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit df71d1220ee5906cce2f19e41b01a846a5b4c870
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Nov 14 10:30:23 2025 +0800

    refine and fix the whole logic
---
 .../procedure/CompletedProcedureRecycler.java      |  6 +-
 .../iotdb/confignode/procedure/Procedure.java      |  9 ++-
 .../confignode/procedure/ProcedureExecutor.java    | 50 ++++++++------
 .../procedure/TimeoutExecutorThread.java           |  2 +
 .../procedure/store/ConfigProcedureStore.java      |  4 ++
 .../org/apache/iotdb/commons/utils/RetryUtils.java | 74 ++++++++++++++++++++-
 6 files changed, 122 insertions(+), 23 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
index db3a599163a..72e32a5c4c8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
@@ -74,7 +74,9 @@ public class CompletedProcedureRecycler<Env> extends InternalProcedure<Env> {
             store.delete(batchIds, 0, batchCount);
           } catch (Exception e) {
             LOG.error("Error deleting completed procedures {}.", proc, e);
-            // Do not remove from the completed map
+            // Do not remove from the completed map. Even if this procedure is unexpectedly
+            // restored by a new CN leader, nothing else needs to be done here since procedures
+            // are idempotent.
             continue;
           } finally {
             batchCount = 0;
@@ -88,6 +90,8 @@ public class CompletedProcedureRecycler<Env> extends InternalProcedure<Env> {
       try {
         store.delete(batchIds, 0, batchCount);
       } catch (Exception e) {
+        // Even if these procedures are unexpectedly restored by a new CN leader, nothing else
+        // needs to be done here since procedures are idempotent.
         LOG.error("Error deleting completed procedures {}.", batchIds, e);
       }
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index b660737bf3f..83c0776397a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.state.ProcedureState;
 import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -304,6 +305,9 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
         locked = true;
         store.update(this);
       } catch (Exception e) {
+        // Nothing else needs to be done here: a new leader that restores this procedure from a
+        // stale state will re-execute it and converge to the correct state, since procedures are
+        // idempotent.
         LOG.warn("pid={} Failed to persist lock state to store.", this.procId, e);
       }
     }
@@ -316,7 +320,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
    * @param env environment
    * @param store ProcedureStore
    */
-  public final void doReleaseLock(Env env, IProcedureStore store) {
+  public final void doReleaseLock(Env env, IProcedureStore store) throws Exception {
     locked = false;
     if (getState() == ProcedureState.ROLLEDBACK) {
       LOG.info("Force write unlock state to raft for pid={}", this.procId);
@@ -325,8 +329,9 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
       store.update(this);
       // do not release lock when consensus layer is not working
       releaseLock(env);
-    } catch (Exception e) {
+    } catch (ConsensusException e) {
       LOG.error("pid={} Failed to persist unlock state to store.", this.procId, e);
+      throw e;
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 9e3cb5c4877..8a1afc77581 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -221,7 +222,8 @@ public class ProcedureExecutor<Env> {
 
   private void releaseLock(Procedure<Env> procedure, boolean force) {
     if (force || !procedure.holdLock(this.environment) || procedure.isFinished()) {
-      procedure.doReleaseLock(this.environment, store);
+      RetryUtils.executeWithEndlessBackoffRetry(
+          () -> procedure.doReleaseLock(this.environment, store), "procedure release lock");
     }
   }
 
@@ -477,17 +479,19 @@ public class ProcedureExecutor<Env> {
     }
     if (parent != null && parent.tryRunnable()) {
       // If success, means all its children have completed, move parent to front of the queue.
-      try {
-        store.update(parent);
-        // do not add this procedure when exception occurred
-        scheduler.addFront(parent);
-        LOG.info(
-            "Finished subprocedure pid={}, resume processing ppid={}",
-            proc.getProcId(),
-            parent.getProcId());
-      } catch (Exception e) {
-        LOG.warn("Failed to update parent on countdown", e);
-      }
+      // This step is not idempotent and cannot be re-executed correctly by a new CN leader, so
+      // retry until the parent's state is persisted. Only resume the parent if that succeeded;
+      // the retry helper returns false (and has already logged) when it is interrupted.
+      boolean persisted =
+          RetryUtils.executeWithEndlessBackoffRetry(
+              () -> store.update(parent), "count down children procedure");
+      if (persisted) {
+        scheduler.addFront(parent);
+        LOG.info(
+            "Finished subprocedure pid={}, resume processing ppid={}",
+            proc.getProcId(),
+            parent.getProcId());
+      }
     }
   }
 
@@ -514,6 +518,8 @@ public class ProcedureExecutor<Env> {
       try {
         store.update(subprocs);
       } catch (Exception e) {
+        // Nothing else to do: this step is idempotent, and a new CN leader that restores this
+        // procedure will converge to the correct state.
         LOG.warn("Failed to update subprocs on execution", e);
       }
     } else {
@@ -528,6 +534,8 @@ public class ProcedureExecutor<Env> {
           procedures.remove(childProcId);
         }
       } catch (Exception e) {
+        // Nothing else to do: this step is idempotent, and a new CN leader that restores this
+        // procedure will converge to the correct state.
         LOG.warn("Failed to delete subprocedures on execution", e);
       }
     } else {
@@ -541,6 +549,8 @@ public class ProcedureExecutor<Env> {
       try {
         store.update(proc);
       } catch (Exception e) {
+        // Nothing else to do: this step is idempotent, and a new CN leader that restores this
+        // procedure will converge to the correct state.
         LOG.warn("Failed to update procedure on execution", e);
       }
     }
@@ -599,13 +609,9 @@ public class ProcedureExecutor<Env> {
     if (exception == null) {
       exception = procedureStack.getException();
       rootProcedure.setFailure(exception);
-      try {
-        store.update(rootProcedure);
-      } catch (Exception e) {
-        LOG.warn("Failed to update root procedure on rollback", e);
-        // roll back
-        rootProcedure.setFailure(null);
-      }
+      // Retry endlessly since this step is not idempotent.
+      RetryUtils.executeWithEndlessBackoffRetry(
+          () -> store.update(rootProcedure), "root procedure rollback");
     }
     List<Procedure<Env>> subprocStack = procedureStack.getSubproceduresStack();
     int stackTail = subprocStack.size();
@@ -686,6 +692,8 @@ public class ProcedureExecutor<Env> {
           // do not remove this procedure when exception occurred
           procedures.remove(procedure.getProcId());
         } catch (Exception e) {
+          // Nothing else to do: this step is idempotent, and a new CN leader that restores this
+          // procedure will converge to the correct state.
           LOG.warn("Failed to delete procedure on rollback", e);
         }
       } else {
@@ -697,6 +705,8 @@ public class ProcedureExecutor<Env> {
             store.update(procedure);
           }
         } catch (Exception e) {
+          // Nothing else to do: this step is idempotent, and a new CN leader that restores this
+          // procedure will converge to the correct state.
           LOG.warn("Failed to delete procedure on rollback", e);
         }
       }
@@ -704,6 +714,8 @@ public class ProcedureExecutor<Env> {
       try {
         store.update(procedure);
       } catch (Exception e) {
+        // Nothing else to do: this step is idempotent, and a new CN leader that restores this
+        // procedure will converge to the correct state.
         LOG.warn("Failed to update procedure on rollback", e);
       }
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 0a1f5f8248b..1614148abd3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -78,6 +78,8 @@ public class TimeoutExecutorThread<Env> extends StoppableThread {
       try {
         executor.getStore().update(procedure);
       } catch (Exception e) {
+        // Nothing else to do: a new CN leader that restores this procedure will converge to the
+        // correct state.
         LOGGER.warn("Failed to update procedure {}", procedure, e);
       }
       executor.getScheduler().addFront(procedure);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
index 1cd6687b26c..603a9a8a627 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
@@ -99,6 +99,8 @@ public class ConfigProcedureStore implements IProcedureStore<ConfigNodeProcedure
           "pid={} Failed in the write update API executing the consensus layer due to: ",
           procedure.getProcId(),
           e);
+      // Do not handle the failure here: log it and rethrow so that the upper-level caller can
+      // decide how to react.
       throw e;
     }
   }
@@ -121,6 +123,8 @@ public class ConfigProcedureStore implements IProcedureStore<ConfigNodeProcedure
           "pid={} Failed in the write delete API executing the consensus layer due to: ",
           procId,
           e);
+      // Do not handle the failure here: log it and rethrow so that the upper-level caller can
+      // decide how to react.
       throw e;
     }
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index cff60601edb..9b297cae053 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -22,10 +22,15 @@ package org.apache.iotdb.commons.utils;
 import org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.ConnectException;
 
 public class RetryUtils {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(RetryUtils.class);
+
   public interface CallableWithException<T, E extends Exception> {
     T call() throws E;
   }
@@ -46,7 +51,7 @@ public class RetryUtils {
     return statusCode == TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST.getStatusCode();
   }
 
-  public static final int MAX_RETRIES = 3;
+  public static final int MAX_RETRIES = 5;
 
   public static <T, E extends Exception> T retryOnException(
       final CallableWithException<T, E> callable) throws E {
@@ -63,6 +68,73 @@ public class RetryUtils {
     }
   }
 
+  private static final long INITIAL_BACKOFF_MS = 100;
+  private static final long MAX_BACKOFF_MS = 60000;
+
+  @FunctionalInterface
+  public interface OperationWithException {
+    void run() throws Exception;
+  }
+
+  /**
+   * Runs {@code operation} until it succeeds, waiting between attempts with an exponential
+   * backoff that starts at {@code INITIAL_BACKOFF_MS}, doubles after every failure and is capped
+   * at {@code MAX_BACKOFF_MS}.
+   *
+   * <p>There is no retry limit. The only way to stop early is to interrupt the calling thread,
+   * either while the operation runs (it throws {@link InterruptedException}) or while waiting
+   * for the next attempt. In that case the interrupt flag is restored and {@code false} is
+   * returned, so callers must not assume that the operation took effect.
+   *
+   * @param operation The operation to execute.
+   * @param operationName A description of the operation (for logging).
+   * @return {@code true} if the operation succeeded, {@code false} if retrying was stopped by
+   *     an interrupt.
+   */
+  public static boolean executeWithEndlessBackoffRetry(
+      OperationWithException operation, String operationName) {
+    long currentBackoff = INITIAL_BACKOFF_MS;
+    int attempt = 0;
+
+    // Endless retry; only an interrupt of the calling thread ends it without success.
+    while (true) {
+      attempt++;
+      try {
+        operation.run();
+        if (attempt > 1) {
+          LOGGER.info("Operation '{}' succeeded after {} attempts", operationName, attempt);
+        }
+        return true;
+      } catch (InterruptedException ie) {
+        // The operation itself was interrupted. Retrying here would swallow the interrupt, since
+        // the thrower has normally cleared the flag, and the loop might then never stop.
+        LOGGER.warn("Operation '{}' was interrupted, stopping retries.", operationName, ie);
+        Thread.currentThread().interrupt();
+        return false;
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Operation '{}' failed (attempt {}). Retrying in {}ms...",
+            operationName,
+            attempt,
+            currentBackoff,
+            e);
+        try {
+          Thread.sleep(currentBackoff);
+        } catch (InterruptedException ie) {
+          LOGGER.warn(
+              "Retry wait for operation '{}' was interrupted, stopping retries.",
+              operationName,
+              ie);
+          Thread.currentThread().interrupt();
+          return false;
+        }
+
+        // Double the backoff, but cap it at the max to prevent overflow
+        currentBackoff = Math.min(currentBackoff * 2, MAX_BACKOFF_MS);
+      }
+    }
+  }
+
   private RetryUtils() {
     // utility class
   }
