This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ddd8faaff23 Improve ConfigNode leader warm-up before serving (#17821)
ddd8faaff23 is described below

commit ddd8faaff23956ac1f51c2e34c722d0db9f47f08
Author: Yongzao <[email protected]>
AuthorDate: Wed Jun 10 14:31:12 2026 +0800

    Improve ConfigNode leader warm-up before serving (#17821)
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 iotdb-core/ainode/iotdb/ainode/core/constant.py    |   1 +
 iotdb-core/ainode/iotdb/ainode/core/rpc/client.py  |  11 +-
 .../handlers/heartbeat/AINodeHeartbeatHandler.java |   2 +-
 .../heartbeat/DataNodeHeartbeatHandler.java        | 125 +++++---
 .../statemachine/ConfigRegionStateMachine.java     | 332 +++++++++++++++------
 .../iotdb/confignode/manager/ConfigManager.java    |   4 +
 .../iotdb/confignode/manager/ProcedureManager.java |  13 +-
 .../manager/consensus/ConsensusManager.java        |  79 +++--
 .../iotdb/confignode/manager/load/LoadManager.java |  73 +++++
 .../manager/load/cache/AbstractLoadCache.java      |   4 +
 .../confignode/manager/load/cache/LoadCache.java   |  27 ++
 .../load/cache/consensus/ConsensusGroupCache.java  |   2 +-
 .../confignode/procedure/ProcedureExecutor.java    |  13 +-
 .../iotdb/confignode/service/ConfigNode.java       |   8 +-
 .../confignode/manager/load/LoadManagerTest.java   |  70 +++++
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  45 ++-
 .../iotdb/commons/concurrent/ThreadName.java       |   3 +-
 18 files changed, 648 insertions(+), 165 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d22e8f55207..a8e2ef8d12e 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -209,6 +209,7 @@ public enum TSStatusCode {
   CAN_NOT_CONNECT_AINODE(1011),
   NO_AVAILABLE_REPLICA(1012),
   NO_AVAILABLE_AINODE(1013),
+  CONFIG_NODE_LEADER_WARMING_UP(1014),
 
   // Sync, Load TsFile
   LOAD_FILE_ERROR(1100),
diff --git a/iotdb-core/ainode/iotdb/ainode/core/constant.py b/iotdb-core/ainode/iotdb/ainode/core/constant.py
index 4a2ee543d1f..1eb79085bcf 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/constant.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/constant.py
@@ -80,6 +80,7 @@ DEFAULT_CHUNK_SIZE = 8192
 class TSStatusCode(Enum):
     SUCCESS_STATUS = 200
     REDIRECTION_RECOMMEND = 400
+    CONFIG_NODE_LEADER_WARMING_UP = 1014
     MODEL_EXISTED_ERROR = 1503
     MODEL_NOT_EXIST_ERROR = 1504
     CREATE_MODEL_ERROR = 1505
diff --git a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
index ea6362ef080..81bb81d50bb 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
@@ -66,6 +66,7 @@ class ConfigNodeClient(object):
             "Fail to connect to any config node. Please check status of ConfigNodes"
         )
         self._RETRY_NUM = 10
+        self._STARTUP_RETRY_NUM = 60
         self._RETRY_INTERVAL_IN_S = 1
 
         self._try_to_connect()
@@ -163,6 +164,12 @@ class ConfigNodeClient(object):
             else:
                 self._config_leader = None
             return True
+        if status.code == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.get_status_code():
+            logger.info(
+                "ConfigNode leader is warming up before serving AINode, will wait and retry. Reason: {}",
+                status.message,
+            )
+            return True
         return False
 
     def node_register(
@@ -177,7 +184,7 @@ class ConfigNodeClient(object):
             versionInfo=version_info,
         )
 
-        for _ in range(0, self._RETRY_NUM):
+        for _ in range(0, self._STARTUP_RETRY_NUM):
             try:
                 resp = self._client.registerAINode(req)
                 if not self._update_config_node_leader(resp.status):
@@ -208,7 +215,7 @@ class ConfigNodeClient(object):
             versionInfo=version_info,
         )
 
-        for _ in range(0, self._RETRY_NUM):
+        for _ in range(0, self._STARTUP_RETRY_NUM):
             try:
                 resp = self._client.restartAINode(req)
                 if not self._update_config_node_leader(resp.status):
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java
index 9d8e0b6e847..03e6c0bfe31 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java
@@ -50,7 +50,7 @@ public class AINodeHeartbeatHandler implements AsyncMethodCallback<TAIHeartbeatR
   public void onError(Exception e) {
     if (ThriftClient.isConnectionBroken(e)) {
       loadManager.forceUpdateNodeCache(
-          NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown));
+          NodeType.AINode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown));
     }
     loadManager.getLoadCache().resetHeartbeatProcessing(nodeId);
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index e7a31b1dc73..52053cdab45 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -36,6 +37,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.function.Consumer;
 
@@ -82,54 +84,86 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<TDataNodeHe
 
   @Override
   public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
-    // Update NodeCache
+    cacheNodeHeartbeatSample(heartbeatResp);
+    cacheRegionGroupHeartbeatSamples(heartbeatResp);
+    cacheUsageSamples(heartbeatResp);
+    cachePipeHeartbeat(heartbeatResp);
+    cacheConfirmedConfigNodeEndPoints(heartbeatResp);
+    cacheRegionSizeSamples(heartbeatResp);
+  }
+
+  private void cacheNodeHeartbeatSample(TDataNodeHeartbeatResp heartbeatResp) {
     loadManager
         .getLoadCache()
         .cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp));
+  }
 
