This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch 
fix-confignode-leader-warmup-startup-failure
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 83ac7942be6653de701a1afa4791fd50d32107d3
Author: Yongzao <[email protected]>
AuthorDate: Wed Jun 10 15:37:54 2026 +0800

    Catch per-startup failures during ConfigNode leader warm-up
    
    Within becomeLeader(), the parallel leader-service startups are joined with
    CompletableFuture.allOf(startups).join(). startInParallelIfEpochCurrent()
    ran startup.run() unguarded, so if any startup (CQ, pipe, subscription,
    metrics, clusterId, ...) threw a RuntimeException, its future completed
    exceptionally and join() rethrew it as a CompletionException out of
    becomeLeader(). That aborted the transition before startExecutor() and
    markLeaderServicesReadyIfEpochCurrent() ran, so leaderServicesReady never
    flipped to true and the node kept returning CONFIG_NODE_LEADER_WARMING_UP
    forever -- even though startInParallelIfEpochCurrent()'s Javadoc claimed the
    future "always completes normally".
    
    Make that claim true: each startup now catches and logs its own failure
    (tagged with the service name via a small LeaderServiceStartup holder) and
    never lets it escape. A single failing service stays unavailable until the
    next leadership transition, but the node still finishes warming up and 
begins
    serving, and the failure is observable through the error log.
---
 .../statemachine/ConfigRegionStateMachine.java     | 112 ++++++++++++++++-----
 1 file changed, 86 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 251db87dbdc..b3029e66028 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -336,7 +336,9 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
       configManager.getLoadManager().startTopologyService();
     }
 
-    // Start the remaining leader services in parallel and wait for all of 
them to finish.
+    // Start the remaining leader services in parallel and wait for all of 
them to finish. Each
+    // startup swallows and logs its own failure (see 
startInParallelIfEpochCurrent), so a single
+    // misbehaving service cannot abort the whole transition and leave the 
node stuck warming up.
     final CompletableFuture<?>[] startups =
         leaderServiceStartups().stream()
             .map(startup -> startInParallelIfEpochCurrent(epoch, startup))
@@ -364,31 +366,47 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
   }
 
   /** The leader services that can be started independently, in parallel, 
within one epoch. */
-  private List<Runnable> leaderServiceStartups() {
+  private List<LeaderServiceStartup> leaderServiceStartups() {
     return Arrays.asList(
-        () -> 
configManager.getProcedureManager().getStore().getProcedureInfo().upgrade(),
-        () -> 
configManager.getRetryFailedTasksThread().startRetryFailedTasksService(),
-        () -> configManager.getPartitionManager().startRegionCleaner(),
+        new LeaderServiceStartup(
+            "ProcedureInfo.upgrade",
+            () -> 
configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()),
+        new LeaderServiceStartup(
+            "RetryFailedTasksService",
+            () -> 
configManager.getRetryFailedTasksThread().startRetryFailedTasksService()),
+        new LeaderServiceStartup(
+            "RegionCleaner", () -> 
configManager.getPartitionManager().startRegionCleaner()),
         // Add metrics after leader ready.
-        () -> configManager.addMetrics(),
+        new LeaderServiceStartup("Metrics", () -> configManager.addMetrics()),
         // Activate leader related service for config pipe.
-        () -> PipeConfigNodeAgent.runtime().notifyLeaderReady(),
+        new LeaderServiceStartup(
+            "PipeConfigNodeRuntime", () -> 
PipeConfigNodeAgent.runtime().notifyLeaderReady()),
         // CQ recovery may be time-consuming, so it is just one more parallel 
startup.
-        () -> configManager.getCQManager().startCQScheduler(),
-        () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync(),
-        () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat(),
-        () ->
-            configManager
-                .getPipeManager()
-                .getPipeRuntimeCoordinator()
-                .onConfigRegionGroupLeaderChanged(),
-        () ->
-            configManager
-                .getSubscriptionManager()
-                .getSubscriptionCoordinator()
-                .startSubscriptionMetaSync(),
+        new LeaderServiceStartup(
+            "CQScheduler", () -> 
configManager.getCQManager().startCQScheduler()),
+        new LeaderServiceStartup(
+            "PipeMetaSync",
+            () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()),
+        new LeaderServiceStartup(
+            "PipeHeartbeat",
+            () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()),
+        new LeaderServiceStartup(
+            "PipeOnLeaderChanged",
+            () ->
+                configManager
+                    .getPipeManager()
+                    .getPipeRuntimeCoordinator()
+                    .onConfigRegionGroupLeaderChanged()),
+        new LeaderServiceStartup(
+            "SubscriptionMetaSync",
+            () ->
+                configManager
+                    .getSubscriptionManager()
+                    .getSubscriptionCoordinator()
+                    .startSubscriptionMetaSync()),
         // To adapt old version, we check cluster ID after state machine has 
been fully recovered.
-        () -> configManager.getClusterManager().checkClusterId());
+        new LeaderServiceStartup(
+            "CheckClusterId", () -> 
configManager.getClusterManager().checkClusterId()));
   }
 
   /** Tear down every leader service. Runs on the single transition thread. */
