This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ddd8faaff23 Improve ConfigNode leader warm-up before serving (#17821)
ddd8faaff23 is described below
commit ddd8faaff23956ac1f51c2e34c722d0db9f47f08
Author: Yongzao <[email protected]>
AuthorDate: Wed Jun 10 14:31:12 2026 +0800
Improve ConfigNode leader warm-up before serving (#17821)
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
iotdb-core/ainode/iotdb/ainode/core/constant.py | 1 +
iotdb-core/ainode/iotdb/ainode/core/rpc/client.py | 11 +-
.../handlers/heartbeat/AINodeHeartbeatHandler.java | 2 +-
.../heartbeat/DataNodeHeartbeatHandler.java | 125 +++++---
.../statemachine/ConfigRegionStateMachine.java | 332 +++++++++++++++------
.../iotdb/confignode/manager/ConfigManager.java | 4 +
.../iotdb/confignode/manager/ProcedureManager.java | 13 +-
.../manager/consensus/ConsensusManager.java | 79 +++--
.../iotdb/confignode/manager/load/LoadManager.java | 73 +++++
.../manager/load/cache/AbstractLoadCache.java | 4 +
.../confignode/manager/load/cache/LoadCache.java | 27 ++
.../load/cache/consensus/ConsensusGroupCache.java | 2 +-
.../confignode/procedure/ProcedureExecutor.java | 13 +-
.../iotdb/confignode/service/ConfigNode.java | 8 +-
.../confignode/manager/load/LoadManagerTest.java | 70 +++++
.../iotdb/db/protocol/client/ConfigNodeClient.java | 45 ++-
.../iotdb/commons/concurrent/ThreadName.java | 3 +-
18 files changed, 648 insertions(+), 165 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d22e8f55207..a8e2ef8d12e 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -209,6 +209,7 @@ public enum TSStatusCode {
CAN_NOT_CONNECT_AINODE(1011),
NO_AVAILABLE_REPLICA(1012),
NO_AVAILABLE_AINODE(1013),
+ CONFIG_NODE_LEADER_WARMING_UP(1014),
// Sync, Load TsFile
LOAD_FILE_ERROR(1100),
diff --git a/iotdb-core/ainode/iotdb/ainode/core/constant.py
b/iotdb-core/ainode/iotdb/ainode/core/constant.py
index 4a2ee543d1f..1eb79085bcf 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/constant.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/constant.py
@@ -80,6 +80,7 @@ DEFAULT_CHUNK_SIZE = 8192
class TSStatusCode(Enum):
SUCCESS_STATUS = 200
REDIRECTION_RECOMMEND = 400
+ CONFIG_NODE_LEADER_WARMING_UP = 1014
MODEL_EXISTED_ERROR = 1503
MODEL_NOT_EXIST_ERROR = 1504
CREATE_MODEL_ERROR = 1505
diff --git a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
index ea6362ef080..81bb81d50bb 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
@@ -66,6 +66,7 @@ class ConfigNodeClient(object):
"Fail to connect to any config node. Please check status of
ConfigNodes"
)
self._RETRY_NUM = 10
+ self._STARTUP_RETRY_NUM = 60
self._RETRY_INTERVAL_IN_S = 1
self._try_to_connect()
@@ -163,6 +164,12 @@ class ConfigNodeClient(object):
else:
self._config_leader = None
return True
+ if status.code ==
TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.get_status_code():
+ logger.info(
+ "ConfigNode leader is warming up before serving AINode, will
wait and retry. Reason: {}",
+ status.message,
+ )
+ return True
return False
def node_register(
@@ -177,7 +184,7 @@ class ConfigNodeClient(object):
versionInfo=version_info,
)
- for _ in range(0, self._RETRY_NUM):
+ for _ in range(0, self._STARTUP_RETRY_NUM):
try:
resp = self._client.registerAINode(req)
if not self._update_config_node_leader(resp.status):
@@ -208,7 +215,7 @@ class ConfigNodeClient(object):
versionInfo=version_info,
)
- for _ in range(0, self._RETRY_NUM):
+ for _ in range(0, self._STARTUP_RETRY_NUM):
try:
resp = self._client.restartAINode(req)
if not self._update_config_node_leader(resp.status):
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java
index 9d8e0b6e847..03e6c0bfe31 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java
@@ -50,7 +50,7 @@ public class AINodeHeartbeatHandler implements
AsyncMethodCallback<TAIHeartbeatR
public void onError(Exception e) {
if (ThriftClient.isConnectionBroken(e)) {
loadManager.forceUpdateNodeCache(
- NodeType.DataNode, nodeId, new
NodeHeartbeatSample(NodeStatus.Unknown));
+ NodeType.AINode, nodeId, new
NodeHeartbeatSample(NodeStatus.Unknown));
}
loadManager.getLoadCache().resetHeartbeatProcessing(nodeId);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index e7a31b1dc73..52053cdab45 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -36,6 +37,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.thrift.async.AsyncMethodCallback;
+import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
@@ -82,54 +84,86 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<TDataNodeHe
@Override
public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
- // Update NodeCache
+ cacheNodeHeartbeatSample(heartbeatResp);
+ cacheRegionGroupHeartbeatSamples(heartbeatResp);
+ cacheUsageSamples(heartbeatResp);
+ cachePipeHeartbeat(heartbeatResp);
+ cacheConfirmedConfigNodeEndPoints(heartbeatResp);
+ cacheRegionSizeSamples(heartbeatResp);
+ }
+
+ private void cacheNodeHeartbeatSample(TDataNodeHeartbeatResp heartbeatResp) {
loadManager
.getLoadCache()
.cacheDataNodeHeartbeatSample(nodeId, new
NodeHeartbeatSample(heartbeatResp));
+ }
+ private void cacheRegionGroupHeartbeatSamples(TDataNodeHeartbeatResp
heartbeatResp) {
RegionStatus regionStatus =
RegionStatus.valueOf(heartbeatResp.getStatus());
- heartbeatResp
- .getJudgedLeaders()
- .forEach(
- (regionGroupId, isLeader) -> {
-
- // Do not allow regions to inherit the Removing state from
datanode
- RegionStatus nextRegionStatus = regionStatus;
- if (nextRegionStatus == RegionStatus.Removing) {
- nextRegionStatus =
- loadManager
- .getLoadCache()
- .getRegionCacheLastSampleStatus(regionGroupId, nodeId);
- }
-
- // Update RegionGroupCache
- loadManager
- .getLoadCache()
- .cacheRegionHeartbeatSample(
- regionGroupId,
- nodeId,
- new RegionHeartbeatSample(
- heartbeatResp.getHeartbeatTimestamp(),
- // Region will inherit DataNode's status
- nextRegionStatus),
- false);
-
- if
(((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
- && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
- ||
(TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
- && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE))
- && Boolean.TRUE.equals(isLeader)) {
- // Update ConsensusGroupCache when necessary
- loadManager
- .getLoadCache()
- .cacheConsensusSample(
- regionGroupId,
- new ConsensusGroupHeartbeatSample(
-
heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId));
- }
- });
+ Map<TConsensusGroupId, Boolean> judgedLeaders =
+ heartbeatResp.isSetJudgedLeaders()
+ ? heartbeatResp.getJudgedLeaders()
+ : Collections.emptyMap();
+ judgedLeaders.forEach(
+ (regionGroupId, isLeader) -> {
+ cacheRegionHeartbeatSample(heartbeatResp, regionStatus,
regionGroupId);
+ cacheConsensusSampleIfNeeded(heartbeatResp, regionGroupId, isLeader);
+ });
+ }
+
+ private void cacheRegionHeartbeatSample(
+ TDataNodeHeartbeatResp heartbeatResp,
+ RegionStatus dataNodeRegionStatus,
+ TConsensusGroupId regionGroupId) {
+ loadManager
+ .getLoadCache()
+ .cacheRegionHeartbeatSample(
+ regionGroupId,
+ nodeId,
+ new RegionHeartbeatSample(
+ heartbeatResp.getHeartbeatTimestamp(),
+ getRegionHeartbeatStatus(regionGroupId, dataNodeRegionStatus)),
+ false);
+ }
+
+ private RegionStatus getRegionHeartbeatStatus(
+ TConsensusGroupId regionGroupId, RegionStatus dataNodeRegionStatus) {
+ return dataNodeRegionStatus == RegionStatus.Removing
+ ?
loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId)
+ : dataNodeRegionStatus;
+ }
+
+ private void cacheConsensusSampleIfNeeded(
+ TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId,
Boolean isLeader) {
+ if (!Boolean.TRUE.equals(isLeader)
+ || !shouldCacheConsensusSample(regionGroupId)
+ || !hasConsensusLogicalTimestamp(heartbeatResp, regionGroupId)) {
+ return;
+ }
+
+ loadManager
+ .getLoadCache()
+ .cacheConsensusSample(
+ regionGroupId,
+ new ConsensusGroupHeartbeatSample(
+ heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId),
nodeId));
+ }
+
+ private boolean shouldCacheConsensusSample(TConsensusGroupId regionGroupId) {
+ return (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
+ && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
+ || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
+ && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE);
+ }
+
+ private boolean hasConsensusLogicalTimestamp(
+ TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId) {
+ return heartbeatResp.isSetConsensusLogicalTimeMap()
+ &&
heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId);
+ }
+ private void cacheUsageSamples(TDataNodeHeartbeatResp heartbeatResp) {
if (heartbeatResp.getRegionDeviceUsageMap() != null) {
deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap());
deviceUsageRespProcess.accept(heartbeatResp.getRegionDeviceUsageMap());
@@ -141,6 +175,9 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<TDataNodeHe
if (heartbeatResp.getRegionDisk() != null) {
regionDisk.putAll(heartbeatResp.getRegionDisk());
}
+ }
+
+ private void cachePipeHeartbeat(TDataNodeHeartbeatResp heartbeatResp) {
if (heartbeatResp.getPipeMetaList() != null) {
pipeRuntimeCoordinator.parseHeartbeat(
nodeId,
@@ -149,12 +186,18 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<TDataNodeHe
heartbeatResp.getPipeRemainingEventCountList(),
heartbeatResp.getPipeRemainingTimeList());
}
+ }
+
+ private void cacheConfirmedConfigNodeEndPoints(TDataNodeHeartbeatResp
heartbeatResp) {
if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
loadManager
.getLoadCache()
.updateConfirmedConfigNodeEndPoints(
nodeId, heartbeatResp.getConfirmedConfigNodeEndPoints());
}
+ }
+
+ private void cacheRegionSizeSamples(TDataNodeHeartbeatResp heartbeatResp) {
if (heartbeatResp.isSetRegionDisk()) {
loadManager.getLoadCache().updateRegionSizeMap(nodeId,
heartbeatResp.getRegionDisk());
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index fe687d17556..251db87dbdc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -60,22 +60,53 @@ import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.Optional;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/** {@link IStateMachine} for ConfigRegion. */
public class ConfigRegionStateMachine implements IStateMachine,
IStateMachine.EventApi {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigRegionStateMachine.class);
- private static final ExecutorService threadPool =
+ /**
+ * Serializes leadership transitions (become-leader / resign-leader). A
single worker thread is
+ * the barrier that keeps epochs strictly serial: the orchestration of one
transition runs to
+ * completion before the next one begins.
+ */
+ private static final ExecutorService leaderServicesTransitionExecutor =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(
+ ThreadName.CONFIG_NODE_LEADER_SERVICES_TRANSITION.getName());
+
+ /** Runs the individual leader services in parallel within a single
become-leader epoch. */
+ private static final ExecutorService leaderServicesStartupPool =
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.CONFIG_NODE_RECOVER.getName());
+
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
+ private static final long WAIT_LOAD_READY_TIMEOUT_MS =
+ CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS()
/ 2;
+ private static final long WAIT_LOAD_READY_INTERVAL_MS = 100;
private final ConfigPlanExecutor executor;
+
+ /**
+ * Whether the leader services of the {@link #leaderServicesEpoch current
epoch} have finished
+ * starting up. Read by {@link ConsensusManager#confirmLeader()} to gate
external serving.
+ */
+ private final AtomicBoolean leaderServicesReady;
+
+ /**
+ * Monotonically increasing leadership generation. Every become-leader /
resign-leader transition
+ * bumps it, so any work submitted for an older epoch can detect it is stale
and bail out.
+ */
+ private final AtomicLong leaderServicesEpoch;
+
+ /** Guards {@link #leaderServicesReady} and {@link #leaderServicesEpoch} as
a unit. */
+ private final Object leaderServicesLock;
+
private ConfigManager configManager;
/** Variables for {@link ConfigNode} Simple Consensus. */
@@ -87,17 +118,20 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
private static final String CURRENT_FILE_DIR =
ConsensusManager.getConfigRegionDir() + File.separator + "current";
+ private static final String LOG_INPROGRESS_FILE_PREFIX = "log_inprogress_";
+ private static final String LOG_FILE_PREFIX = "log_";
private static final String PROGRESS_FILE_PATH =
- CURRENT_FILE_DIR + File.separator + "log_inprogress_";
- private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator +
"log_";
+ CURRENT_FILE_DIR + File.separator + LOG_INPROGRESS_FILE_PREFIX;
+ private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator +
LOG_FILE_PREFIX;
private static final long LOG_FILE_MAX_SIZE =
CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax();
private final TEndPoint currentNodeTEndPoint;
- private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+");
- private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$");
public ConfigRegionStateMachine(ConfigManager configManager,
ConfigPlanExecutor executor) {
this.executor = executor;
+ this.leaderServicesReady = new AtomicBoolean(false);
+ this.leaderServicesEpoch = new AtomicLong(0);
+ this.leaderServicesLock = new Object();
this.configManager = configManager;
this.currentNodeTEndPoint =
new TEndPoint()
@@ -115,9 +149,9 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
@Override
public TSStatus write(IConsensusRequest request) {
- return Optional.ofNullable(request)
- .map(o -> write((ConfigPhysicalPlan) request))
- .orElseGet(() -> new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ return request == null
+ ? new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+ : write((ConfigPhysicalPlan) request);
}
/** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */
@@ -233,7 +267,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
// We get currentNodeId here because the currentNodeId
// couldn't initialize earlier than the ConfigRegionStateMachine
- int currentNodeId =
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+ final int currentNodeId =
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
if (currentNodeId != newLeaderId) {
LOGGER.info(
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER
@@ -241,6 +275,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
currentNodeId,
currentNodeTEndPoint,
newLeaderId);
+ resignLeaderAsync();
}
}
@@ -248,12 +283,117 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
public void notifyNotLeader() {
// We get currentNodeId here because the currentNodeId
// couldn't initialize earlier than the ConfigRegionStateMachine
- int currentNodeId =
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+ final int currentNodeId =
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
LOGGER.info(
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER
+ "start cleaning up related services",
currentNodeId,
currentNodeTEndPoint);
+ resignLeaderAsync();
+ }
+
+ @Override
+ public void notifyLeaderReady() {
+ LOGGER.info(
+
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_BECOMES_CONFIG_REGION_LEADER,
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+ currentNodeTEndPoint);
+ // Bump the epoch eagerly so that any in-flight services of an older epoch
are invalidated
+ // immediately, even before the (serialized) become-leader orchestration
gets to run.
+ final long epoch = nextLeaderServicesEpoch();
+ leaderServicesTransitionExecutor.submit(() -> becomeLeader(epoch));
+ }
+
+ /**
+ * Submit a resign-leader transition. The epoch is bumped eagerly (on the
consensus thread) so
+ * that stale leader work is invalidated at once, while the teardown itself
is serialized behind
+ * any in-flight transition on {@link #leaderServicesTransitionExecutor}.
+ */
+ private void resignLeaderAsync() {
+ invalidateLeaderServices();
+ leaderServicesTransitionExecutor.submit(this::stopLeaderServices);
+ }
+
+ /**
+ * Bring up the leader services for {@code epoch}. Runs on the single
transition thread, so it is
+ * strictly serialized against every other transition. Within the epoch, the
load services start
+ * first (to warm up as early as possible), then the remaining services
start in parallel and are
+ * joined before the epoch is marked ready.
+ */
+ private void becomeLeader(final long epoch) {
+ if (!isCurrentLeaderServicesEpoch(epoch)) {
+ LOGGER.info(
+
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER
+ + "skip starting leader services because the leader epoch is
stale",
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+ currentNodeTEndPoint);
+ return;
+ }
+
+ // Always start load services first. ConsensusManager gates external
serving until warm-up.
+ configManager.getLoadManager().startLoadServices();
+ if (CONF.isEnableTopologyProbing()) {
+ configManager.getLoadManager().startTopologyService();
+ }
+
+ // Start the remaining leader services in parallel and wait for all of
them to finish.
+ final CompletableFuture<?>[] startups =
+ leaderServiceStartups().stream()
+ .map(startup -> startInParallelIfEpochCurrent(epoch, startup))
+ .toArray(CompletableFuture[]::new);
+ CompletableFuture.allOf(startups).join();
+
+ if (!isCurrentLeaderServicesEpoch(epoch)) {
+ return;
+ }
+ // The procedure executor may report readiness asynchronously once it has
caught up.
+ configManager
+ .getProcedureManager()
+ .startExecutor(() -> markLeaderServicesReadyIfEpochCurrent(epoch));
+ markLeaderServicesReadyIfEpochCurrent(epoch);
+
+ final boolean loadReady = waitForLoadReady(epoch);
+ if (!isCurrentLeaderServicesEpoch(epoch)) {
+ return;
+ }
+ logLoadWarmUpIfNeeded(loadReady);
+ LOGGER.info(
+
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS,
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+ currentNodeTEndPoint);
+ }
+
+ /** The leader services that can be started independently, in parallel,
within one epoch. */
+ private List<Runnable> leaderServiceStartups() {
+ return Arrays.asList(
+ () ->
configManager.getProcedureManager().getStore().getProcedureInfo().upgrade(),
+ () ->
configManager.getRetryFailedTasksThread().startRetryFailedTasksService(),
+ () -> configManager.getPartitionManager().startRegionCleaner(),
+ // Add metrics after leader ready.
+ () -> configManager.addMetrics(),
+ // Activate leader related service for config pipe.
+ () -> PipeConfigNodeAgent.runtime().notifyLeaderReady(),
+ // CQ recovery may be time-consuming, so it is just one more parallel
startup.
+ () -> configManager.getCQManager().startCQScheduler(),
+ () ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync(),
+ () ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat(),
+ () ->
+ configManager
+ .getPipeManager()
+ .getPipeRuntimeCoordinator()
+ .onConfigRegionGroupLeaderChanged(),
+ () ->
+ configManager
+ .getSubscriptionManager()
+ .getSubscriptionCoordinator()
+ .startSubscriptionMetaSync(),
+ // To adapt old version, we check cluster ID after state machine has
been fully recovered.
+ () -> configManager.getClusterManager().checkClusterId());
+ }
+
+ /** Tear down every leader service. Runs on the single transition thread. */
+ private void stopLeaderServices() {
+ final int currentNodeId =
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
// Stop leader scheduling services
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
@@ -281,63 +421,89 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
currentNodeTEndPoint);
}
- @Override
- public void notifyLeaderReady() {
- LOGGER.info(
-
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_BECOMES_CONFIG_REGION_LEADER,
- ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- currentNodeTEndPoint);
+ /**
+ * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it
(and recording success)
+ * if the epoch has gone stale by the time it is picked up. Returns a future
that always completes
+ * normally so {@link CompletableFuture#allOf} acts as a clean join barrier.
+ */
+ private CompletableFuture<Void> startInParallelIfEpochCurrent(
+ final long epoch, final Runnable startup) {
+ return CompletableFuture.runAsync(
+ () -> {
+ if (isCurrentLeaderServicesEpoch(epoch)) {
+ startup.run();
+ }
+ },
+ leaderServicesStartupPool);
+ }
- // Always start load services first
- configManager.getLoadManager().startLoadServices();
+ private void markLeaderServicesReadyIfEpochCurrent(final long epoch) {
+ synchronized (leaderServicesLock) {
+ if (isCurrentLeaderServicesEpoch(epoch)) {
+ leaderServicesReady.set(true);
+ }
+ }
+ }
- if (CONF.isEnableTopologyProbing()) {
- configManager.getLoadManager().startTopologyService();
+ private void logLoadWarmUpIfNeeded(final boolean loadReady) {
+ if (!loadReady) {
+ LOGGER.info(
+ "Current ConfigNode(nodeId: {}, ip: {}) finished starting leader
services while load"
+ + " warm-up is still in progress: {}",
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+ currentNodeTEndPoint,
+ configManager.getLoadManager().getLoadReadyReason());
}
+ }
- // Start leader scheduling services
- configManager.getProcedureManager().startExecutor();
- threadPool.submit(
- () ->
configManager.getProcedureManager().getStore().getProcedureInfo().upgrade());
- configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
- configManager.getPartitionManager().startRegionCleaner();
- // Add Metric after leader ready
- configManager.addMetrics();
-
- // Activate leader related service for config pipe
- PipeConfigNodeAgent.runtime().notifyLeaderReady();
-
- // we do cq recovery async for performance:
- // cq recovery may be time-consuming, we use another thread to do it in
- // make notifyLeaderChanged not blocked by it
- threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
-
- threadPool.submit(
- () ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
- threadPool.submit(
- () ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat());
- threadPool.submit(
- () ->
- configManager
- .getPipeManager()
- .getPipeRuntimeCoordinator()
- .onConfigRegionGroupLeaderChanged());
+ private boolean waitForLoadReady(final long epoch) {
+ long startTime = System.currentTimeMillis();
+ while (isCurrentLeaderServicesEpoch(epoch)
+ && System.currentTimeMillis() - startTime <
WAIT_LOAD_READY_TIMEOUT_MS) {
+ if (configManager.getLoadManager().isLoadReady()) {
+ return true;
+ }
+ if (!sleepForLoadReady()) {
+ return false;
+ }
+ }
+ return isCurrentLeaderServicesEpoch(epoch) &&
configManager.getLoadManager().isLoadReady();
+ }
- threadPool.submit(
- () ->
- configManager
- .getSubscriptionManager()
- .getSubscriptionCoordinator()
- .startSubscriptionMetaSync());
+ private boolean sleepForLoadReady() {
+ try {
+ TimeUnit.MILLISECONDS.sleep(WAIT_LOAD_READY_INTERVAL_MS);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Unexpected interruption while waiting for ConfigNode leader
load warm-up.", e);
+ return false;
+ }
+ }
- // To adapt old version, we check cluster ID after state machine has been
fully recovered.
- // Do check async because sync will be slow and block every other things.
- threadPool.submit(() ->
configManager.getClusterManager().checkClusterId());
+ public boolean areLeaderServicesReady() {
+ return leaderServicesReady.get();
+ }
- LOGGER.info(
-
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS,
- ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- currentNodeTEndPoint);
+ /** Open a new leadership generation, invalidating the previous one. */
+ private long nextLeaderServicesEpoch() {
+ synchronized (leaderServicesLock) {
+ leaderServicesReady.set(false);
+ return leaderServicesEpoch.incrementAndGet();
+ }
+ }
+
+ /** Invalidate the current leadership generation without opening a serving
one. */
+ private void invalidateLeaderServices() {
+ synchronized (leaderServicesLock) {
+ leaderServicesReady.set(false);
+ leaderServicesEpoch.incrementAndGet();
+ }
+ }
+
+ private boolean isCurrentLeaderServicesEpoch(final long epoch) {
+ return leaderServicesEpoch.get() == epoch
+ && configManager.getConsensusManager().isLeaderReady();
}
@Override
@@ -413,7 +579,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
dir.mkdirs();
String[] list = new File(CURRENT_FILE_DIR).list();
if (list != null && list.length != 0) {
- Arrays.sort(list, new FileComparator());
+ Arrays.sort(list,
Comparator.comparingLong(ConfigRegionStateMachine::parseEndIndex));
for (String logFileName : list) {
File logFile =
SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR +
File.separator + logFileName);
@@ -497,28 +663,28 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
}
}
- static class FileComparator implements Comparator<String> {
-
- @Override
- public int compare(String filename1, String filename2) {
- long id1 = parseEndIndex(filename1);
- long id2 = parseEndIndex(filename2);
- return Long.compare(id1, id2);
- }
- }
-
- static long parseEndIndex(String filename) {
- if (filename.startsWith("log_inprogress_")) {
- Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename);
- if (matcher.find()) {
- return Long.parseLong(matcher.group());
+ private static long parseEndIndex(String filename) {
+ final String endIndexString;
+ if (filename.startsWith(LOG_INPROGRESS_FILE_PREFIX)) {
+ endIndexString = filename.substring(LOG_INPROGRESS_FILE_PREFIX.length());
+ } else if (filename.startsWith(LOG_FILE_PREFIX)) {
+ final int lastSeparatorIndex = filename.lastIndexOf('_');
+ if (lastSeparatorIndex < LOG_FILE_PREFIX.length()) {
+ return 0;
}
+ endIndexString = filename.substring(lastSeparatorIndex + 1);
} else {
- Matcher matcher = LOG_PATTERN.matcher(filename);
- if (matcher.find()) {
- return Long.parseLong(matcher.group());
+ return 0;
+ }
+
+ if (endIndexString.isEmpty()) {
+ return 0;
+ }
+ for (int i = 0; i < endIndexString.length(); i++) {
+ if (!Character.isDigit(endIndexString.charAt(i))) {
+ return 0;
}
}
- return 0;
+ return Long.parseLong(endIndexString);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 64ee0c54791..c5bafc550f6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1256,6 +1256,10 @@ public class ConfigManager implements IManager {
"ConsensusManager of target-ConfigNode is not initialized, "
+ "please make sure the target-ConfigNode has been started
successfully.");
}
+ // Procedure recovery replays metadata writes before external load warm-up
is complete.
+ if (procedureManager.isProcedureExecutionThread()) {
+ return getConsensusManager().confirmLeaderForInternalProcedure();
+ }
return getConsensusManager().confirmLeader();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index a974f9e1d7c..cb76a9550af 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -228,14 +228,21 @@ public class ProcedureManager {
}
public void startExecutor() {
+ startExecutor(null);
+ }
+
+ public void startExecutor(final Runnable beforeStartingWorkers) {
if (!executor.isRunning()) {
executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsCount());
- executor.startWorkers();
executor.startCompletedCleaner(
CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
executor.addInternalProcedure(partitionTableCleaner);
store.start();
+ if (beforeStartingWorkers != null) {
+ beforeStartingWorkers.run();
+ }
+ executor.startWorkers();
LOGGER.info(ManagerMessages.PROCEDUREMANAGER_IS_STARTED_SUCCESSFULLY);
}
}
@@ -252,6 +259,10 @@ public class ProcedureManager {
}
}
+ public boolean isProcedureExecutionThread() {
+ return ProcedureExecutor.isProcedureExecutionThread();
+ }
+
@TestOnly
public TSStatus createManyDatabases() {
this.executor.submitProcedure(new CreateManyDatabasesProcedure());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 84594b0d7a8..d7024c357b1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -81,12 +81,14 @@ public class ConsensusManager {
new ConfigRegionId(CONF.getConfigRegionId());
private final IManager configManager;
+ private final ConfigRegionStateMachine stateMachine;
private IConsensus consensusImpl;
private boolean isInitialized;
public ConsensusManager(IManager configManager, ConfigRegionStateMachine
stateMachine) {
this.configManager = configManager;
+ this.stateMachine = stateMachine;
setConsensusLayer(stateMachine);
}
@@ -94,6 +96,7 @@ public class ConsensusManager {
ConsensusManager(IManager configManager, IConsensus consensusImpl) {
this.configManager = configManager;
this.consensusImpl = consensusImpl;
+ this.stateMachine = null;
}
public void start() throws IOException {
@@ -445,39 +448,59 @@ public class ConsensusManager {
* NEED_REDIRECTION otherwise
*/
public TSStatus confirmLeader() {
- TSStatus result = new TSStatus();
- if (isLeaderReady()) {
- result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } else {
- result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
- if (isLeader()) {
- long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime <
MAX_WAIT_READY_TIME_MS) {
- if (isLeaderReady()) {
- result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- return result;
- }
- try {
- Thread.sleep(RETRY_WAIT_TIME_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn(
-
ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY);
- break;
- }
- }
- result.setMessage(
- "The current ConfigNode is leader but not ready yet, please try
again later.");
- } else {
- result.setMessage(
- "The current ConfigNode is not leader, please redirect to a new
ConfigNode.");
- }
+ return confirmLeader(true);
+ }
+
+ private TSStatus confirmLeader(final boolean checkLoadReady) {
+ if (!isLeader()) {
+ TSStatus result = new
TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+ result.setMessage(
+ "The current ConfigNode is not leader, please redirect to a new
ConfigNode.");
TConfigNodeLocation leaderLocation = getLeaderLocation();
if (leaderLocation != null) {
result.setRedirectNode(leaderLocation.getInternalEndPoint());
}
+ return result;
+ }
+
+ waitForLeaderReady();
+
+ if (!isLeaderReady()) {
+ return getLeaderWarmingUpStatus(
+ "The current ConfigNode is leader but consensus is not ready yet.");
+ }
+ if (!stateMachine.areLeaderServicesReady()) {
+ return getLeaderWarmingUpStatus(
+ "The current ConfigNode is leader but leader services are not ready
yet.");
+ }
+ if (checkLoadReady && !configManager.getLoadManager().isLoadReady()) {
+ return
getLeaderWarmingUpStatus(configManager.getLoadManager().getLoadReadyReason());
+ }
+
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ public TSStatus confirmLeaderForInternalProcedure() {
+ return confirmLeader(false);
+ }
+
+ private void waitForLeaderReady() {
+ long startTime = System.currentTimeMillis();
+ while (!isLeaderReady() && System.currentTimeMillis() - startTime <
MAX_WAIT_READY_TIME_MS) {
+ try {
+ Thread.sleep(RETRY_WAIT_TIME_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn(
+
ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY);
+ return;
+ }
}
- return result;
+ }
+
+ private TSStatus getLeaderWarmingUpStatus(String message) {
+ return new
TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode())
+ .setMessage(message);
}
public ConsensusGroupId getConsensusGroupId() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index e0547bf640d..63d5ff8befd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -50,7 +50,10 @@ import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
@@ -59,6 +62,8 @@ import java.util.function.Function;
*/
public class LoadManager {
+ private static final long FIRST_HEARTBEAT_READY_TOLERANCE_MS =
TimeUnit.SECONDS.toMillis(30);
+
protected final IManager configManager;
/** Balancers. */
@@ -74,6 +79,10 @@ public class LoadManager {
private final StatisticsService statisticsService;
private final EventService eventService;
private final TopologyService topologyService;
+ private final AtomicBoolean loadServicesStarted;
+ private final AtomicLong loadReadyStartTimeMillis;
+ private final AtomicBoolean loadReady;
+ private volatile String loadReadyReason;
public LoadManager(IManager configManager) {
this.configManager = configManager;
@@ -92,6 +101,10 @@ public class LoadManager {
configManager.getSubscriptionManager().getSubscriptionLeaderChangeHandler());
this.eventService.register(routeBalancer);
this.eventService.register(topologyService);
+ this.loadServicesStarted = new AtomicBoolean(false);
+ this.loadReadyStartTimeMillis = new AtomicLong(0);
+ this.loadReady = new AtomicBoolean(false);
+ this.loadReadyReason = "ConfigNode leader load services are not started.";
}
protected void setHeartbeatService(IManager configManager, LoadCache
loadCache) {
@@ -149,7 +162,11 @@ public class LoadManager {
}
public void startLoadServices() {
+ loadReady.set(false);
+ loadReadyStartTimeMillis.set(System.currentTimeMillis());
+ loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat
sampling.";
loadCache.initHeartbeatCache(configManager);
+ loadServicesStarted.set(true);
heartbeatService.startHeartbeatService();
statisticsService.startLoadStatisticsService();
eventService.startEventService();
@@ -157,6 +174,10 @@ public class LoadManager {
}
public void stopLoadServices() {
+ loadServicesStarted.set(false);
+ loadReadyStartTimeMillis.set(0);
+ loadReady.set(false);
+ loadReadyReason = "ConfigNode leader load services are stopped.";
heartbeatService.stopHeartbeatService();
statisticsService.stopLoadStatisticsService();
eventService.stopEventService();
@@ -165,6 +186,50 @@ public class LoadManager {
routeBalancer.clearRegionPriority();
}
+ public boolean isLoadReady() {
+ return loadReady.get() || tryUpdateLoadReady();
+ }
+
+ public String getLoadReadyReason() {
+ return loadReadyReason;
+ }
+
+ private synchronized boolean tryUpdateLoadReady() {
+ if (loadReady.get()) {
+ return true;
+ }
+ if (!loadServicesStarted.get()) {
+ loadReadyReason = "ConfigNode leader load services are not started.";
+ return false;
+ }
+
+ loadCache.updateNodeStatistics(false);
+ eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
+
+ List<String> unreadyReasons = loadCache.getNodeHeartbeatUnreadyReasons();
+ if (unreadyReasons.isEmpty()) {
+ loadReadyReason = "ConfigNode leader load services are ready.";
+ loadReady.set(true);
+ return true;
+ }
+
+ long elapsedMillis = System.currentTimeMillis() -
loadReadyStartTimeMillis.get();
+ if (elapsedMillis < FIRST_HEARTBEAT_READY_TOLERANCE_MS) {
+ loadReadyReason =
+ "ConfigNode leader is waiting for first heartbeat from registered
ConfigNodes/DataNodes: "
+ + unreadyReasons;
+ return false;
+ }
+
+ loadReadyReason =
+ "ConfigNode leader load services are ready after waiting "
+ + FIRST_HEARTBEAT_READY_TOLERANCE_MS
+ + "ms for first heartbeat. Missing heartbeats: "
+ + unreadyReasons;
+ loadReady.set(true);
+ return true;
+ }
+
public void startTopologyService() {
topologyService.startTopologyService();
}
@@ -489,6 +554,14 @@ public class LoadManager {
return routeBalancer;
}
+ @TestOnly
+ void markLoadServicesStartedForTest(long loadReadyStartTimeMillis) {
+ loadServicesStarted.set(true);
+ loadReady.set(false);
+ this.loadReadyStartTimeMillis.set(loadReadyStartTimeMillis);
+ loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat
sampling.";
+ }
+
@TestOnly
public EventService getEventService() {
return eventService;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
index d61a0043520..e5ab6445f98 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
@@ -97,6 +97,10 @@ public abstract class AbstractLoadCache {
return slidingWindow.isEmpty() ? null :
slidingWindow.get(slidingWindow.size() - 1);
}
+ public boolean hasHeartbeatSample() {
+ return getLastSample() != null;
+ }
+
/**
* Update currentStatistics based on the latest heartbeat sample that cached
in the slidingWindow.
*/
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 818171c89bc..3416246811e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -82,6 +82,7 @@ public class LoadCache {
Math.max(
ProcedureManager.PROCEDURE_WAIT_TIME_OUT -
TimeUnit.SECONDS.toMillis(2),
TimeUnit.SECONDS.toMillis(10));
+ private static final int MAX_UNREADY_ENTITY_PRINT = 10;
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
@@ -496,6 +497,32 @@ public class LoadCache {
return consensusGroupStatisticsMap;
}
+ public List<String> getNodeHeartbeatUnreadyReasons() {
+ List<Integer> unreadyNodes = new ArrayList<>();
+ nodeCacheMap.forEach(
+ (nodeId, nodeCache) -> {
+ if (nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
+ return;
+ }
+ if ((nodeCache instanceof ConfigNodeHeartbeatCache
+ || nodeCache instanceof DataNodeHeartbeatCache)
+ && !nodeCache.hasHeartbeatSample()) {
+ unreadyNodes.add(nodeId);
+ }
+ });
+ if (unreadyNodes.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Collections.sort(unreadyNodes);
+ List<Integer> nodesToPrint =
+ unreadyNodes.subList(0, Math.min(MAX_UNREADY_ENTITY_PRINT,
unreadyNodes.size()));
+ String suffix =
+ unreadyNodes.size() > MAX_UNREADY_ENTITY_PRINT
+ ? "...(" + (unreadyNodes.size() - MAX_UNREADY_ENTITY_PRINT) + "
more)"
+ : "";
+ return Collections.singletonList("nodes=" + nodesToPrint + suffix);
+ }
+
/**
* Safely get NodeStatus by NodeId.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
index aa924dc29b5..7dcacc4fd80 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
@@ -41,7 +41,7 @@ public class ConsensusGroupCache extends AbstractLoadCache {
synchronized (slidingWindow) {
lastSample = (ConsensusGroupHeartbeatSample) getLastSample();
}
- if (lastSample != null && lastSample.getLeaderId() != UN_READY_LEADER_ID) {
+ if (lastSample != null) {
currentStatistics.set(
new ConsensusGroupStatistics(System.nanoTime(),
lastSample.getLeaderId()));
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 11bb7f382d4..82afea3859f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -56,6 +56,8 @@ import static
org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID;
public class ProcedureExecutor<Env> {
private static final Logger LOG =
LoggerFactory.getLogger(ProcedureExecutor.class);
+ private static final ThreadLocal<Boolean> PROCEDURE_EXECUTION_CONTEXT =
+ ThreadLocal.withInitial(() -> false);
private final ConcurrentHashMap<Long, CompletedProcedureContainer<Env>>
completed =
new ConcurrentHashMap<>();
@@ -96,6 +98,10 @@ public class ProcedureExecutor<Env> {
this(environment, store, new SimpleProcedureScheduler());
}
+ public static boolean isProcedureExecutionThread() {
+ return PROCEDURE_EXECUTION_CONTEXT.get();
+ }
+
public void init(int numThreads) {
this.corePoolSize = numThreads;
this.maxPoolSize = 10 * numThreads;
@@ -784,7 +790,12 @@ public class ProcedureExecutor<Env> {
this.activeProcedure.set(procedure);
activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
- executeProcedure(procedure);
+ PROCEDURE_EXECUTION_CONTEXT.set(true);
+ try {
+ executeProcedure(procedure);
+ } finally {
+ PROCEDURE_EXECUTION_CONTEXT.remove();
+ }
activeExecutorCount.decrementAndGet();
LOG.trace(
"Halt pid={}, activeCount={}", procedure.getProcId(),
activeExecutorCount.get());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 037f138286a..da766541786 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -89,7 +89,7 @@ public class ConfigNode extends ServerCommandLine implements
ConfigNodeMBean {
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
- private static final int STARTUP_RETRY_NUM = 10;
+ private static final int STARTUP_RETRY_NUM = 20;
private static final long STARTUP_RETRY_INTERVAL_IN_MS =
TimeUnit.SECONDS.toMillis(3);
private static final int SCHEDULE_WAITING_RETRY_NUM =
(int) (COMMON_CONFIG.getCnConnectionTimeoutInMS() /
STARTUP_RETRY_INTERVAL_IN_MS);
@@ -414,6 +414,12 @@ public class ConfigNode extends ServerCommandLine
implements ConfigNodeMBean {
} else if (status.getCode() ==
TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) {
LOGGER.warn(
ConfigNodeMessages.THE_RESULT_OF_REGISTER_SELF_CONFIGNODE_IS_RETRY, status,
retry);
+ } else if (status.getCode() ==
TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) {
+ LOGGER.info(
+ "ConfigNode leader is warming up before serving the registering
ConfigNode, will wait"
+ + " and retry. Status: {}, retry: {}",
+ status,
+ retry);
} else {
throw new StartupException(status.getMessage());
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
index fd62c31ae36..e2c51df9693 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
@@ -48,6 +48,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -292,4 +293,73 @@ public class LoadManagerTest {
new Pair<>(new ConsensusGroupStatistics(newLeaderId), null),
differentConsensusGroupStatisticsMap.get(regionGroupId));
}
+
+ @Test
+ public void testLoadWarmUpRequiresOnlyConfigNodeAndDataNodeSamples() {
+ LoadCache loadCache = new LoadCache();
+ TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+ Set<Integer> dataNodeIds = Stream.of(11, 12).collect(Collectors.toSet());
+
+ loadCache.createNodeHeartbeatCache(NodeType.ConfigNode, 10);
+ dataNodeIds.forEach(
+ dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode,
dataNodeId));
+ loadCache.createNodeHeartbeatCache(NodeType.AINode, 13);
+ loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId,
dataNodeIds);
+
+ Assert.assertEquals(
+ Collections.singletonList("nodes=[10, 11, 12]"),
+ loadCache.getNodeHeartbeatUnreadyReasons());
+
+ loadCache.cacheConfigNodeHeartbeatSample(10, new
NodeHeartbeatSample(NodeStatus.Unknown));
+
+ dataNodeIds.forEach(
+ dataNodeId ->
+ loadCache.cacheDataNodeHeartbeatSample(
+ dataNodeId, new NodeHeartbeatSample(NodeStatus.Running)));
+ loadCache.updateNodeStatistics(false);
+
+ Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty());
+ }
+
+ @Test
+ public void testRegionAndConsensusGroupsDoNotBlockLoadWarmUp() {
+ LoadCache loadCache = new LoadCache();
+ TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 101);
+ Set<Integer> dataNodeIds = Stream.of(21, 22).collect(Collectors.toSet());
+
+ dataNodeIds.forEach(
+ dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode,
dataNodeId));
+ loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId,
dataNodeIds);
+ dataNodeIds.forEach(
+ dataNodeId -> {
+ loadCache.cacheDataNodeHeartbeatSample(
+ dataNodeId, new NodeHeartbeatSample(NodeStatus.Running));
+ });
+ loadCache.updateNodeStatistics(false);
+ loadCache.updateRegionGroupStatistics();
+ loadCache.updateConsensusGroupStatistics();
+
+ Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty());
+ }
+
+ @Test
+ public void testLoadWarmUpToleratesMissingFirstHeartbeatAfterThirtySeconds()
{
+ LOAD_CACHE.clearHeartbeatCache();
+ LOAD_CACHE.createNodeHeartbeatCache(NodeType.DataNode, 31);
+
+ try {
+ LOAD_MANAGER.markLoadServicesStartedForTest(System.currentTimeMillis());
+
+ Assert.assertFalse(LOAD_MANAGER.isLoadReady());
+ Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("waiting
for first heartbeat"));
+
+ LOAD_MANAGER.markLoadServicesStartedForTest(
+ System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(31));
+
+ Assert.assertTrue(LOAD_MANAGER.isLoadReady());
+ Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("Missing
heartbeats"));
+ } finally {
+ LOAD_MANAGER.stopLoadServices();
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index b21d3a3a792..5c7fa59f78d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -224,6 +224,7 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
"Failed to connect to ConfigNode %s from DataNode %s when executing %s,
Exception:";
private static final long RETRY_INTERVAL_MS = 1000L;
private static final long WAIT_CN_LEADER_ELECTION_INTERVAL_MS = 2000L;
+ private static final long REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS =
60_000L;
private static final String UNSUPPORTED_INVOCATION =
"This method is not supported for invocation by DataNode";
@@ -403,12 +404,33 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
}
return true;
}
+ if (status.getCode() ==
TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) {
+ if (!isFirstInitiated) {
+ logger.info(
+ "ConfigNode leader {} is warming up before serving DataNode {},
will wait and retry."
+ + " Reason: {}",
+ configNode,
+ config.getAddressAndPort(),
+ status.getMessage());
+ }
+ try {
+ Thread.sleep(WAIT_CN_LEADER_ELECTION_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
logger.warn(DataNodeMiscMessages.UNEXPECTED_INTERRUPTION_CONNECT_CONFIG_NODE_BREAK);
+ }
+ return true;
+ }
return false;
} finally {
isFirstInitiated = false;
}
}
+ private boolean isConfigNodeLeaderWarmingUp(TSStatus status) {
+ return status.getCode() ==
TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode();
+ }
+
/**
* The frame of execute RPC, include logic of retry and exception handling.
*
@@ -480,20 +502,33 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
@Override
public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req)
throws TException {
- for (int i = 0; i < RETRY_NUM; i++) {
+ boolean leaderWarmingUpObserved = false;
+ long leaderWarmingUpRetryDeadline = 0;
+ for (int i = 0;
+ i < RETRY_NUM
+ || (leaderWarmingUpObserved
+ && System.currentTimeMillis() < leaderWarmingUpRetryDeadline);
+ i++) {
try {
TDataNodeRegisterResp resp = client.registerDataNode(req);
if (!updateConfigNodeLeader(resp.status)) {
return resp;
}
+ if (isConfigNodeLeaderWarmingUp(resp.status) &&
!leaderWarmingUpObserved) {
+ leaderWarmingUpObserved = true;
+ leaderWarmingUpRetryDeadline =
+ System.currentTimeMillis() +
REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS;
+ }
// set latest config node list
- List<TEndPoint> newConfigNodes = new ArrayList<>();
- for (TConfigNodeLocation configNodeLocation :
resp.getConfigNodeList()) {
- newConfigNodes.add(configNodeLocation.getInternalEndPoint());
+ if (resp.isSetConfigNodeList()) {
+ List<TEndPoint> newConfigNodes = new ArrayList<>();
+ for (TConfigNodeLocation configNodeLocation :
resp.getConfigNodeList()) {
+ newConfigNodes.add(configNodeLocation.getInternalEndPoint());
+ }
+ configNodes = newConfigNodes;
}
- configNodes = newConfigNodes;
} catch (TException e) {
String message =
String.format(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 978fd3f1f0e..e9e01efd2d4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -98,6 +98,7 @@ public enum ThreadName {
CONFIG_NODE_REGION_MAINTAINER("IoTDB-Region-Maintainer"),
// -------------------------- ConfigNode-Recover --------------------------
CONFIG_NODE_RECOVER("ConfigNode-Manager-Recovery"),
+
CONFIG_NODE_LEADER_SERVICES_TRANSITION("ConfigNode-Leader-Services-Transition"),
// -------------------------- ConfigNode-Procedure ------------------------
// TODO: Use Thread Pool to manage the procedure thread @Potato
CONFIG_NODE_PROCEDURE_WORKER("ProcedureWorkerGroup"),
@@ -374,7 +375,7 @@ public enum ThreadName {
new HashSet<>(Arrays.asList(CONFIG_NODE_REGION_MAINTAINER));
private static final Set<ThreadName> configNodeRecoverThreadNames =
- new HashSet<>(Arrays.asList(CONFIG_NODE_RECOVER));
+ new HashSet<>(Arrays.asList(CONFIG_NODE_RECOVER,
CONFIG_NODE_LEADER_SERVICES_TRANSITION));
private static final Set<ThreadName> configNodeProcedureThreadNames =
new HashSet<>(