+  private void cacheRegionGroupHeartbeatSamples(TDataNodeHeartbeatResp heartbeatResp) {
     RegionStatus regionStatus = RegionStatus.valueOf(heartbeatResp.getStatus());
 
-    heartbeatResp
-        .getJudgedLeaders()
-        .forEach(
-            (regionGroupId, isLeader) -> {
-
-              // Do not allow regions to inherit the Removing state from 
datanode
-              RegionStatus nextRegionStatus = regionStatus;
-              if (nextRegionStatus == RegionStatus.Removing) {
-                nextRegionStatus =
-                    loadManager
-                        .getLoadCache()
-                        .getRegionCacheLastSampleStatus(regionGroupId, nodeId);
-              }
-
-              // Update RegionGroupCache
-              loadManager
-                  .getLoadCache()
-                  .cacheRegionHeartbeatSample(
-                      regionGroupId,
-                      nodeId,
-                      new RegionHeartbeatSample(
-                          heartbeatResp.getHeartbeatTimestamp(),
-                          // Region will inherit DataNode's status
-                          nextRegionStatus),
-                      false);
-
-              if 
(((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
-                          && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
-                      || 
(TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
-                          && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE))
-                  && Boolean.TRUE.equals(isLeader)) {
-                // Update ConsensusGroupCache when necessary
-                loadManager
-                    .getLoadCache()
-                    .cacheConsensusSample(
-                        regionGroupId,
-                        new ConsensusGroupHeartbeatSample(
-                            
heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId));
-              }
-            });
+    Map<TConsensusGroupId, Boolean> judgedLeaders =
+        heartbeatResp.isSetJudgedLeaders()
+            ? heartbeatResp.getJudgedLeaders()
+            : Collections.emptyMap();
+    judgedLeaders.forEach(
+        (regionGroupId, isLeader) -> {
+          cacheRegionHeartbeatSample(heartbeatResp, regionStatus, regionGroupId);
+          cacheConsensusSampleIfNeeded(heartbeatResp, regionGroupId, isLeader);
+        });
+  }
+
+  private void cacheRegionHeartbeatSample(
+      TDataNodeHeartbeatResp heartbeatResp,
+      RegionStatus dataNodeRegionStatus,
+      TConsensusGroupId regionGroupId) {
+    loadManager
+        .getLoadCache()
+        .cacheRegionHeartbeatSample(
+            regionGroupId,
+            nodeId,
+            new RegionHeartbeatSample(
+                heartbeatResp.getHeartbeatTimestamp(),
+                getRegionHeartbeatStatus(regionGroupId, dataNodeRegionStatus)),
+            false);
+  }
+
+  private RegionStatus getRegionHeartbeatStatus(
+      TConsensusGroupId regionGroupId, RegionStatus dataNodeRegionStatus) {
+    return dataNodeRegionStatus == RegionStatus.Removing
+        ? loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId)
+        : dataNodeRegionStatus;
+  }
+
+  private void cacheConsensusSampleIfNeeded(
+      TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId, Boolean isLeader) {
+    if (!Boolean.TRUE.equals(isLeader)
+        || !shouldCacheConsensusSample(regionGroupId)
+        || !hasConsensusLogicalTimestamp(heartbeatResp, regionGroupId)) {
+      return;
+    }
+
+    loadManager
+        .getLoadCache()
+        .cacheConsensusSample(
+            regionGroupId,
+            new ConsensusGroupHeartbeatSample(
+                heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId));
+  }
+
+  private boolean shouldCacheConsensusSample(TConsensusGroupId regionGroupId) {
+    return (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
+            && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
+        || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
+            && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE);
+  }
+
+  private boolean hasConsensusLogicalTimestamp(
+      TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId) {
+    return heartbeatResp.isSetConsensusLogicalTimeMap()
+        && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId);
+  }
 
+  private void cacheUsageSamples(TDataNodeHeartbeatResp heartbeatResp) {
     if (heartbeatResp.getRegionDeviceUsageMap() != null) {
       deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap());
       deviceUsageRespProcess.accept(heartbeatResp.getRegionDeviceUsageMap());
@@ -141,6 +175,9 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<TDataNodeHe
     if (heartbeatResp.getRegionDisk() != null) {
       regionDisk.putAll(heartbeatResp.getRegionDisk());
     }
+  }
+
+  private void cachePipeHeartbeat(TDataNodeHeartbeatResp heartbeatResp) {
     if (heartbeatResp.getPipeMetaList() != null) {
       pipeRuntimeCoordinator.parseHeartbeat(
           nodeId,
@@ -149,12 +186,18 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<TDataNodeHe
           heartbeatResp.getPipeRemainingEventCountList(),
           heartbeatResp.getPipeRemainingTimeList());
     }
+  }
+
+  private void cacheConfirmedConfigNodeEndPoints(TDataNodeHeartbeatResp heartbeatResp) {
     if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
       loadManager
           .getLoadCache()
           .updateConfirmedConfigNodeEndPoints(
               nodeId, heartbeatResp.getConfirmedConfigNodeEndPoints());
     }
+  }
+
+  private void cacheRegionSizeSamples(TDataNodeHeartbeatResp heartbeatResp) {
     if (heartbeatResp.isSetRegionDisk()) {
       loadManager.getLoadCache().updateRegionSizeMap(nodeId, heartbeatResp.getRegionDisk());
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index fe687d17556..251db87dbdc 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -60,22 +60,53 @@ import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.Optional;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** {@link IStateMachine} for ConfigRegion. */
 public class ConfigRegionStateMachine implements IStateMachine, 
IStateMachine.EventApi {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigRegionStateMachine.class);
 
-  private static final ExecutorService threadPool =
+  /**
+   * Serializes leadership transitions (become-leader / resign-leader). A single worker thread is
+   * the barrier that keeps epochs strictly serial: the orchestration of one transition runs to
+   * completion before the next one begins.
+   */
+  private static final ExecutorService leaderServicesTransitionExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.CONFIG_NODE_LEADER_SERVICES_TRANSITION.getName());
+
+  /** Runs the individual leader services in parallel within a single become-leader epoch. */
+  private static final ExecutorService leaderServicesStartupPool =
       
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.CONFIG_NODE_RECOVER.getName());
+
   private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+  private static final long WAIT_LOAD_READY_TIMEOUT_MS =
+      CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS() / 2;
+  private static final long WAIT_LOAD_READY_INTERVAL_MS = 100;
   private final ConfigPlanExecutor executor;
+
+  /**
+   * Whether the leader services of the {@link #leaderServicesEpoch current epoch} have finished
+   * starting up. Read by {@link ConsensusManager#confirmLeader()} to gate external serving.
+   */
+  private final AtomicBoolean leaderServicesReady;
+
+  /**
+   * Monotonically increasing leadership generation. Every become-leader / resign-leader transition
+   * bumps it, so any work submitted for an older epoch can detect it is stale and bail out.
+   */
+  private final AtomicLong leaderServicesEpoch;
+
+  /** Guards {@link #leaderServicesReady} and {@link #leaderServicesEpoch} as a unit. */
+  private final Object leaderServicesLock;
+
   private ConfigManager configManager;
 
   /** Variables for {@link ConfigNode} Simple Consensus. */
@@ -87,17 +118,20 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
 
   private static final String CURRENT_FILE_DIR =
       ConsensusManager.getConfigRegionDir() + File.separator + "current";
+  private static final String LOG_INPROGRESS_FILE_PREFIX = "log_inprogress_";
+  private static final String LOG_FILE_PREFIX = "log_";
   private static final String PROGRESS_FILE_PATH =
-      CURRENT_FILE_DIR + File.separator + "log_inprogress_";
-  private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator + 
"log_";
+      CURRENT_FILE_DIR + File.separator + LOG_INPROGRESS_FILE_PREFIX;
+  private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator + 
LOG_FILE_PREFIX;
   private static final long LOG_FILE_MAX_SIZE =
       CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax();
   private final TEndPoint currentNodeTEndPoint;