@@ -422,16 +440,35 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
   }
 
   /**
-   * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it 
(and recording success)
-   * if the epoch has gone stale by the time it is picked up. Returns a future 
that always completes
-   * normally so {@link CompletableFuture#allOf} acts as a clean join barrier.
+   * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it if 
the epoch has gone
+   * stale by the time it is picked up. Any {@link RuntimeException} thrown by 
the startup is caught
+   * and logged here instead of being allowed to escape: this keeps one 
misbehaving service from
+   * failing the {@link CompletableFuture#allOf} join barrier in {@link 
#becomeLeader}, which would
+   * otherwise abort the whole transition before {@link 
#markLeaderServicesReadyIfEpochCurrent} runs
+   * and leave the node stuck returning {@code CONFIG_NODE_LEADER_WARMING_UP} 
forever. The returned
+   * future therefore always completes normally, so {@code allOf} acts as a 
clean join barrier.
    */
   private CompletableFuture<Void> startInParallelIfEpochCurrent(
-      final long epoch, final Runnable startup) {
+      final long epoch, final LeaderServiceStartup startup) {
     return CompletableFuture.runAsync(
         () -> {
-          if (isCurrentLeaderServicesEpoch(epoch)) {
+          if (!isCurrentLeaderServicesEpoch(epoch)) {
+            return;
+          }
+          try {
             startup.run();
+          } catch (final Exception e) {
+            // Swallow and log so a single failed startup cannot stall leader 
warm-up. The service
+            // stays unstarted, but the node still finishes warming up and 
begins serving; the
+            // failure is observable through this error log.
+            LOGGER.error(
+                "Current ConfigNode(nodeId: {}, ip: {}) failed to start leader 
service [{}], the"
+                    + " node will still finish warming up; this service stays 
unavailable until the"
+                    + " next leadership transition.",
+                ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+                currentNodeTEndPoint,
+                startup.name(),
+                e);
           }
         },
         leaderServicesStartupPool);
@@ -687,4 +724,27 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
     }
     return Long.parseLong(endIndexString);
   }
+
+  /**
+   * A single leader service startup paired with a human-readable name, so a 
failure can be logged
+   * against the service that produced it (see {@link 
#startInParallelIfEpochCurrent}).
+   */
+  private static class LeaderServiceStartup {
+
+    private final String name;
+    private final Runnable startup;
+
+    private LeaderServiceStartup(final String name, final Runnable startup) {
+      this.name = name;
+      this.startup = startup;
+    }
+
+    private String name() {
+      return name;
+    }
+
+    private void run() {
+      startup.run();
+    }
+  }
 }

Reply via email to