This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 86987e48667 Catch per-startup failures during ConfigNode leader warm-up (#17898)
86987e48667 is described below
commit 86987e48667212884d3b3db70369fc5097949d7a
Author: Yongzao <[email protected]>
AuthorDate: Wed Jun 10 17:16:56 2026 +0800
Catch per-startup failures during ConfigNode leader warm-up (#17898)
---
.../statemachine/ConfigRegionStateMachine.java | 112 ++++++++++++++++-----
1 file changed, 86 insertions(+), 26 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 251db87dbdc..b3029e66028 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -336,7 +336,9 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
configManager.getLoadManager().startTopologyService();
}
- // Start the remaining leader services in parallel and wait for all of
them to finish.
+ // Start the remaining leader services in parallel and wait for all of
them to finish. Each
+ // startup swallows and logs its own failure (see
startInParallelIfEpochCurrent), so a single
+ // misbehaving service cannot abort the whole transition and leave the
node stuck warming up.
final CompletableFuture<?>[] startups =
leaderServiceStartups().stream()
.map(startup -> startInParallelIfEpochCurrent(epoch, startup))
@@ -364,31 +366,47 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
}
/** The leader services that can be started independently, in parallel,
within one epoch. */
- private List<Runnable> leaderServiceStartups() {
+ private List<LeaderServiceStartup> leaderServiceStartups() {
return Arrays.asList(
- () ->
configManager.getProcedureManager().getStore().getProcedureInfo().upgrade(),
- () ->
configManager.getRetryFailedTasksThread().startRetryFailedTasksService(),
- () -> configManager.getPartitionManager().startRegionCleaner(),
+ new LeaderServiceStartup(
+ "ProcedureInfo.upgrade",
+ () ->
configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()),
+ new LeaderServiceStartup(
+ "RetryFailedTasksService",
+ () ->
configManager.getRetryFailedTasksThread().startRetryFailedTasksService()),
+ new LeaderServiceStartup(
+ "RegionCleaner", () ->
configManager.getPartitionManager().startRegionCleaner()),
// Add metrics after leader ready.
- () -> configManager.addMetrics(),
+ new LeaderServiceStartup("Metrics", () -> configManager.addMetrics()),
// Activate leader related service for config pipe.
- () -> PipeConfigNodeAgent.runtime().notifyLeaderReady(),
+ new LeaderServiceStartup(
+ "PipeConfigNodeRuntime", () ->
PipeConfigNodeAgent.runtime().notifyLeaderReady()),
// CQ recovery may be time-consuming, so it is just one more parallel
startup.
- () -> configManager.getCQManager().startCQScheduler(),
- () ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync(),
- () ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat(),
- () ->
- configManager
- .getPipeManager()
- .getPipeRuntimeCoordinator()
- .onConfigRegionGroupLeaderChanged(),
- () ->
- configManager
- .getSubscriptionManager()
- .getSubscriptionCoordinator()
- .startSubscriptionMetaSync(),
+ new LeaderServiceStartup(
+ "CQScheduler", () ->
configManager.getCQManager().startCQScheduler()),
+ new LeaderServiceStartup(
+ "PipeMetaSync",
+ () ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()),
+ new LeaderServiceStartup(
+ "PipeHeartbeat",
+ () ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()),
+ new LeaderServiceStartup(
+ "PipeOnLeaderChanged",
+ () ->
+ configManager
+ .getPipeManager()
+ .getPipeRuntimeCoordinator()
+ .onConfigRegionGroupLeaderChanged()),
+ new LeaderServiceStartup(
+ "SubscriptionMetaSync",
+ () ->
+ configManager
+ .getSubscriptionManager()
+ .getSubscriptionCoordinator()
+ .startSubscriptionMetaSync()),
// To adapt old version, we check cluster ID after state machine has
been fully recovered.
- () -> configManager.getClusterManager().checkClusterId());
+ new LeaderServiceStartup(
+ "CheckClusterId", () ->
configManager.getClusterManager().checkClusterId()));
}
/** Tear down every leader service. Runs on the single transition thread. */
@@ -422,16 +440,35 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
}
/**
- * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it
(and recording success)
- * if the epoch has gone stale by the time it is picked up. Returns a future
that always completes
- * normally so {@link CompletableFuture#allOf} acts as a clean join barrier.
+ * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it if
the epoch has gone
+ * stale by the time it is picked up. Any {@link RuntimeException} thrown by
the startup is caught
+ * and logged here instead of being allowed to escape: this keeps one
misbehaving service from
+ * failing the {@link CompletableFuture#allOf} join barrier in {@link
#becomeLeader}, which would
+ * otherwise abort the whole transition before {@link
#markLeaderServicesReadyIfEpochCurrent} runs
+ * and leave the node stuck returning {@code CONFIG_NODE_LEADER_WARMING_UP}
forever. The returned
+ * future therefore always completes normally, so {@code allOf} acts as a
clean join barrier.
*/
private CompletableFuture<Void> startInParallelIfEpochCurrent(
- final long epoch, final Runnable startup) {
+ final long epoch, final LeaderServiceStartup startup) {
return CompletableFuture.runAsync(
() -> {
- if (isCurrentLeaderServicesEpoch(epoch)) {
+ if (!isCurrentLeaderServicesEpoch(epoch)) {
+ return;
+ }
+ try {
startup.run();
+ } catch (final Exception e) {
+ // Swallow and log so a single failed startup cannot stall leader
warm-up. The service
+ // stays unstarted, but the node still finishes warming up and
begins serving; the
+ // failure is observable through this error log.
+ LOGGER.error(
+ "Current ConfigNode(nodeId: {}, ip: {}) failed to start leader
service [{}], the"
+ + " node will still finish warming up; this service stays
unavailable until the"
+ + " next leadership transition.",
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+ currentNodeTEndPoint,
+ startup.name(),
+ e);
}
},
leaderServicesStartupPool);
@@ -687,4 +724,27 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
}
return Long.parseLong(endIndexString);
}
+
+ /**
+ * A single leader service startup paired with a human-readable name, so a
failure can be logged
+ * against the service that produced it (see {@link
#startInParallelIfEpochCurrent}).
+ */
+ private static class LeaderServiceStartup {
+
+ private final String name;
+ private final Runnable startup;
+
+ private LeaderServiceStartup(final String name, final Runnable startup) {
+ this.name = name;
+ this.startup = startup;
+ }
+
+ private String name() {
+ return name;
+ }
+
+ private void run() {
+ startup.run();
+ }
+ }
}