-  private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+");
-  private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$");
 
   public ConfigRegionStateMachine(ConfigManager configManager, 
ConfigPlanExecutor executor) {
     this.executor = executor;
+    this.leaderServicesReady = new AtomicBoolean(false);
+    this.leaderServicesEpoch = new AtomicLong(0);
+    this.leaderServicesLock = new Object();
     this.configManager = configManager;
     this.currentNodeTEndPoint =
         new TEndPoint()
@@ -115,9 +149,9 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
 
   @Override
   public TSStatus write(IConsensusRequest request) {
-    return Optional.ofNullable(request)
-        .map(o -> write((ConfigPhysicalPlan) request))
-        .orElseGet(() -> new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+    return request == null
+        ? new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+        : write((ConfigPhysicalPlan) request);
   }
 
   /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */
@@ -233,7 +267,7 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
   public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
     // We get currentNodeId here because the currentNodeId
     // couldn't initialize earlier than the ConfigRegionStateMachine
-    int currentNodeId = 
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+    final int currentNodeId = 
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
     if (currentNodeId != newLeaderId) {
       LOGGER.info(
           
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER
@@ -241,6 +275,7 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
           currentNodeId,
           currentNodeTEndPoint,
           newLeaderId);
+      resignLeaderAsync();
     }
   }
 
@@ -248,12 +283,117 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
   public void notifyNotLeader() {
     // We get currentNodeId here because the currentNodeId
     // couldn't initialize earlier than the ConfigRegionStateMachine
-    int currentNodeId = 
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+    final int currentNodeId = 
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
     LOGGER.info(
         ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER
             + "start cleaning up related services",
         currentNodeId,
         currentNodeTEndPoint);
+    resignLeaderAsync();
+  }
+
+  @Override
+  public void notifyLeaderReady() {
+    LOGGER.info(
+        
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_BECOMES_CONFIG_REGION_LEADER,
+        ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+        currentNodeTEndPoint);
+    // Bump the epoch eagerly so that any in-flight services of an older epoch are invalidated
+    // immediately, even before the (serialized) become-leader orchestration gets to run.
+    final long epoch = nextLeaderServicesEpoch();
+    leaderServicesTransitionExecutor.submit(() -> becomeLeader(epoch));
+  }
+
+  /**
+   * Submit a resign-leader transition. The epoch is bumped eagerly (on the consensus thread) so
+   * that stale leader work is invalidated at once, while the teardown itself is serialized behind
+   * any in-flight transition on {@link #leaderServicesTransitionExecutor}.
+   */
+  private void resignLeaderAsync() {
+    invalidateLeaderServices();
+    leaderServicesTransitionExecutor.submit(this::stopLeaderServices);
+  }
+
+  /**
+   * Bring up the leader services for {@code epoch}. Runs on the single transition thread, so it is
+   * strictly serialized against every other transition. Within the epoch, the load services start
+   * first (to warm up as early as possible), then the remaining services start in parallel and are
+   * joined before the epoch is marked ready.
+   */
+  private void becomeLeader(final long epoch) {
+    if (!isCurrentLeaderServicesEpoch(epoch)) {
+      LOGGER.info(
+          
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER
+              + "skip starting leader services because the leader epoch is 
stale",
+          ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+          currentNodeTEndPoint);
+      return;
+    }
+
+    // Always start load services first. ConsensusManager gates external serving until warm-up.
+    configManager.getLoadManager().startLoadServices();
+    if (CONF.isEnableTopologyProbing()) {
+      configManager.getLoadManager().startTopologyService();
+    }
+
+    // Start the remaining leader services in parallel and wait for all of them to finish.
+    final CompletableFuture<?>[] startups =
+        leaderServiceStartups().stream()
+            .map(startup -> startInParallelIfEpochCurrent(epoch, startup))
+            .toArray(CompletableFuture[]::new);
+    CompletableFuture.allOf(startups).join();
+
+    if (!isCurrentLeaderServicesEpoch(epoch)) {
+      return;
+    }
+    // The procedure executor may report readiness asynchronously once it has caught up.
+    configManager
+        .getProcedureManager()
+        .startExecutor(() -> markLeaderServicesReadyIfEpochCurrent(epoch));
+    markLeaderServicesReadyIfEpochCurrent(epoch);
+
+    final boolean loadReady = waitForLoadReady(epoch);
+    if (!isCurrentLeaderServicesEpoch(epoch)) {
+      return;
+    }
+    logLoadWarmUpIfNeeded(loadReady);
+    LOGGER.info(
+        
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS,
+        ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+        currentNodeTEndPoint);
+  }
+
+  /** The leader services that can be started independently, in parallel, within one epoch. */
+  private List<Runnable> leaderServiceStartups() {
+    return Arrays.asList(
+        () -> 
configManager.getProcedureManager().getStore().getProcedureInfo().upgrade(),
+        () -> 
configManager.getRetryFailedTasksThread().startRetryFailedTasksService(),
+        () -> configManager.getPartitionManager().startRegionCleaner(),
+        // Add metrics after leader ready.
+        () -> configManager.addMetrics(),
+        // Activate leader related service for config pipe.
+        () -> PipeConfigNodeAgent.runtime().notifyLeaderReady(),
+        // CQ recovery may be time-consuming, so it is just one more parallel startup.
+        () -> configManager.getCQManager().startCQScheduler(),
+        () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync(),
+        () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat(),
+        () ->
+            configManager
+                .getPipeManager()
+                .getPipeRuntimeCoordinator()
+                .onConfigRegionGroupLeaderChanged(),
+        () ->
+            configManager
+                .getSubscriptionManager()
+                .getSubscriptionCoordinator()
+                .startSubscriptionMetaSync(),
+        // To adapt old version, we check cluster ID after state machine has been fully recovered.
+        () -> configManager.getClusterManager().checkClusterId());
+  }
+
+  /** Tear down every leader service. Runs on the single transition thread. */
+  private void stopLeaderServices() {
+    final int currentNodeId = 
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
     // Stop leader scheduling services
     
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
     
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
@@ -281,63 +421,89 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
         currentNodeTEndPoint);
   }
 
