This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch improve-confignode-leader-confirm in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4e6d45f4e689a38274f26654b37dc4ba5d09f160 Author: Yongzao <[email protected]> AuthorDate: Tue Jun 2 19:54:41 2026 +0800 Improve ConfigNode leader warm-up gating --- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../heartbeat/DataNodeHeartbeatHandler.java | 93 +++++++------ .../statemachine/ConfigRegionStateMachine.java | 25 +++- .../iotdb/confignode/manager/ConfigManager.java | 8 +- .../iotdb/confignode/manager/load/LoadManager.java | 110 +++++++++++++++- .../manager/load/cache/AbstractLoadCache.java | 4 + .../confignode/manager/load/cache/LoadCache.java | 146 +++++++++++++++++++++ .../load/cache/consensus/ConsensusGroupCache.java | 2 +- .../manager/partition/PartitionManager.java | 16 ++- .../confignode/manager/load/LoadManagerTest.java | 62 +++++++++ .../iotdb/db/protocol/client/ConfigNodeClient.java | 17 +++ 11 files changed, 435 insertions(+), 49 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 035c648132f..dad3ef44e23 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -209,6 +209,7 @@ public enum TSStatusCode { CAN_NOT_CONNECT_AINODE(1011), NO_AVAILABLE_REPLICA(1012), NO_AVAILABLE_AINODE(1013), + CONFIG_NODE_LEADER_WARMING_UP(1014), // Sync, Load TsFile LOAD_FILE_ERROR(1100), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index e7a31b1dc73..18b8206cbb2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.cluster.NodeStatus; @@ -27,7 +28,6 @@ import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.load.LoadManager; -import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.PipeRuntimeCoordinator; @@ -36,6 +36,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; import org.apache.thrift.async.AsyncMethodCallback; +import java.util.Collections; import java.util.Map; import java.util.function.Consumer; @@ -89,46 +90,55 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<TDataNodeHe RegionStatus regionStatus = RegionStatus.valueOf(heartbeatResp.getStatus()); - heartbeatResp - .getJudgedLeaders() - .forEach( - (regionGroupId, isLeader) -> { - - // Do not allow regions to inherit the Removing state from datanode - RegionStatus nextRegionStatus = regionStatus; - if (nextRegionStatus == RegionStatus.Removing) { - nextRegionStatus = - loadManager - .getLoadCache() - .getRegionCacheLastSampleStatus(regionGroupId, nodeId); - } - - // Update RegionGroupCache - loadManager - .getLoadCache() - .cacheRegionHeartbeatSample( - regionGroupId, - nodeId, - new RegionHeartbeatSample( - heartbeatResp.getHeartbeatTimestamp(), - // Region will inherit DataNode's status - nextRegionStatus), - false); - - if (((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) - && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) - || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)) - && Boolean.TRUE.equals(isLeader)) { - // Update ConsensusGroupCache when necessary - loadManager - .getLoadCache() - .cacheConsensusSample( - regionGroupId, - new ConsensusGroupHeartbeatSample( - heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId)); - } - }); + Map<TConsensusGroupId, Boolean> judgedLeaders = + heartbeatResp.isSetJudgedLeaders() + ? heartbeatResp.getJudgedLeaders() + : Collections.emptyMap(); + judgedLeaders.forEach( + (regionGroupId, isLeader) -> { + + // Do not allow regions to inherit the Removing state from datanode + RegionStatus nextRegionStatus = regionStatus; + if (nextRegionStatus == RegionStatus.Removing) { + nextRegionStatus = + loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId); + } + + // Update RegionGroupCache + loadManager + .getLoadCache() + .cacheRegionHeartbeatSample( + regionGroupId, + nodeId, + new RegionHeartbeatSample( + heartbeatResp.getHeartbeatTimestamp(), + // Region will inherit DataNode's status + nextRegionStatus), + false); + + boolean shouldCacheConsensusSample = + (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) + && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) + || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) + && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE); + long logicalTimestamp = + heartbeatResp.isSetConsensusLogicalTimeMap() + && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId) + ? heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId) + : heartbeatResp.getHeartbeatTimestamp(); + loadManager + .getLoadCache() + .cacheConsensusGroupHeartbeatSample( + regionGroupId, + nodeId, + Boolean.TRUE.equals(isLeader), + logicalTimestamp, + shouldCacheConsensusSample); + }); + loadManager + .getLoadCache() + .cacheUnreportedDataNodeRegionHeartbeatSamples( + nodeId, judgedLeaders.keySet(), heartbeatResp.getHeartbeatTimestamp()); if (heartbeatResp.getRegionDeviceUsageMap() != null) { deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap()); @@ -170,6 +180,7 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<TDataNodeHe if (ThriftClient.isConnectionBroken(e)) { loadManager.forceUpdateNodeCache( NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); + loadManager.getLoadCache().cacheDataNodeHeartbeatFailureSample(nodeId, System.nanoTime()); } loadManager.getLoadCache().resetHeartbeatProcessing(nodeId); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index fe687d17556..2b0dc1610e6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -288,13 +288,34 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); - // Always start load services first - configManager.getLoadManager().startLoadServices(); + // Always start load services first and wait for its first full warm-up before serving. + long loadReadyEpoch = configManager.getLoadManager().startLoadServices(); if (CONF.isEnableTopologyProbing()) { configManager.getLoadManager().startTopologyService(); } + threadPool.submit(() -> startLeaderServicesAfterLoadReady(loadReadyEpoch)); + } + + private void startLeaderServicesAfterLoadReady(long loadReadyEpoch) { + if (!configManager.getLoadManager().waitForLoadReady(loadReadyEpoch)) { + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + + "skip starting leader services because load warm-up is interrupted", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + return; + } + if (!configManager.getConsensusManager().isLeaderReady()) { + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + + "skip starting leader services because consensus leader is no longer ready", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + return; + } + // Start leader scheduling services configManager.getProcedureManager().startExecutor(); threadPool.submit( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 2db0255e35a..ea2ffc1c2c1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1247,7 +1247,13 @@ public class ConfigManager implements IManager { "ConsensusManager of target-ConfigNode is not initialized, " + "please make sure the target-ConfigNode has been started successfully."); } - return getConsensusManager().confirmLeader(); + TSStatus status = getConsensusManager().confirmLeader(); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + && !getLoadManager().isLoadReady()) { + return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) + .setMessage(getLoadManager().getLoadReadyReason()); + } + return status; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index e97f32bdbda..148a4bc8ae8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -48,10 +48,16 @@ import org.apache.iotdb.confignode.manager.load.service.TopologyService; import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.stream.Collectors; /** * The {@link LoadManager} at ConfigNodeGroup-Leader is active. It proactively implements the @@ -59,6 +65,9 @@ import java.util.function.Function; */ public class LoadManager { + private static final long LOAD_READY_CHECK_INTERVAL_MS = + Math.max(10, Math.min(100, StatisticsService.STATISTICS_UPDATE_INTERVAL / 10)); + protected final IManager configManager; /** Balancers. */ @@ -74,6 +83,10 @@ public class LoadManager { private final StatisticsService statisticsService; private final EventService eventService; private final TopologyService topologyService; + private final AtomicBoolean loadServicesStarted; + private final AtomicLong loadReadyEpoch; + private final AtomicBoolean loadReady; + private volatile String loadReadyReason; public LoadManager(IManager configManager) { this.configManager = configManager; @@ -90,6 +103,10 @@ public class LoadManager { this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator()); this.eventService.register(routeBalancer); this.eventService.register(topologyService); + this.loadServicesStarted = new AtomicBoolean(false); + this.loadReadyEpoch = new AtomicLong(0); + this.loadReady = new AtomicBoolean(false); + this.loadReadyReason = "ConfigNode leader load services are not started."; } protected void setHeartbeatService(IManager configManager, LoadCache loadCache) { @@ -146,15 +163,24 @@ public class LoadManager { partitionBalancer.reBalanceDataPartitionPolicy(database); } - public void startLoadServices() { + public long startLoadServices() { + long epoch = loadReadyEpoch.incrementAndGet(); + loadReady.set(false); + loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat sampling."; loadCache.initHeartbeatCache(configManager); + loadServicesStarted.set(true); heartbeatService.startHeartbeatService(); statisticsService.startLoadStatisticsService(); eventService.startEventService(); partitionBalancer.setupPartitionBalancer(); + return epoch; } public void stopLoadServices() { + loadReadyEpoch.incrementAndGet(); + loadServicesStarted.set(false); + loadReady.set(false); + loadReadyReason = "ConfigNode leader load services are stopped."; heartbeatService.stopHeartbeatService(); statisticsService.stopLoadStatisticsService(); eventService.stopEventService(); @@ -163,6 +189,88 @@ public class LoadManager { routeBalancer.clearRegionPriority(); } + public boolean waitForLoadReady(long epoch) { + while (epoch == loadReadyEpoch.get() && !Thread.currentThread().isInterrupted()) { + if (tryUpdateLoadReady()) { + return true; + } + try { + TimeUnit.MILLISECONDS.sleep(LOAD_READY_CHECK_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + return false; + } + + public boolean isLoadReady() { + return loadReady.get() || tryUpdateLoadReady(); + } + + public String getLoadReadyReason() { + return loadReadyReason; + } + + private synchronized boolean tryUpdateLoadReady() { + if (loadReady.get()) { + return true; + } + if (!loadServicesStarted.get()) { + loadReadyReason = "ConfigNode leader load services are not started."; + return false; + } + + loadCache.updateNodeStatistics(false); + loadCache.updateRegionGroupStatistics(); + loadCache.updateConsensusGroupStatistics(); + eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary(); + eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary(); + eventService.checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary(); + + List<String> unreadyReasons = loadCache.getLoadWarmUpUnreadyReasons(); + if (!unreadyReasons.isEmpty() + && unreadyReasons.stream().anyMatch(reason -> !reason.startsWith("consensusGroups="))) { + loadReadyReason = "ConfigNode leader is warming up LoadCache: " + unreadyReasons; + return false; + } + + routeBalancer.balanceRegionLeaderAndPriority(); + + unreadyReasons = loadCache.getLoadWarmUpUnreadyReasons(); + if (!unreadyReasons.isEmpty()) { + loadReadyReason = "ConfigNode leader is warming up LoadCache: " + unreadyReasons; + return false; + } + + List<TConsensusGroupId> unreadyRegionPriorities = getUnreadyRegionPriorities(); + if (!unreadyRegionPriorities.isEmpty()) { + loadReadyReason = + "ConfigNode leader is warming up region priority: " + + unreadyRegionPriorities.subList(0, Math.min(10, unreadyRegionPriorities.size())) + + (unreadyRegionPriorities.size() > 10 + ? "...(" + (unreadyRegionPriorities.size() - 10) + " more)" + : ""); + return false; + } + + loadReadyReason = "ConfigNode leader load services are ready."; + loadReady.set(true); + return true; + } + + private List<TConsensusGroupId> getUnreadyRegionPriorities() { + List<TConsensusGroupId> regionGroupIds = loadCache.getAllRegionGroupIds(); + if (regionGroupIds.isEmpty()) { + return Collections.emptyList(); + } + Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = + routeBalancer.getRegionPriorityMap(); + return regionGroupIds.stream() + .filter(regionGroupId -> !regionPriorityMap.containsKey(regionGroupId)) + .collect(Collectors.toCollection(ArrayList::new)); + } + public void startTopologyService() { topologyService.startTopologyService(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java index d61a0043520..e5ab6445f98 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java @@ -97,6 +97,10 @@ public abstract class AbstractLoadCache { return slidingWindow.isEmpty() ? null : slidingWindow.get(slidingWindow.size() - 1); } + public boolean hasHeartbeatSample() { + return getLastSample() != null; + } + /** * Update currentStatistics based on the latest heartbeat sample that cached in the slidingWindow. */ diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 818171c89bc..6be880dd085 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCa import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionCache; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; @@ -82,6 +83,7 @@ public class LoadCache { Math.max( ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2), TimeUnit.SECONDS.toMillis(10)); + private static final int MAX_UNREADY_ENTITY_PRINT = 10; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); @@ -98,6 +100,8 @@ public class LoadCache { private final Map<Integer, Map<Integer, Long>> regionRawSizeMap; // Map<RegionGroupId, ConsensusGroupCache> private final Map<TConsensusGroupId, ConsensusGroupCache> consensusGroupCacheMap; + // Map<RegionGroupId, Set<DataNodeId that has reported leader judgment>> + private final Map<TConsensusGroupId, Set<Integer>> consensusGroupHeartbeatSampledNodeMap; // Map<DataNodeId, confirmedConfigNodes> private final Map<Integer, Set<TEndPoint>> confirmedConfigNodeMap; private Map<Integer, Set<Integer>> topologyGraph; @@ -110,6 +114,7 @@ public class LoadCache { this.regionSizeMap = new ConcurrentHashMap<>(); this.regionRawSizeMap = new ConcurrentHashMap<>(); this.consensusGroupCacheMap = new ConcurrentHashMap<>(); + this.consensusGroupHeartbeatSampledNodeMap = new ConcurrentHashMap<>(); this.confirmedConfigNodeMap = new ConcurrentHashMap<>(); this.topologyGraph = new HashMap<>(); this.topologyUpdated = new AtomicBoolean(false); @@ -175,6 +180,7 @@ public class LoadCache { Map<String, List<TRegionReplicaSet>> regionReplicaMap) { regionGroupCacheMap.clear(); consensusGroupCacheMap.clear(); + consensusGroupHeartbeatSampledNodeMap.clear(); regionReplicaMap.forEach( (database, regionReplicaSets) -> regionReplicaSets.forEach( @@ -192,6 +198,8 @@ public class LoadCache { .collect(Collectors.toSet()), isStrongConsistency)); consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); + consensusGroupHeartbeatSampledNodeMap.put( + regionGroupId, ConcurrentHashMap.newKeySet()); })); } @@ -200,6 +208,7 @@ public class LoadCache { nodeCacheMap.clear(); regionGroupCacheMap.clear(); consensusGroupCacheMap.clear(); + consensusGroupHeartbeatSampledNodeMap.clear(); } /** @@ -302,6 +311,7 @@ public class LoadCache { regionGroupId, new RegionGroupCache(database, regionGroupId, dataNodeIds, isStrongConsistency)); consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); + consensusGroupHeartbeatSampledNodeMap.put(regionGroupId, ConcurrentHashMap.newKeySet()); } /** @@ -364,6 +374,69 @@ public class LoadCache { .ifPresent(group -> group.cacheHeartbeatSample(sample)); } + public void cacheConsensusGroupHeartbeatSample( + TConsensusGroupId regionGroupId, + int nodeId, + boolean isLeader, + long logicalTimestamp, + boolean cacheLeader) { + consensusGroupHeartbeatSampledNodeMap + .computeIfAbsent(regionGroupId, empty -> ConcurrentHashMap.newKeySet()) + .add(nodeId); + if (cacheLeader && isLeader) { + cacheConsensusSample( + regionGroupId, new ConsensusGroupHeartbeatSample(logicalTimestamp, nodeId)); + } else if (isConsensusGroupHeartbeatFullySampled(regionGroupId) + && !Optional.ofNullable(consensusGroupCacheMap.get(regionGroupId)) + .map(AbstractLoadCache::hasHeartbeatSample) + .orElse(false)) { + cacheConsensusSample( + regionGroupId, + new ConsensusGroupHeartbeatSample( + logicalTimestamp, ConsensusGroupCache.UN_READY_LEADER_ID)); + } + } + + private boolean isConsensusGroupHeartbeatFullySampled(TConsensusGroupId regionGroupId) { + return Optional.ofNullable(regionGroupCacheMap.get(regionGroupId)) + .map(RegionGroupCache::getRegionLocations) + .map( + regionLocations -> + consensusGroupHeartbeatSampledNodeMap + .getOrDefault(regionGroupId, Collections.emptySet()) + .containsAll(regionLocations)) + .orElse(false); + } + + public void cacheDataNodeHeartbeatFailureSample(int nodeId, long sampleTimestamp) { + cacheUnreportedDataNodeRegionHeartbeatSamples(nodeId, Collections.emptySet(), sampleTimestamp); + } + + public void cacheUnreportedDataNodeRegionHeartbeatSamples( + int nodeId, Set<TConsensusGroupId> reportedRegionGroupIds, long sampleTimestamp) { + getRegionGroupIdsByDataNodeId(nodeId) + .forEach( + regionGroupId -> { + if (reportedRegionGroupIds.contains(regionGroupId)) { + return; + } + cacheRegionHeartbeatSample( + regionGroupId, + nodeId, + new RegionHeartbeatSample(sampleTimestamp, RegionStatus.Unknown), + false); + cacheConsensusGroupHeartbeatSample( + regionGroupId, nodeId, false, sampleTimestamp, false); + }); + } + + private List<TConsensusGroupId> getRegionGroupIdsByDataNodeId(int nodeId) { + return regionGroupCacheMap.entrySet().stream() + .filter(entry -> entry.getValue().getRegionLocations().contains(nodeId)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + /** Update the NodeStatistics of all Nodes. */ public void updateNodeStatistics(boolean forceUpdate) { nodeCacheMap @@ -450,6 +523,10 @@ public class LoadCache { return regionGroupIdsMap; } + public List<TConsensusGroupId> getAllRegionGroupIds() { + return new ArrayList<>(regionGroupCacheMap.keySet()); + } + /** * Get the RegionGroupStatistics of all RegionGroups. * @@ -496,6 +573,74 @@ public class LoadCache { return consensusGroupStatisticsMap; } + public boolean isLoadWarmUpReady() { + return getLoadWarmUpUnreadyReasons().isEmpty(); + } + + public List<String> getLoadWarmUpUnreadyReasons() { + List<String> unreadyReasons = new ArrayList<>(); + List<Integer> unreadyNodes = new ArrayList<>(); + nodeCacheMap.forEach( + (nodeId, nodeCache) -> { + if (nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { + return; + } + if (!nodeCache.hasHeartbeatSample() + || nodeCache.getCurrentStatistics().getStatisticsNanoTimestamp() == Long.MIN_VALUE) { + unreadyNodes.add(nodeId); + } + }); + addUnreadyReason(unreadyReasons, "nodes", unreadyNodes); + + List<String> unreadyRegions = new ArrayList<>(); + List<TConsensusGroupId> unreadyRegionGroups = new ArrayList<>(); + regionGroupCacheMap.forEach( + (regionGroupId, regionGroupCache) -> { + regionGroupCache + .getRegionLocations() + .forEach( + dataNodeId -> { + RegionCache regionCache = regionGroupCache.getRegionCache(dataNodeId); + if (regionCache == null || !regionCache.hasHeartbeatSample()) { + unreadyRegions.add(regionGroupId + "@" + dataNodeId); + } + }); + if (!regionGroupCache + .getCurrentStatistics() + .getRegionStatisticsMap() + .keySet() + .containsAll(regionGroupCache.getRegionLocations())) { + unreadyRegionGroups.add(regionGroupId); + } + }); + addUnreadyReason(unreadyReasons, "regions", unreadyRegions); + addUnreadyReason(unreadyReasons, "regionGroups", unreadyRegionGroups); + + List<TConsensusGroupId> unreadyConsensusGroups = new ArrayList<>(); + consensusGroupCacheMap.forEach( + (consensusGroupId, consensusGroupCache) -> { + if (!consensusGroupCache.hasHeartbeatSample() + || consensusGroupCache.getCurrentStatistics().getStatisticsNanoTimestamp() == 0) { + unreadyConsensusGroups.add(consensusGroupId); + } + }); + addUnreadyReason(unreadyReasons, "consensusGroups", unreadyConsensusGroups); + return unreadyReasons; + } + + private void addUnreadyReason(List<String> reasons, String entityName, List<?> unreadyEntities) { + if (unreadyEntities.isEmpty()) { + return; + } + List<?> entitiesToPrint = + unreadyEntities.subList(0, Math.min(MAX_UNREADY_ENTITY_PRINT, unreadyEntities.size())); + String suffix = + unreadyEntities.size() > MAX_UNREADY_ENTITY_PRINT + ? "...(" + (unreadyEntities.size() - MAX_UNREADY_ENTITY_PRINT) + " more)" + : ""; + reasons.add(entityName + "=" + entitiesToPrint + suffix); + } + /** * Safely get NodeStatus by NodeId. * @@ -714,6 +859,7 @@ public class LoadCache { public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) { regionGroupCacheMap.remove(consensusGroupId); consensusGroupCacheMap.remove(consensusGroupId); + consensusGroupHeartbeatSampledNodeMap.remove(consensusGroupId); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java index aa924dc29b5..7dcacc4fd80 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java @@ -41,7 +41,7 @@ public class ConsensusGroupCache extends AbstractLoadCache { synchronized (slidingWindow) { lastSample = (ConsensusGroupHeartbeatSample) getLastSample(); } - if (lastSample != null && lastSample.getLeaderId() != UN_READY_LEADER_ID) { + if (lastSample != null) { currentStatistics.set( new ConsensusGroupStatistics(System.nanoTime(), lastSample.getLeaderId())); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 5be81256b5c..c9989b5deae 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -300,7 +300,7 @@ public class PartitionManager { assignedSchemaPartition = getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap); } catch (final NoAvailableRegionGroupException e) { - status = getConsensusManager().confirmLeader(); + status = confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // The allocation might fail due to leadership change resp.setStatus(status); @@ -445,7 +445,7 @@ public class PartitionManager { assignedDataPartition = getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap); } catch (DatabaseNotExistsException | NoAvailableRegionGroupException e) { - status = getConsensusManager().confirmLeader(); + status = confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // The allocation might fail due to leadership change resp.setStatus(status); @@ -543,7 +543,7 @@ public class PartitionManager { } private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) { - TSStatus status = getConsensusManager().confirmLeader(); + TSStatus status = confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // Here we check the leadership second time // since the RegionGroup creating process might take some time @@ -1597,6 +1597,16 @@ public class PartitionManager { return regionMaintainer; } + private TSStatus confirmLeader() { + TSStatus status = getConsensusManager().confirmLeader(); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + && !getLoadManager().isLoadReady()) { + return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) + .setMessage(getLoadManager().getLoadReadyReason()); + } + return status; + } + private ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java index fd62c31ae36..6380fec06f7 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java @@ -292,4 +292,66 @@ public class LoadManagerTest { new Pair<>(new ConsensusGroupStatistics(newLeaderId), null), differentConsensusGroupStatisticsMap.get(regionGroupId)); } + + @Test + public void testLoadWarmUpRequiresAllEntitySamples() { + LoadCache loadCache = new LoadCache(); + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 100); + Set<Integer> dataNodeIds = Stream.of(11, 12).collect(Collectors.toSet()); + + dataNodeIds.forEach( + dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId)); + loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds); + + Assert.assertFalse(loadCache.isLoadWarmUpReady()); + + dataNodeIds.forEach( + dataNodeId -> + loadCache.cacheDataNodeHeartbeatSample( + dataNodeId, new NodeHeartbeatSample(NodeStatus.Running))); + loadCache.updateNodeStatistics(false); + loadCache.cacheRegionHeartbeatSample( + regionGroupId, 11, new RegionHeartbeatSample(RegionStatus.Running), false); + loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, 11, true, 1, true); + loadCache.updateRegionGroupStatistics(); + loadCache.updateConsensusGroupStatistics(); + + Assert.assertFalse(loadCache.isLoadWarmUpReady()); + Assert.assertTrue(loadCache.getLoadWarmUpUnreadyReasons().toString().contains("regions=")); + + loadCache.cacheRegionHeartbeatSample( + regionGroupId, 12, new RegionHeartbeatSample(RegionStatus.Running), false); + loadCache.updateRegionGroupStatistics(); + + Assert.assertTrue( + loadCache.getLoadWarmUpUnreadyReasons().toString(), loadCache.isLoadWarmUpReady()); + } + + @Test + public void testConsensusGroupWarmUpAcceptsFullySampledWithoutLeader() { + LoadCache loadCache = new LoadCache(); + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 101); + Set<Integer> dataNodeIds = Stream.of(21, 22).collect(Collectors.toSet()); + + dataNodeIds.forEach( + dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId)); + loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds); + dataNodeIds.forEach( + dataNodeId -> { + loadCache.cacheDataNodeHeartbeatSample( + dataNodeId, new NodeHeartbeatSample(NodeStatus.Running)); + loadCache.cacheRegionHeartbeatSample( + regionGroupId, dataNodeId, new RegionHeartbeatSample(RegionStatus.Running), false); + loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, dataNodeId, false, 1, true); + }); + loadCache.updateNodeStatistics(false); + loadCache.updateRegionGroupStatistics(); + loadCache.updateConsensusGroupStatistics(); + + Assert.assertTrue( + loadCache.getLoadWarmUpUnreadyReasons().toString(), loadCache.isLoadWarmUpReady()); + Assert.assertEquals( + ConsensusGroupStatistics.generateDefaultConsensusGroupStatistics(), + loadCache.getCurrentConsensusGroupStatisticsMap().get(regionGroupId)); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 1f0b09f0b86..545ee026871 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -401,6 +401,23 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie } return true; } + if (status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) { + if (!isFirstInitiated) { + logger.info( + "ConfigNode leader {} is warming up before serving DataNode {}, will wait and retry." + + " Reason: {}", + configNode, + config.getAddressAndPort(), + status.getMessage()); + } + try { + Thread.sleep(WAIT_CN_LEADER_ELECTION_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn(DataNodeMiscMessages.UNEXPECTED_INTERRUPTION_CONNECT_CONFIG_NODE_BREAK); + } + return true; + } return false; } finally { isFirstInitiated = false;
