This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0e209e73c21 Refine CN consensus layer API for procedure robustness (#16303)
0e209e73c21 is described below
commit 0e209e73c21137be44cebe3926c7f1fed62a0422
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Nov 20 05:37:40 2025 -0600
Refine CN consensus layer API for procedure robustness (#16303)
* Refine the CN (ConfigNode) consensus-layer API used by procedures
* Refine and fix the procedure-store error-handling logic throughout
* Fix comment
* Fix comment
---------
Co-authored-by: 彭俊植 <[email protected]>
---
.../procedure/CompletedProcedureRecycler.java | 21 +++++-
.../iotdb/confignode/procedure/Procedure.java | 27 +++++--
.../confignode/procedure/ProcedureExecutor.java | 88 +++++++++++++++++-----
.../procedure/TimeoutExecutorThread.java | 12 ++-
.../procedure/store/ConfigProcedureStore.java | 26 +++++--
.../procedure/store/IProcedureStore.java | 10 +--
.../org/apache/iotdb/commons/utils/RetryUtils.java | 59 ++++++++++++++-
7 files changed, 203 insertions(+), 40 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
index cea15a1bfb6..179563cc3b3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
@@ -70,15 +70,30 @@ public class CompletedProcedureRecycler<Env> extends InternalProcedure<Env> {
// Failed procedures aren't persisted in WAL.
batchIds[batchCount++] = entry.getKey();
if (batchCount == batchIds.length) {
- store.delete(batchIds, 0, batchCount);
- batchCount = 0;
+ try {
+ store.delete(batchIds, 0, batchCount);
+ } catch (Exception e) {
+ LOG.error("Error deleting completed procedures {}.", proc, e);
+ // Do not remove from the completed map. Even this procedure may
be restored
+ // unexpectedly in another new CN leader, we do not need to do
anything else since
+ // procedures are idempotent.
+ continue;
+ } finally {
+ batchCount = 0;
+ }
}
it.remove();
LOG.trace("Evict completed {}", proc);
}
}
if (batchCount > 0) {
- store.delete(batchIds, 0, batchCount);
+ try {
+ store.delete(batchIds, 0, batchCount);
+ } catch (Exception e) {
+ // Even this procedure may be restored unexpectedly in another new CN
leader, we do not need
+ // to do anything else since procedures are idempotent.
+ LOG.error("Error deleting completed procedures {}.", batchIds, e);
+ }
}
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 91af03d3971..89e6e37e431 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.ProcedureState;
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
+import org.apache.iotdb.consensus.exception.ConsensusException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -196,7 +197,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
byteBuffer.get(resultArr);
}
// has lock
- if (byteBuffer.get() == 1) {
+ if (byteBuffer.get() == 1 && this.state != ProcedureState.ROLLEDBACK) {
this.lockedWhenLoading();
}
}
@@ -300,8 +301,15 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
}
ProcedureLockState state = acquireLock(env);
if (state == ProcedureLockState.LOCK_ACQUIRED) {
- locked = true;
- store.update(this);
+ try {
+ locked = true;
+ store.update(this);
+ } catch (Exception e) {
+ // Do not need to do anything else. New leader which restore this
procedure from a wrong
+ // state will reexecute it and converge to the correct state since
procedures are
+ // idempotent.
+ LOG.warn("pid={} Failed to persist lock state to store.", this.procId,
e);
+ }
}
return state;
}
@@ -312,12 +320,19 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
* @param env environment
* @param store ProcedureStore
*/
- public final void doReleaseLock(Env env, IProcedureStore store) {
+ public final void doReleaseLock(Env env, IProcedureStore store) throws
Exception {
locked = false;
- if (getState() != ProcedureState.ROLLEDBACK) {
+ if (getState() == ProcedureState.ROLLEDBACK) {
+ LOG.info("Force write unlock state to raft for pid={}", this.procId);
+ }
+ try {
store.update(this);
+ // do not release lock when consensus layer is not working
+ releaseLock(env);
+ } catch (ConsensusException e) {
+ LOG.error("pid={} Failed to persist unlock state to store.",
this.procId, e);
+ throw e;
}
- releaseLock(env);
}
public final void restoreLock(Env env) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 0d8368583b4..efd37778efa 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -221,7 +222,8 @@ public class ProcedureExecutor<Env> {
private void releaseLock(Procedure<Env> procedure, boolean force) {
if (force || !procedure.holdLock(this.environment) ||
procedure.isFinished()) {
- procedure.doReleaseLock(this.environment, store);
+ RetryUtils.executeWithEndlessBackoffRetry(
+ () -> procedure.doReleaseLock(this.environment, store), "procedure
release lock");
}
}
@@ -477,7 +479,11 @@ public class ProcedureExecutor<Env> {
}
if (parent != null && parent.tryRunnable()) {
// If success, means all its children have completed, move parent to
front of the queue.
- store.update(parent);
+ // Must endless retry here, since this step is not idempotent and can
not be re-execute
+ // correctly in new CN leader.
+ RetryUtils.executeWithEndlessBackoffRetry(
+ () -> store.update(parent), "count down children procedure");
+ // do not add this procedure when exception occurred
scheduler.addFront(parent);
LOG.info(
"Finished subprocedure pid={}, resume processing ppid={}",
@@ -506,21 +512,44 @@ public class ProcedureExecutor<Env> {
if (LOG.isDebugEnabled()) {
LOG.debug("Stored {}, children {}", proc, Arrays.toString(subprocs));
}
- store.update(subprocs);
+ try {
+ store.update(subprocs);
+ } catch (Exception e) {
+ // Do nothing since this step is idempotent. New CN leader can
converge to the correct
+ // state when restore this procedure.
+ LOG.warn("Failed to update subprocs on execution", e);
+ }
} else {
LOG.debug("Store update {}", proc);
if (proc.isFinished() && !proc.hasParent()) {
final long[] childProcIds = rootProcStack.getSubprocedureIds();
if (childProcIds != null) {
- store.delete(childProcIds);
- for (long childProcId : childProcIds) {
- procedures.remove(childProcId);
+ try {
+ store.delete(childProcIds);
+ // do not remove these procedures when exception occurred
+ for (long childProcId : childProcIds) {
+ procedures.remove(childProcId);
+ }
+ } catch (Exception e) {
+ // Do nothing since this step is idempotent. New CN leader can
converge to the correct
+ // state when restore this procedure.
+ LOG.warn("Failed to delete subprocedures on execution", e);
}
} else {
- store.update(proc);
+ try {
+ store.update(proc);
+ } catch (Exception e) {
+ LOG.warn("Failed to update procedure on execution", e);
+ }
}
} else {
- store.update(proc);
+ try {
+ store.update(proc);
+ } catch (Exception e) {
+ // Do nothing since this step is idempotent. New CN leader can
converge to the correct
+ // state when restore this procedure.
+ LOG.warn("Failed to update procedure on execution", e);
+ }
}
}
}
@@ -577,7 +606,9 @@ public class ProcedureExecutor<Env> {
if (exception == null) {
exception = procedureStack.getException();
rootProcedure.setFailure(exception);
- store.update(rootProcedure);
+ // Endless retry since this step is not idempotent.
+ RetryUtils.executeWithEndlessBackoffRetry(
+ () -> store.update(rootProcedure), "root procedure rollback");
}
List<Procedure<Env>> subprocStack = procedureStack.getSubproceduresStack();
int stackTail = subprocStack.size();
@@ -653,18 +684,37 @@ public class ProcedureExecutor<Env> {
procedure.updateMetricsOnFinish(getEnvironment(),
procedure.elapsedTime(), false);
if (procedure.hasParent()) {
- store.delete(procedure.getProcId());
- procedures.remove(procedure.getProcId());
+ try {
+ store.delete(procedure.getProcId());
+ // do not remove this procedure when exception occurred
+ procedures.remove(procedure.getProcId());
+ } catch (Exception e) {
+ // Do nothing since this step is idempotent. New CN leader can
converge to the correct
+ // state when restore this procedure.
+ LOG.warn("Failed to delete procedure on rollback", e);
+ }
} else {
final long[] childProcIds =
rollbackStack.get(procedure.getProcId()).getSubprocedureIds();
- if (childProcIds != null) {
- store.delete(childProcIds);
- } else {
- store.update(procedure);
+ try {
+ if (childProcIds != null) {
+ store.delete(childProcIds);
+ } else {
+ store.update(procedure);
+ }
+ } catch (Exception e) {
+ // Do nothing since this step is idempotent. New CN leader can
converge to the correct
+ // state when restore this procedure.
+ LOG.warn("Failed to delete procedure on rollback", e);
}
}
} else {
- store.update(procedure);
+ try {
+ store.update(procedure);
+ } catch (Exception e) {
+ // Do nothing since this step is idempotent. New CN leader can
converge to the correct
+ // state when restore this procedure.
+ LOG.warn("Failed to update procedure on rollback", e);
+ }
}
}
@@ -916,7 +966,11 @@ public class ProcedureExecutor<Env> {
procedure.setProcId(store.getNextProcId());
procedure.setProcRunnable();
// Commit the transaction
- store.update(procedure);
+ try {
+ store.update(procedure);
+ } catch (Exception e) {
+ LOG.error("Failed to update store procedure {}", procedure, e);
+ }
LOG.debug("{} is stored.", procedure);
// Add the procedure to the executor
return pushProcedure(procedure);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 5aaf9a623f5..1614148abd3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -19,12 +19,16 @@
package org.apache.iotdb.confignode.procedure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class TimeoutExecutorThread<Env> extends StoppableThread {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TimeoutExecutorThread.class);
private static final int DELAY_QUEUE_TIMEOUT = 20;
private final ProcedureExecutor<Env> executor;
private final DelayQueue<ProcedureDelayContainer<Env>> queue = new
DelayQueue<>();
@@ -71,7 +75,13 @@ public class TimeoutExecutorThread<Env> extends StoppableThread {
long rootProcId = executor.getRootProcedureId(procedure);
RootProcedureStack<Env> rollbackStack =
executor.getRollbackStack(rootProcId);
rollbackStack.abort();
- executor.getStore().update(procedure);
+ try {
+ executor.getStore().update(procedure);
+ } catch (Exception e) {
+ // Do nothing since new CN leader can converge to the correct
state when restore this
+ // procedure.
+ LOGGER.warn("Failed to update procedure {}", procedure, e);
+ }
executor.getScheduler().addFront(procedure);
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
index 393c1e93740..603a9a8a627 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
@@ -89,43 +89,55 @@ public class ConfigProcedureStore implements IProcedureStore<ConfigNodeProcedure
}
@Override
- public void update(Procedure<ConfigNodeProcedureEnv> procedure) {
+ public void update(Procedure<ConfigNodeProcedureEnv> procedure) throws
Exception {
Objects.requireNonNull(ProcedureFactory.getProcedureType(procedure),
"Procedure type is null");
final UpdateProcedurePlan updateProcedurePlan = new
UpdateProcedurePlan(procedure);
try {
configManager.getConsensusManager().write(updateProcedurePlan);
} catch (ConsensusException e) {
- LOG.warn("Failed in the write API executing the consensus layer due to:
", e);
+ LOG.warn(
+ "pid={} Failed in the write update API executing the consensus layer
due to: ",
+ procedure.getProcId(),
+ e);
+ // In consensus layer API, do nothing but just throw an exception to let
upper caller handle
+ // it.
+ throw e;
}
}
@Override
- public void update(Procedure[] subprocs) {
+ public void update(Procedure[] subprocs) throws Exception {
for (Procedure subproc : subprocs) {
update(subproc);
}
}
@Override
- public void delete(long procId) {
+ public void delete(long procId) throws Exception {
DeleteProcedurePlan deleteProcedurePlan = new DeleteProcedurePlan();
deleteProcedurePlan.setProcId(procId);
try {
configManager.getConsensusManager().write(deleteProcedurePlan);
} catch (ConsensusException e) {
- LOG.warn("Failed in the write API executing the consensus layer due to:
", e);
+ LOG.warn(
+ "pid={} Failed in the write delete API executing the consensus layer
due to: ",
+ procId,
+ e);
+ // In consensus layer API, do nothing but just throw an exception to let
upper caller handle
+ // it.
+ throw e;
}
}
@Override
- public void delete(long[] childProcIds) {
+ public void delete(long[] childProcIds) throws Exception {
for (long childProcId : childProcIds) {
delete(childProcId);
}
}
@Override
- public void delete(long[] batchIds, int startIndex, int batchCount) {
+ public void delete(long[] batchIds, int startIndex, int batchCount) throws
Exception {
for (int i = startIndex; i < batchCount; i++) {
delete(batchIds[i]);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
index 8e8e715fd84..3dba6d29288 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
@@ -38,15 +38,15 @@ public interface IProcedureStore<Env> {
long getNextProcId();
- void update(Procedure<Env> procedure);
+ void update(Procedure<Env> procedure) throws Exception;
- void update(Procedure<Env>[] subprocs);
+ void update(Procedure<Env>[] subprocs) throws Exception;
- void delete(long procId);
+ void delete(long procId) throws Exception;
- void delete(long[] childProcIds);
+ void delete(long[] childProcIds) throws Exception;
- void delete(long[] batchIds, int startIndex, int batchCount);
+ void delete(long[] batchIds, int startIndex, int batchCount) throws
Exception;
void cleanup();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index cff60601edb..9b297cae053 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -22,10 +22,15 @@ package org.apache.iotdb.commons.utils;
import
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.ConnectException;
public class RetryUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RetryUtils.class);
+
public interface CallableWithException<T, E extends Exception> {
T call() throws E;
}
@@ -46,7 +51,7 @@ public class RetryUtils {
return statusCode ==
TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST.getStatusCode();
}
- public static final int MAX_RETRIES = 3;
+ public static final int MAX_RETRIES = 5;
public static <T, E extends Exception> T retryOnException(
final CallableWithException<T, E> callable) throws E {
@@ -63,6 +68,58 @@ public class RetryUtils {
}
}
+ private static final long INITIAL_BACKOFF_MS = 100;
+ private static final long MAX_BACKOFF_MS = 60000;
+
+ @FunctionalInterface
+ public interface OperationWithException {
+ void run() throws Exception;
+ }
+
+ /**
+ * Exponential backoff retry helper method.
+ *
+ * @param operation The operation to execute.
+ * @param operationName A description of the operation (for logging).
+ */
+ public static void executeWithEndlessBackoffRetry(
+ OperationWithException operation, String operationName) {
+ long currentBackoff = INITIAL_BACKOFF_MS;
+ int attempt = 0;
+
+ // Endless retry
+ while (true) {
+ attempt++;
+ try {
+ operation.run();
+ if (attempt > 1) {
+ LOGGER.info("Operation '{}' succeeded after {} attempts",
operationName, attempt);
+ }
+ return;
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Operation '{}' failed (attempt {}). Retrying in {}ms...",
+ operationName,
+ attempt,
+ currentBackoff,
+ e);
+ try {
+ Thread.sleep(currentBackoff);
+ } catch (InterruptedException ie) {
+ LOGGER.warn(
+ "Retry wait for operation '{}' was interrupted, stopping
retries.",
+ operationName,
+ ie);
+ Thread.currentThread().interrupt();
+ return;
+ }
+
+ // Double the backoff, but cap it at the max to prevent overflow
+ currentBackoff = Math.min(currentBackoff * 2, MAX_BACKOFF_MS);
+ }
+ }
+ }
+
private RetryUtils() {
// utility class
}