-  @Override
-  public void notifyLeaderReady() {
-    LOGGER.info(
-        
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_BECOMES_CONFIG_REGION_LEADER,
-        ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
-        currentNodeTEndPoint);
+  /**
+   * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it (and recording success)
+   * if the epoch has gone stale by the time it is picked up. Returns a future that always completes
+   * normally so {@link CompletableFuture#allOf} acts as a clean join barrier.
+   */
+  private CompletableFuture<Void> startInParallelIfEpochCurrent(
+      final long epoch, final Runnable startup) {
+    return CompletableFuture.runAsync(
+        () -> {
+          if (isCurrentLeaderServicesEpoch(epoch)) {
+            startup.run();
+          }
+        },
+        leaderServicesStartupPool);
+  }
 
-    // Always start load services first
-    configManager.getLoadManager().startLoadServices();
+  private void markLeaderServicesReadyIfEpochCurrent(final long epoch) {
+    synchronized (leaderServicesLock) {
+      if (isCurrentLeaderServicesEpoch(epoch)) {
+        leaderServicesReady.set(true);
+      }
+    }
+  }
 
-    if (CONF.isEnableTopologyProbing()) {
-      configManager.getLoadManager().startTopologyService();
+  private void logLoadWarmUpIfNeeded(final boolean loadReady) {
+    if (!loadReady) {
+      LOGGER.info(
+          "Current ConfigNode(nodeId: {}, ip: {}) finished starting leader services while load"
+              + " warm-up is still in progress: {}",
+          ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+          currentNodeTEndPoint,
+          configManager.getLoadManager().getLoadReadyReason());
     }
+  }
 
-    // Start leader scheduling services
-    configManager.getProcedureManager().startExecutor();
-    threadPool.submit(
-        () -> 
configManager.getProcedureManager().getStore().getProcedureInfo().upgrade());
-    configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
-    configManager.getPartitionManager().startRegionCleaner();
-    // Add Metric after leader ready
-    configManager.addMetrics();
-
-    // Activate leader related service for config pipe
-    PipeConfigNodeAgent.runtime().notifyLeaderReady();
-
-    // we do cq recovery async for performance:
-    // cq recovery may be time-consuming, we use another thread to do it in
-    // make notifyLeaderChanged not blocked by it
-    threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
-
-    threadPool.submit(
-        () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
-    threadPool.submit(
-        () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat());
-    threadPool.submit(
-        () ->
-            configManager
-                .getPipeManager()
-                .getPipeRuntimeCoordinator()
-                .onConfigRegionGroupLeaderChanged());
+  private boolean waitForLoadReady(final long epoch) {
+    long startTime = System.currentTimeMillis();
+    while (isCurrentLeaderServicesEpoch(epoch)
+        && System.currentTimeMillis() - startTime < WAIT_LOAD_READY_TIMEOUT_MS) {
+      if (configManager.getLoadManager().isLoadReady()) {
+        return true;
+      }
+      if (!sleepForLoadReady()) {
+        return false;
+      }
+    }
+    return isCurrentLeaderServicesEpoch(epoch) && configManager.getLoadManager().isLoadReady();
+  }
 
-    threadPool.submit(
-        () ->
-            configManager
-                .getSubscriptionManager()
-                .getSubscriptionCoordinator()
-                .startSubscriptionMetaSync());
+  private boolean sleepForLoadReady() {
+    try {
+      TimeUnit.MILLISECONDS.sleep(WAIT_LOAD_READY_INTERVAL_MS);
+      return true;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Unexpected interruption while waiting for ConfigNode leader load warm-up.", e);
+      return false;
+    }
+  }
 
-    // To adapt old version, we check cluster ID after state machine has been 
fully recovered.
-    // Do check async because sync will be slow and block every other things.
-    threadPool.submit(() -> 
configManager.getClusterManager().checkClusterId());
+  public boolean areLeaderServicesReady() {
+    return leaderServicesReady.get();
+  }
 
-    LOGGER.info(
-        
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS,
-        ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
-        currentNodeTEndPoint);
+  /** Open a new leadership generation, invalidating the previous one. */
+  private long nextLeaderServicesEpoch() {
+    synchronized (leaderServicesLock) {
+      leaderServicesReady.set(false);
+      return leaderServicesEpoch.incrementAndGet();
+    }
+  }
+
+  /** Invalidate the current leadership generation without opening a serving one. */
+  private void invalidateLeaderServices() {
+    synchronized (leaderServicesLock) {
+      leaderServicesReady.set(false);
+      leaderServicesEpoch.incrementAndGet();
+    }
+  }
+
+  private boolean isCurrentLeaderServicesEpoch(final long epoch) {
+    return leaderServicesEpoch.get() == epoch
+        && configManager.getConsensusManager().isLeaderReady();
   }
 
   @Override
@@ -413,7 +579,7 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
     dir.mkdirs();
     String[] list = new File(CURRENT_FILE_DIR).list();
     if (list != null && list.length != 0) {
-      Arrays.sort(list, new FileComparator());
+      Arrays.sort(list, Comparator.comparingLong(ConfigRegionStateMachine::parseEndIndex));
       for (String logFileName : list) {
         File logFile =
             SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + 
File.separator + logFileName);
@@ -497,28 +663,28 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
     }
   }
 
-  static class FileComparator implements Comparator<String> {
-
-    @Override
-    public int compare(String filename1, String filename2) {
-      long id1 = parseEndIndex(filename1);
-      long id2 = parseEndIndex(filename2);
-      return Long.compare(id1, id2);
-    }
-  }
-
-  static long parseEndIndex(String filename) {
-    if (filename.startsWith("log_inprogress_")) {
-      Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename);
-      if (matcher.find()) {
-        return Long.parseLong(matcher.group());
+  /**
+   * Extract the numeric end index used to order consensus log files.
+   *
+   * <p>For a name starting with {@code LOG_INPROGRESS_FILE_PREFIX} the index is everything after
+   * the prefix; for a name starting with {@code LOG_FILE_PREFIX} it is everything after the last
+   * {@code '_'}. Any name that does not fit (unknown prefix, missing separator, empty or
+   * non-digit suffix, or a value that does not fit in a {@code long}) maps to {@code 0}, so a
+   * stray file can never make the sort comparator throw.
+   *
+   * @param filename the bare file name (no directory part)
+   * @return the parsed end index, or {@code 0} when none can be extracted
+   */
+  private static long parseEndIndex(String filename) {
+    final String endIndexString;
+    if (filename.startsWith(LOG_INPROGRESS_FILE_PREFIX)) {
+      endIndexString = filename.substring(LOG_INPROGRESS_FILE_PREFIX.length());
+    } else if (filename.startsWith(LOG_FILE_PREFIX)) {
+      final int lastSeparatorIndex = filename.lastIndexOf('_');
+      if (lastSeparatorIndex < LOG_FILE_PREFIX.length()) {
+        return 0;
       }
+      endIndexString = filename.substring(lastSeparatorIndex + 1);
     } else {
-      Matcher matcher = LOG_PATTERN.matcher(filename);
-      if (matcher.find()) {
-        return Long.parseLong(matcher.group());
+      return 0;
+    }
+
+    if (endIndexString.isEmpty()) {
+      return 0;
+    }
+    for (int i = 0; i < endIndexString.length(); i++) {
+      if (!Character.isDigit(endIndexString.charAt(i))) {
+        return 0;
       }
     }
-    return 0;
+    try {
+      return Long.parseLong(endIndexString);
+    } catch (NumberFormatException e) {
+      // Every character is a digit, so parsing can only fail when the value exceeds
+      // Long.MAX_VALUE. Treat that like any other malformed name instead of failing the sort.
+      return 0;
+    }
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 64ee0c54791..c5bafc550f6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1256,6 +1256,10 @@ public class ConfigManager implements IManager {
               "ConsensusManager of target-ConfigNode is not initialized, "
                   + "please make sure the target-ConfigNode has been started 
successfully.");
     }
+    // Procedure recovery replays metadata writes before external load warm-up 
is complete.
+    if (procedureManager.isProcedureExecutionThread()) {
+      return getConsensusManager().confirmLeaderForInternalProcedure();
+    }
     return getConsensusManager().confirmLeader();
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index a974f9e1d7c..cb76a9550af 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -228,14 +228,21 @@ public class ProcedureManager {
   }
 
   public void startExecutor() {
+    startExecutor(null);
+  }
+
+  /**
+   * Start the procedure executor if it is not already running.
+   *
+   * <p>The executor is initialised, the completed-procedure cleaner and the partition table
+   * cleaner are registered and the store is started first; the optional hook then runs, and only
+   * afterwards are the worker threads started.
+   *
+   * @param beforeStartingWorkers hook invoked right before the workers start; may be {@code null}
+   */
+  public void startExecutor(final Runnable beforeStartingWorkers) {
     if (!executor.isRunning()) {
       executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsCount());
-      executor.startWorkers();
       executor.startCompletedCleaner(
           CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
           CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
       executor.addInternalProcedure(partitionTableCleaner);
       store.start();
+      // Run the caller's hook while no worker thread has been started yet.
+      if (beforeStartingWorkers != null) {
+        beforeStartingWorkers.run();
+      }
+      executor.startWorkers();
       LOGGER.info(ManagerMessages.PROCEDUREMANAGER_IS_STARTED_SUCCESSFULLY);
     }
   }
@@ -252,6 +259,10 @@ public class ProcedureManager {
     }
   }
 
+  /**
+   * Delegate to {@link ProcedureExecutor#isProcedureExecutionThread()}.
+   *
+   * @return whatever the executor reports for the calling thread
+   */
+  public boolean isProcedureExecutionThread() {
+    return ProcedureExecutor.isProcedureExecutionThread();
+  }
+
   @TestOnly
   public TSStatus createManyDatabases() {
     this.executor.submitProcedure(new CreateManyDatabasesProcedure());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 84594b0d7a8..d7024c357b1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -81,12 +81,14 @@ public class ConsensusManager {
       new ConfigRegionId(CONF.getConfigRegionId());
 
   private final IManager configManager;
+  private final ConfigRegionStateMachine stateMachine;
   private IConsensus consensusImpl;
 
   private boolean isInitialized;
 
   public ConsensusManager(IManager configManager, ConfigRegionStateMachine 
stateMachine) {
     this.configManager = configManager;
+    this.stateMachine = stateMachine;
     setConsensusLayer(stateMachine);
   }
 
@@ -94,6 +96,7 @@ public class ConsensusManager {
   ConsensusManager(IManager configManager, IConsensus consensusImpl) {
     this.configManager = configManager;
     this.consensusImpl = consensusImpl;
+    this.stateMachine = null;
   }
 
   public void start() throws IOException {
@@ -445,39 +448,59 @@ public class ConsensusManager {
    *     NEED_REDIRECTION otherwise
    */
   public TSStatus confirmLeader() {
-    TSStatus result = new TSStatus();
-    if (isLeaderReady()) {
-      result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } else {
-      result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
-      if (isLeader()) {
-        long startTime = System.currentTimeMillis();
-        while (System.currentTimeMillis() - startTime < 
MAX_WAIT_READY_TIME_MS) {
-          if (isLeaderReady()) {
-            result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-            return result;
-          }
-          try {
-            Thread.sleep(RETRY_WAIT_TIME_MS);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOGGER.warn(
-                
ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY);
-            break;
-          }
-        }
-        result.setMessage(
-            "The current ConfigNode is leader but not ready yet, please try 
again later.");
-      } else {
-        result.setMessage(
-            "The current ConfigNode is not leader, please redirect to a new 
ConfigNode.");
-      }
+    return confirmLeader(true);
+  }
+
+  /**
+   * Decide whether this ConfigNode may serve a request as leader.
+   *
+   * <p>A non-leader answers {@code REDIRECTION_RECOMMEND}, carrying the leader's internal
+   * endpoint when it is known. A leader first waits (bounded) for consensus readiness and then
+   * answers {@code CONFIG_NODE_LEADER_WARMING_UP} while consensus, the leader services or, if
+   * requested, the load warm-up are not ready yet.
+   *
+   * @param checkLoadReady whether the load warm-up must also have completed
+   * @return {@code SUCCESS_STATUS} when every requested gate is open, otherwise the status
+   *     describing the first gate that is still closed
+   */
+  private TSStatus confirmLeader(final boolean checkLoadReady) {
+    if (!isLeader()) {
+      TSStatus result = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+      result.setMessage(
+          "The current ConfigNode is not leader, please redirect to a new ConfigNode.");
       TConfigNodeLocation leaderLocation = getLeaderLocation();
       if (leaderLocation != null) {
         result.setRedirectNode(leaderLocation.getInternalEndPoint());
       }
+      return result;
+    }
+
+    waitForLeaderReady();
+
+    if (!isLeaderReady()) {
+      return getLeaderWarmingUpStatus(
+          "The current ConfigNode is leader but consensus is not ready yet.");
+    }
+    // The IConsensus-based constructor leaves stateMachine null; with no state machine there is
+    // no leader-services gate to consult, so skip the check instead of throwing an NPE.
+    if (stateMachine != null && !stateMachine.areLeaderServicesReady()) {
+      return getLeaderWarmingUpStatus(
+          "The current ConfigNode is leader but leader services are not ready yet.");
+    }
+    if (checkLoadReady && !configManager.getLoadManager().isLoadReady()) {
+      return getLeaderWarmingUpStatus(configManager.getLoadManager().getLoadReadyReason());
+    }
+
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  /**
+   * Variant of {@link #confirmLeader()} that calls the shared check with {@code checkLoadReady}
+   * set to {@code false}, i.e. without requiring the load warm-up to have completed.
+   *
+   * @return the status produced by the shared leader check
+   */
+  public TSStatus confirmLeaderForInternalProcedure() {
+    return confirmLeader(false);
+  }
+
+  /**
+   * Block until {@link #isLeaderReady()} turns true, for at most {@code MAX_WAIT_READY_TIME_MS},
+   * polling every {@code RETRY_WAIT_TIME_MS}. Returns early, with the interrupt flag restored, if
+   * the waiting thread is interrupted. Callers must re-check readiness afterwards.
+   */
+  private void waitForLeaderReady() {
+    // Measure the elapsed time with the monotonic clock so a wall-clock adjustment can neither
+    // cut the wait short nor stretch it.
+    final long startNanos = System.nanoTime();
+    final long maxWaitNanos = MAX_WAIT_READY_TIME_MS * 1_000_000L;
+    while (!isLeaderReady() && System.nanoTime() - startNanos < maxWaitNanos) {
+      try {
+        Thread.sleep(RETRY_WAIT_TIME_MS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn(
+            ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY);
+        return;
+      }
     }
-    return result;
+  }
+
+  /**
+   * Build a {@code CONFIG_NODE_LEADER_WARMING_UP} status.
+   *
+   * @param message human-readable explanation attached to the status
+   * @return a new status carrying the warming-up code and {@code message}
+   */
+  private TSStatus getLeaderWarmingUpStatus(String message) {
+    final TSStatus warmingUp =
+        new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode());
+    warmingUp.setMessage(message);
+    return warmingUp;
   }
 
   public ConsensusGroupId getConsensusGroupId() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index e0547bf640d..63d5ff8befd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -50,7 +50,10 @@ import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -59,6 +62,8 @@ import java.util.function.Function;
  */
 public class LoadManager {
 
+  private static final long FIRST_HEARTBEAT_READY_TOLERANCE_MS = 
TimeUnit.SECONDS.toMillis(30);
+
   protected final IManager configManager;
 
   /** Balancers. */
@@ -74,6 +79,10 @@ public class LoadManager {
   private final StatisticsService statisticsService;
   private final EventService eventService;
   private final TopologyService topologyService;
+  private final AtomicBoolean loadServicesStarted;
+  private final AtomicLong loadReadyStartTimeMillis;
+  private final AtomicBoolean loadReady;
+  private volatile String loadReadyReason;
 
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
@@ -92,6 +101,10 @@ public class LoadManager {
         
configManager.getSubscriptionManager().getSubscriptionLeaderChangeHandler());
     this.eventService.register(routeBalancer);
     this.eventService.register(topologyService);
+    this.loadServicesStarted = new AtomicBoolean(false);
+    this.loadReadyStartTimeMillis = new AtomicLong(0);
+    this.loadReady = new AtomicBoolean(false);
+    this.loadReadyReason = "ConfigNode leader load services are not started.";
   }
 
   protected void setHeartbeatService(IManager configManager, LoadCache 
loadCache) {
@@ -149,7 +162,11 @@ public class LoadManager {
   }
 
   public void startLoadServices() {
+    loadReady.set(false);
+    loadReadyStartTimeMillis.set(System.currentTimeMillis());
+    loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat 
sampling.";
     loadCache.initHeartbeatCache(configManager);
+    loadServicesStarted.set(true);
     heartbeatService.startHeartbeatService();
     statisticsService.startLoadStatisticsService();
     eventService.startEventService();
@@ -157,6 +174,10 @@ public class LoadManager {
   }
 
   public void stopLoadServices() {
+    loadServicesStarted.set(false);
+    loadReadyStartTimeMillis.set(0);
+    loadReady.set(false);
+    loadReadyReason = "ConfigNode leader load services are stopped.";
     heartbeatService.stopHeartbeatService();
     statisticsService.stopLoadStatisticsService();
     eventService.stopEventService();
@@ -165,6 +186,50 @@ public class LoadManager {
     routeBalancer.clearRegionPriority();
   }
 
+  /**
+   * Tell whether the load warm-up has completed.
+   *
+   * @return {@code true} if the {@code loadReady} flag is already set, otherwise the outcome of a
+   *     fresh evaluation by {@code tryUpdateLoadReady()}
+   */
+  public boolean isLoadReady() {
+    if (loadReady.get()) {
+      return true;
+    }
+    return tryUpdateLoadReady();
+  }
+
+  /**
+   * Return the most recently recorded {@code loadReadyReason} text.
+   *
+   * @return the current reason string
+   */
+  public String getLoadReadyReason() {
+    return loadReadyReason;
+  }
+
+  /**
+   * Re-evaluate load readiness and set {@code loadReady} once the warm-up condition holds.
+   *
+   * <p>Readiness is reached when no node reported by {@code getNodeHeartbeatUnreadyReasons()} is
+   * still missing a heartbeat sample, or when {@code FIRST_HEARTBEAT_READY_TOLERANCE_MS} has
+   * elapsed since {@code loadReadyStartTimeMillis}. {@code loadReadyReason} is updated on every
+   * path that changes the outcome.
+   *
+   * @return {@code true} if load is (now) considered ready, {@code false} otherwise
+   */
+  private synchronized boolean tryUpdateLoadReady() {
+    if (loadReady.get()) {
+      return true;
+    }
+    if (!loadServicesStarted.get()) {
+      loadReadyReason = "ConfigNode leader load services are not started.";
+      return false;
+    }
+
+    // Refresh node statistics and publish any change before inspecting the heartbeat caches.
+    loadCache.updateNodeStatistics(false);
+    eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
+
+    final List<String> pendingReasons = loadCache.getNodeHeartbeatUnreadyReasons();
+    final boolean allSampled = pendingReasons.isEmpty();
+    if (!allSampled) {
+      final long waitedMillis = System.currentTimeMillis() - loadReadyStartTimeMillis.get();
+      if (waitedMillis < FIRST_HEARTBEAT_READY_TOLERANCE_MS) {
+        loadReadyReason =
+            "ConfigNode leader is waiting for first heartbeat from registered ConfigNodes/DataNodes: "
+                + pendingReasons;
+        return false;
+      }
+    }
+
+    // Either every node has a sample, or the tolerance window has expired.
+    loadReadyReason =
+        allSampled
+            ? "ConfigNode leader load services are ready."
+            : "ConfigNode leader load services are ready after waiting "
+                + FIRST_HEARTBEAT_READY_TOLERANCE_MS
+                + "ms for first heartbeat. Missing heartbeats: "
+                + pendingReasons;
+    loadReady.set(true);
+    return true;
+  }
+
   public void startTopologyService() {
     topologyService.startTopologyService();
   }
@@ -489,6 +554,14 @@ public class LoadManager {
     return routeBalancer;
   }
 
+  /**
+   * Test hook: put the readiness fields into the "started, not ready yet" state with a
+   * caller-chosen start timestamp, without starting any service.
+   *
+   * @param loadReadyStartTimeMillis value stored as the warm-up start time
+   */
+  @TestOnly
+  void markLoadServicesStartedForTest(long loadReadyStartTimeMillis) {
+    loadServicesStarted.set(true);
+    loadReady.set(false);
+    this.loadReadyStartTimeMillis.set(loadReadyStartTimeMillis);
+    loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat 
sampling.";
+  }
+
   @TestOnly
   public EventService getEventService() {
     return eventService;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
index d61a0043520..e5ab6445f98 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
@@ -97,6 +97,10 @@ public abstract class AbstractLoadCache {
     return slidingWindow.isEmpty() ? null : 
slidingWindow.get(slidingWindow.size() - 1);
   }
 
+  /**
+   * Tell whether at least one heartbeat sample is cached.
+   *
+   * @return {@code true} if {@code getLastSample()} returns a non-null sample
+   */
+  public boolean hasHeartbeatSample() {
+    return getLastSample() != null;
+  }
+
   /**
    * Update currentStatistics based on the latest heartbeat sample that cached 
in the slidingWindow.
    */
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 818171c89bc..3416246811e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -82,6 +82,7 @@ public class LoadCache {
       Math.max(
           ProcedureManager.PROCEDURE_WAIT_TIME_OUT - 
TimeUnit.SECONDS.toMillis(2),
           TimeUnit.SECONDS.toMillis(10));
+  private static final int MAX_UNREADY_ENTITY_PRINT = 10;
 
   private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
 
@@ -496,6 +497,32 @@ public class LoadCache {
     return consensusGroupStatisticsMap;
   }
 
+  /**
+   * Describe which ConfigNodes and DataNodes have no heartbeat sample yet.
+   *
+   * <p>The entry for {@code ConfigNodeHeartbeatCache.CURRENT_NODE_ID} and caches of any other
+   * type are ignored. At most {@code MAX_UNREADY_ENTITY_PRINT} node ids are listed, in ascending
+   * order, followed by a count of the ones left out.
+   *
+   * @return an empty list when every considered node has a sample, otherwise a single-element
+   *     list of the form {@code nodes=[...]}
+   */
+  public List<String> getNodeHeartbeatUnreadyReasons() {
+    final List<Integer> silentNodeIds = new ArrayList<>();
+    nodeCacheMap.forEach(
+        (nodeId, nodeCache) -> {
+          final boolean isSelf = nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
+          final boolean isTrackedType =
+              nodeCache instanceof ConfigNodeHeartbeatCache
+                  || nodeCache instanceof DataNodeHeartbeatCache;
+          if (!isSelf && isTrackedType && !nodeCache.hasHeartbeatSample()) {
+            silentNodeIds.add(nodeId);
+          }
+        });
+    if (silentNodeIds.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    Collections.sort(silentNodeIds);
+    final int total = silentNodeIds.size();
+    final int shown = Math.min(MAX_UNREADY_ENTITY_PRINT, total);
+    final String overflowNote =
+        total > MAX_UNREADY_ENTITY_PRINT
+            ? "...(" + (total - MAX_UNREADY_ENTITY_PRINT) + " more)"
+            : "";
+    return Collections.singletonList("nodes=" + silentNodeIds.subList(0, shown) + overflowNote);
+  }
+
   /**
    * Safely get NodeStatus by NodeId.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
index aa924dc29b5..7dcacc4fd80 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
@@ -41,7 +41,7 @@ public class ConsensusGroupCache extends AbstractLoadCache {
     synchronized (slidingWindow) {
       lastSample = (ConsensusGroupHeartbeatSample) getLastSample();
     }
-    if (lastSample != null && lastSample.getLeaderId() != UN_READY_LEADER_ID) {
+    if (lastSample != null) {
       currentStatistics.set(
           new ConsensusGroupStatistics(System.nanoTime(), 
lastSample.getLeaderId()));
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 11bb7f382d4..82afea3859f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -56,6 +56,8 @@ import static 
org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID;
 
 public class ProcedureExecutor<Env> {
   private static final Logger LOG = 
LoggerFactory.getLogger(ProcedureExecutor.class);
+  private static final ThreadLocal<Boolean> PROCEDURE_EXECUTION_CONTEXT =
+      ThreadLocal.withInitial(() -> false);
 
   private final ConcurrentHashMap<Long, CompletedProcedureContainer<Env>> 
completed =
       new ConcurrentHashMap<>();
@@ -96,6 +98,10 @@ public class ProcedureExecutor<Env> {
     this(environment, store, new SimpleProcedureScheduler());
   }
 
+  /**
+   * Read the calling thread's {@code PROCEDURE_EXECUTION_CONTEXT} value.
+   *
+   * @return the thread-local flag, which defaults to {@code false}
+   */
+  public static boolean isProcedureExecutionThread() {
+    return PROCEDURE_EXECUTION_CONTEXT.get();
+  }
+
   public void init(int numThreads) {
     this.corePoolSize = numThreads;
     this.maxPoolSize = 10 * numThreads;
@@ -784,7 +790,12 @@ public class ProcedureExecutor<Env> {
           this.activeProcedure.set(procedure);
           activeExecutorCount.incrementAndGet();
           startTime.set(System.currentTimeMillis());
-          executeProcedure(procedure);
+          PROCEDURE_EXECUTION_CONTEXT.set(true);
+          try {
+            executeProcedure(procedure);
+          } finally {
+            PROCEDURE_EXECUTION_CONTEXT.remove();
+          }
           activeExecutorCount.decrementAndGet();
           LOG.trace(
               "Halt pid={}, activeCount={}", procedure.getProcId(), 
activeExecutorCount.get());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 037f138286a..da766541786 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -89,7 +89,7 @@ public class ConfigNode extends ServerCommandLine implements 
ConfigNodeMBean {
   private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
 
-  private static final int STARTUP_RETRY_NUM = 10;
+  private static final int STARTUP_RETRY_NUM = 20;
   private static final long STARTUP_RETRY_INTERVAL_IN_MS = 
TimeUnit.SECONDS.toMillis(3);
   private static final int SCHEDULE_WAITING_RETRY_NUM =
       (int) (COMMON_CONFIG.getCnConnectionTimeoutInMS() / 
STARTUP_RETRY_INTERVAL_IN_MS);
@@ -414,6 +414,12 @@ public class ConfigNode extends ServerCommandLine 
implements ConfigNodeMBean {
       } else if (status.getCode() == 
TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) {
         LOGGER.warn(
             
ConfigNodeMessages.THE_RESULT_OF_REGISTER_SELF_CONFIGNODE_IS_RETRY, status, 
retry);
+      } else if (status.getCode() == 
TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) {
+        LOGGER.info(
+            "ConfigNode leader is warming up before serving the registering 
ConfigNode, will wait"
+                + " and retry. Status: {}, retry: {}",
+            status,
+            retry);
       } else {
         throw new StartupException(status.getMessage());
       }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
index fd62c31ae36..e2c51df9693 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -292,4 +293,73 @@ public class LoadManagerTest {
         new Pair<>(new ConsensusGroupStatistics(newLeaderId), null),
         differentConsensusGroupStatisticsMap.get(regionGroupId));
   }
+
+  /**
+   * Only ConfigNode and DataNode caches count as unready: the AINode (13) and the region group
+   * registered below never show up in the reported list.
+   */
+  @Test
+  public void testLoadWarmUpRequiresOnlyConfigNodeAndDataNodeSamples() {
+    LoadCache loadCache = new LoadCache();
+    TConsensusGroupId regionGroupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+    Set<Integer> dataNodeIds = Stream.of(11, 12).collect(Collectors.toSet());
+
+    loadCache.createNodeHeartbeatCache(NodeType.ConfigNode, 10);
+    dataNodeIds.forEach(
+        dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, 
dataNodeId));
+    loadCache.createNodeHeartbeatCache(NodeType.AINode, 13);
+    loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, 
dataNodeIds);
+
+    // Before any sample arrives, exactly the ConfigNode and the two DataNodes are reported.
+    Assert.assertEquals(
+        Collections.singletonList("nodes=[10, 11, 12]"),
+        loadCache.getNodeHeartbeatUnreadyReasons());
+
+    loadCache.cacheConfigNodeHeartbeatSample(10, new 
NodeHeartbeatSample(NodeStatus.Unknown));
+
+    dataNodeIds.forEach(
+        dataNodeId ->
+            loadCache.cacheDataNodeHeartbeatSample(
+                dataNodeId, new NodeHeartbeatSample(NodeStatus.Running)));
+    loadCache.updateNodeStatistics(false);
+
+    // One sample per ConfigNode/DataNode is enough, whatever status it carries.
+    Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty());
+  }
+
+  /**
+   * A region group that has received no heartbeat sample of its own does not appear in the
+   * unready list once its DataNodes have been sampled.
+   */
+  @Test
+  public void testRegionAndConsensusGroupsDoNotBlockLoadWarmUp() {
+    LoadCache loadCache = new LoadCache();
+    TConsensusGroupId regionGroupId = new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 101);
+    Set<Integer> dataNodeIds = Stream.of(21, 22).collect(Collectors.toSet());
+
+    dataNodeIds.forEach(
+        dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, 
dataNodeId));
+    loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, 
dataNodeIds);
+    dataNodeIds.forEach(
+        dataNodeId -> {
+          loadCache.cacheDataNodeHeartbeatSample(
+              dataNodeId, new NodeHeartbeatSample(NodeStatus.Running));
+        });
+    loadCache.updateNodeStatistics(false);
+    loadCache.updateRegionGroupStatistics();
+    loadCache.updateConsensusGroupStatistics();
+
+    Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty());
+  }
+
+  /**
+   * A DataNode that never sends a heartbeat blocks readiness only while the warm-up start time is
+   * recent; with a start time 31 seconds in the past the manager reports ready and names the
+   * missing heartbeats in its reason.
+   */
+  @Test
+  public void testLoadWarmUpToleratesMissingFirstHeartbeatAfterThirtySeconds() 
{
+    LOAD_CACHE.clearHeartbeatCache();
+    LOAD_CACHE.createNodeHeartbeatCache(NodeType.DataNode, 31);
+
+    try {
+      LOAD_MANAGER.markLoadServicesStartedForTest(System.currentTimeMillis());
+
+      Assert.assertFalse(LOAD_MANAGER.isLoadReady());
+      Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("waiting 
for first heartbeat"));
+
+      // Backdate the start time instead of sleeping through the tolerance window.
+      LOAD_MANAGER.markLoadServicesStartedForTest(
+          System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(31));
+
+      Assert.assertTrue(LOAD_MANAGER.isLoadReady());
+      Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("Missing 
heartbeats"));
+    } finally {
+      LOAD_MANAGER.stopLoadServices();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index b21d3a3a792..5c7fa59f78d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -224,6 +224,7 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
       "Failed to connect to ConfigNode %s from DataNode %s when executing %s, 
Exception:";
   private static final long RETRY_INTERVAL_MS = 1000L;
   private static final long WAIT_CN_LEADER_ELECTION_INTERVAL_MS = 2000L;
+  private static final long REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS = 
60_000L;
 
   private static final String UNSUPPORTED_INVOCATION =
       "This method is not supported for invocation by DataNode";
@@ -403,12 +404,33 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         }
         return true;
       }
+      if (status.getCode() == 
TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) {
+        if (!isFirstInitiated) {
+          logger.info(
+              "ConfigNode leader {} is warming up before serving DataNode {}, 
will wait and retry."
+                  + " Reason: {}",
+              configNode,
+              config.getAddressAndPort(),
+              status.getMessage());
+        }
+        try {
+          Thread.sleep(WAIT_CN_LEADER_ELECTION_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          
logger.warn(DataNodeMiscMessages.UNEXPECTED_INTERRUPTION_CONNECT_CONFIG_NODE_BREAK);
+        }
+        return true;
+      }
       return false;
     } finally {
       isFirstInitiated = false;
     }
   }
 
+  /**
+   * Tell whether {@code status} carries the {@code CONFIG_NODE_LEADER_WARMING_UP} code.
+   *
+   * @param status the status returned by a ConfigNode
+   * @return {@code true} if the status code equals the warming-up code
+   */
+  private boolean isConfigNodeLeaderWarmingUp(TSStatus status) {
+    return status.getCode() == 
TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode();
+  }
+
   /**
    * The frame of execute RPC, include logic of retry and exception handling.
    *
@@ -480,20 +502,33 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
 
   @Override
   public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) 
throws TException {
-    for (int i = 0; i < RETRY_NUM; i++) {
+    boolean leaderWarmingUpObserved = false;
+    long leaderWarmingUpRetryDeadline = 0;
+    for (int i = 0;
+        i < RETRY_NUM
+            || (leaderWarmingUpObserved
+                && System.currentTimeMillis() < leaderWarmingUpRetryDeadline);
+        i++) {
       try {
         TDataNodeRegisterResp resp = client.registerDataNode(req);
 
         if (!updateConfigNodeLeader(resp.status)) {
           return resp;
         }
+        if (isConfigNodeLeaderWarmingUp(resp.status) && 
!leaderWarmingUpObserved) {
+          leaderWarmingUpObserved = true;
+          leaderWarmingUpRetryDeadline =
+              System.currentTimeMillis() + 
REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS;
+        }
 
         // set latest config node list
-        List<TEndPoint> newConfigNodes = new ArrayList<>();
-        for (TConfigNodeLocation configNodeLocation : 
resp.getConfigNodeList()) {
-          newConfigNodes.add(configNodeLocation.getInternalEndPoint());
+        if (resp.isSetConfigNodeList()) {
+          List<TEndPoint> newConfigNodes = new ArrayList<>();
+          for (TConfigNodeLocation configNodeLocation : 
resp.getConfigNodeList()) {
+            newConfigNodes.add(configNodeLocation.getInternalEndPoint());
+          }
+          configNodes = newConfigNodes;
         }
-        configNodes = newConfigNodes;
       } catch (TException e) {
         String message =
             String.format(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 978fd3f1f0e..e9e01efd2d4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -98,6 +98,7 @@ public enum ThreadName {
   CONFIG_NODE_REGION_MAINTAINER("IoTDB-Region-Maintainer"),
   // -------------------------- ConfigNode-Recover --------------------------
   CONFIG_NODE_RECOVER("ConfigNode-Manager-Recovery"),
+  
CONFIG_NODE_LEADER_SERVICES_TRANSITION("ConfigNode-Leader-Services-Transition"),
   // -------------------------- ConfigNode-Procedure ------------------------
   // TODO: Use Thread Pool to manage the procedure thread @Potato
   CONFIG_NODE_PROCEDURE_WORKER("ProcedureWorkerGroup"),
@@ -374,7 +375,7 @@ public enum ThreadName {
       new HashSet<>(Arrays.asList(CONFIG_NODE_REGION_MAINTAINER));
 
   private static final Set<ThreadName> configNodeRecoverThreadNames =
-      new HashSet<>(Arrays.asList(CONFIG_NODE_RECOVER));
+      new HashSet<>(Arrays.asList(CONFIG_NODE_RECOVER, 
CONFIG_NODE_LEADER_SERVICES_TRANSITION));
 
   private static final Set<ThreadName> configNodeProcedureThreadNames =
       new HashSet<>(

Reply via email to