This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch fix-confignode-leader-warmup-startup-failure in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 83ac7942be6653de701a1afa4791fd50d32107d3
Author: Yongzao <[email protected]>
AuthorDate: Wed Jun 10 15:37:54 2026 +0800

Catch per-startup failures during ConfigNode leader warm-up

Within becomeLeader(), the parallel leader-service startups are joined with CompletableFuture.allOf(startups).join(). startInParallelIfEpochCurrent() ran startup.run() unguarded. If any startup (CQ, pipe, subscription, metrics, clusterId, ...) threw a RuntimeException, its future completed exceptionally, and join() rethrew the exception as a CompletionException out of becomeLeader(). That aborted the transition before startExecutor() and markLeaderServicesReadyIfEpochCurrent() ran. As a result, leaderServicesReady never flipped to true and the node kept returning CONFIG_NODE_LEADER_WARMING_UP forever -- even though the Javadoc of startInParallelIfEpochCurrent() claimed the future "always completes normally".

Make that claim true: each startup now catches and logs its own failure, tagged with the service name via a small LeaderServiceStartup holder, and never lets it escape. A single failing service stays unavailable until the next leadership transition. The node still finishes warming up and begins serving, and the failure is visible in the error log. Only Exceptions are caught; an Error (e.g. OutOfMemoryError) thrown by a startup still propagates.
--- .../statemachine/ConfigRegionStateMachine.java | 112 ++++++++++++++++----- 1 file changed, 86 insertions(+), 26 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index 251db87dbdc..b3029e66028 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -336,7 +336,9 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev configManager.getLoadManager().startTopologyService(); } - // Start the remaining leader services in parallel and wait for all of them to finish. + // Start the remaining leader services in parallel and wait for all of them to finish. Each + // startup swallows and logs its own failure (see startInParallelIfEpochCurrent), so a single + // misbehaving service cannot abort the whole transition and leave the node stuck warming up. final CompletableFuture<?>[] startups = leaderServiceStartups().stream() .map(startup -> startInParallelIfEpochCurrent(epoch, startup)) @@ -364,31 +366,47 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev } /** The leader services that can be started independently, in parallel, within one epoch. 
*/ - private List<Runnable> leaderServiceStartups() { + private List<LeaderServiceStartup> leaderServiceStartups() { return Arrays.asList( - () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade(), - () -> configManager.getRetryFailedTasksThread().startRetryFailedTasksService(), - () -> configManager.getPartitionManager().startRegionCleaner(), + new LeaderServiceStartup( + "ProcedureInfo.upgrade", + () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()), + new LeaderServiceStartup( + "RetryFailedTasksService", + () -> configManager.getRetryFailedTasksThread().startRetryFailedTasksService()), + new LeaderServiceStartup( + "RegionCleaner", () -> configManager.getPartitionManager().startRegionCleaner()), // Add metrics after leader ready. - () -> configManager.addMetrics(), + new LeaderServiceStartup("Metrics", () -> configManager.addMetrics()), // Activate leader related service for config pipe. - () -> PipeConfigNodeAgent.runtime().notifyLeaderReady(), + new LeaderServiceStartup( + "PipeConfigNodeRuntime", () -> PipeConfigNodeAgent.runtime().notifyLeaderReady()), // CQ recovery may be time-consuming, so it is just one more parallel startup. 
- () -> configManager.getCQManager().startCQScheduler(), - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync(), - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat(), - () -> - configManager - .getPipeManager() - .getPipeRuntimeCoordinator() - .onConfigRegionGroupLeaderChanged(), - () -> - configManager - .getSubscriptionManager() - .getSubscriptionCoordinator() - .startSubscriptionMetaSync(), + new LeaderServiceStartup( + "CQScheduler", () -> configManager.getCQManager().startCQScheduler()), + new LeaderServiceStartup( + "PipeMetaSync", + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()), + new LeaderServiceStartup( + "PipeHeartbeat", + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()), + new LeaderServiceStartup( + "PipeOnLeaderChanged", + () -> + configManager + .getPipeManager() + .getPipeRuntimeCoordinator() + .onConfigRegionGroupLeaderChanged()), + new LeaderServiceStartup( + "SubscriptionMetaSync", + () -> + configManager + .getSubscriptionManager() + .getSubscriptionCoordinator() + .startSubscriptionMetaSync()), // To adapt old version, we check cluster ID after state machine has been fully recovered. - () -> configManager.getClusterManager().checkClusterId()); + new LeaderServiceStartup( + "CheckClusterId", () -> configManager.getClusterManager().checkClusterId())); } /** Tear down every leader service. Runs on the single transition thread. */ @@ -422,16 +440,35 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev } /** - * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it (and recording success) - * if the epoch has gone stale by the time it is picked up. Returns a future that always completes - * normally so {@link CompletableFuture#allOf} acts as a clean join barrier. 
+ * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it if the epoch has gone + * stale by the time it is picked up. Any {@link RuntimeException} thrown by the startup is caught + * and logged here instead of being allowed to escape: this keeps one misbehaving service from + * failing the {@link CompletableFuture#allOf} join barrier in {@link #becomeLeader}, which would + * otherwise abort the whole transition before {@link #markLeaderServicesReadyIfEpochCurrent} runs + * and leave the node stuck returning {@code CONFIG_NODE_LEADER_WARMING_UP} forever. The returned + * future therefore always completes normally, so {@code allOf} acts as a clean join barrier. */ private CompletableFuture<Void> startInParallelIfEpochCurrent( - final long epoch, final Runnable startup) { + final long epoch, final LeaderServiceStartup startup) { return CompletableFuture.runAsync( () -> { - if (isCurrentLeaderServicesEpoch(epoch)) { + if (!isCurrentLeaderServicesEpoch(epoch)) { + return; + } + try { startup.run(); + } catch (final Exception e) { + // Swallow and log so a single failed startup cannot stall leader warm-up. The service + // stays unstarted, but the node still finishes warming up and begins serving; the + // failure is observable through this error log. + LOGGER.error( + "Current ConfigNode(nodeId: {}, ip: {}) failed to start leader service [{}], the" + + " node will still finish warming up; this service stays unavailable until the" + + " next leadership transition.", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint, + startup.name(), + e); } }, leaderServicesStartupPool); @@ -687,4 +724,27 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev } return Long.parseLong(endIndexString); } + + /** + * A single leader service startup paired with a human-readable name, so a failure can be logged + * against the service that produced it (see {@link #startInParallelIfEpochCurrent}). 
+ */ + private static class LeaderServiceStartup { + + private final String name; + private final Runnable startup; + + private LeaderServiceStartup(final String name, final Runnable startup) { + this.name = name; + this.startup = startup; + } + + private String name() { + return name; + } + + private void run() { + startup.run(); + } + } }
