This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch optimize-load-cache in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ac17779b8a12df111512a44186dae8d4d8d347e9 Author: YongzaoDan <[email protected]> AuthorDate: Thu Apr 25 15:43:55 2024 +0800 update framework --- .../iotdb/confignode/manager/load/LoadManager.java | 38 +++-- .../manager/load/balancer/RouteBalancer.java | 4 +- .../router/leader/AbstractLeaderBalancer.java | 18 +-- .../router/leader/GreedyLeaderBalancer.java | 14 +- .../router/leader/MinCostFlowLeaderBalancer.java | 122 +++++++------- .../confignode/manager/load/cache/LoadCache.java | 178 +++++++++++++++------ .../manager/load/cache/region/RegionCache.java | 3 +- .../load/cache/region/RegionGroupCache.java | 30 +++- .../iotdb/confignode/manager/node/NodeManager.java | 7 +- .../procedure/env/ConfigNodeProcedureEnv.java | 38 ++++- .../procedure/env/RegionMaintainHandler.java | 14 +- .../impl/node/AddConfigNodeProcedure.java | 2 +- .../impl/region/AddRegionPeerProcedure.java | 6 +- .../impl/region/CreateRegionGroupsProcedure.java | 9 +- .../impl/region/RemoveRegionPeerProcedure.java | 4 +- .../router/leader/CFDLeaderBalancerTest.java | 34 +++- .../router/leader/GreedyLeaderBalancerTest.java | 30 +++- .../leader/LeaderBalancerComparisonTest.java | 39 +++-- .../manager/load/cache/RegionGroupCacheTest.java | 23 ++- 19 files changed, 413 insertions(+), 200 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index b115aee68aa..9db7ee56607 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -265,9 +265,16 @@ public class LoadManager { eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary(); } - /** Remove the specified Node's cache. */ + /** + * Remove the NodeHeartbeatCache of the specified Node, update statistics and broadcast statistics + * change event if necessary. + * + * @param nodeId the index of the specified Node + */ public void removeNodeCache(int nodeId) { loadCache.removeNodeCache(nodeId); + loadCache.updateNodeStatistics(); + eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary(); } /** @@ -324,7 +331,8 @@ public class LoadManager { } /** - * Force update the specified RegionGroups' cache. + * Force update the specified RegionGroups' cache, update statistics and broadcast statistics + * change event if necessary. * * @param heartbeatSampleMap Map<RegionGroupId, Map<DataNodeId, RegionHeartbeatSample>> */ @@ -341,38 +349,48 @@ public class LoadManager { } /** - * Add the cache of the specified Region in the specified RegionGroup. + * Force update the specified Region's cache, update statistics and broadcast statistics change + * event if necessary. * - * @param regionGroupId the specified RegionGroup - * @param dataNodeId the specified DataNode + * @param regionGroupId The specified RegionGroup + * @param dataNodeId The DataNodeId where the specified Region is located + * @param regionStatus The specified RegionStatus */ - public void forceAddRegionCache( + public void forceUpdateRegionCache( TConsensusGroupId regionGroupId, int dataNodeId, RegionStatus regionStatus) { loadCache.cacheRegionHeartbeatSample( regionGroupId, dataNodeId, new RegionHeartbeatSample(System.nanoTime(), regionStatus), - true); + false); loadCache.updateRegionGroupStatistics(); eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary(); } /** - * Remove the cache of the specified Region in the specified RegionGroup. + * Remove the cache of the specified Region in the specified RegionGroup, update statistics and + * broadcast statistics change event if necessary. * * @param regionGroupId the specified RegionGroup * @param dataNodeId the specified DataNode */ - public void forceRemoveRegionCache(TConsensusGroupId regionGroupId, int dataNodeId) { + public void removeRegionCache(TConsensusGroupId regionGroupId, int dataNodeId) { loadCache.removeRegionCache(regionGroupId, dataNodeId); loadCache.updateRegionGroupStatistics(); eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary(); } - /** Remove the specified RegionGroup's cache. */ + /** + * Remove the specified RegionGroup's related cache, update statistics and broadcast statistics + * change event if necessary. + * + * @param consensusGroupId The specified RegionGroup + */ public void removeRegionGroupRelatedCache(TConsensusGroupId consensusGroupId) { loadCache.removeRegionGroupCache(consensusGroupId); routeBalancer.removeRegionPriority(consensusGroupId); + loadCache.updateRegionGroupStatistics(); + eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary(); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 79ca434e1d3..a24098c3b92 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -154,8 +154,8 @@ public class RouteBalancer implements IClusterStatusSubscriber { Map<TConsensusGroupId, Integer> currentLeaderMap = getLoadManager().getRegionLeaderMap(); Map<TConsensusGroupId, Integer> optimalLeaderMap = leaderBalancer.generateOptimalLeaderDistribution( - getPartitionManager().getAllRegionGroupIdMap(regionGroupType), - getPartitionManager().getAllReplicaSetsMap(regionGroupType), + getLoadManager().getLoadCache().getCurrentDatabaseRegionGroupMap(regionGroupType), + getLoadManager().getLoadCache().getCurrentRegionLocationMap(regionGroupType), currentLeaderMap, getLoadManager().getLoadCache().getCurrentDataNodeStatisticsMap(), getLoadManager().getLoadCache().getCurrentRegionStatisticsMap(regionGroupType)); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java index b82b3a0ca48..32eebf1586d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java @@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.manager.load.balancer.router.leader; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; @@ -28,6 +27,7 @@ import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; public abstract class AbstractLeaderBalancer { @@ -37,8 +37,8 @@ public abstract class AbstractLeaderBalancer { // Map<Database, List<RegionGroup>> protected final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap; - // Map<RegionGroupId, RegionGroup> - protected final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap; + // Map<RegionGroupId, Set<DataNodeId>> + protected final Map<TConsensusGroupId, Set<Integer>> regionLocationMap; // Map<RegionGroupId, leaderId> protected final Map<TConsensusGroupId, Integer> regionLeaderMap; // Map<DataNodeId, NodeStatistics> @@ -48,7 +48,7 @@ public abstract class AbstractLeaderBalancer { protected AbstractLeaderBalancer() { this.databaseRegionGroupMap = new TreeMap<>(); - this.regionReplicaSetMap = new TreeMap<>(); + this.regionLocationMap = new TreeMap<>(); this.regionLeaderMap = new TreeMap<>(); this.dataNodeStatisticsMap = new TreeMap<>(); this.regionStatisticsMap = new TreeMap<>(); @@ -56,12 +56,12 @@ public abstract class AbstractLeaderBalancer { protected void initialize( Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, + Map<TConsensusGroupId, Set<Integer>> regionLocationMap, Map<TConsensusGroupId, Integer> regionLeaderMap, Map<Integer, NodeStatistics> dataNodeStatisticsMap, Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap) { this.databaseRegionGroupMap.putAll(databaseRegionGroupMap); - this.regionReplicaSetMap.putAll(regionReplicaSetMap); + this.regionLocationMap.putAll(regionLocationMap); this.regionLeaderMap.putAll(regionLeaderMap); this.dataNodeStatisticsMap.putAll(dataNodeStatisticsMap); this.regionStatisticsMap.putAll(regionStatisticsMap); @@ -83,7 +83,7 @@ public abstract class AbstractLeaderBalancer { protected void clear() { this.databaseRegionGroupMap.clear(); - this.regionReplicaSetMap.clear(); + this.regionLocationMap.clear(); this.regionLeaderMap.clear(); this.dataNodeStatisticsMap.clear(); this.regionStatisticsMap.clear(); @@ -93,7 +93,7 @@ public abstract class AbstractLeaderBalancer { * Generate an optimal leader distribution. * * @param databaseRegionGroupMap RegionGroup held by each Database - * @param regionReplicaSetMap All RegionGroups the cluster currently have + * @param regionLocationMap All RegionGroups the cluster currently have * @param regionLeaderMap The current leader distribution of each RegionGroup * @param dataNodeStatisticsMap The current statistics of each DataNode * @param regionStatisticsMap The current statistics of each Region @@ -101,7 +101,7 @@ public abstract class AbstractLeaderBalancer { */ public abstract Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution( Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, + Map<TConsensusGroupId, Set<Integer>> regionLocationMap, Map<TConsensusGroupId, Integer> regionLeaderMap, Map<Integer, NodeStatistics> dataNodeStatisticsMap, Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java index 5a6098b60f9..101ddf35e71 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java @@ -20,13 +20,12 @@ package org.apache.iotdb.confignode.manager.load.balancer.router.leader; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -40,13 +39,13 @@ public class GreedyLeaderBalancer extends AbstractLeaderBalancer { @Override public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution( Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, + Map<TConsensusGroupId, Set<Integer>> regionLocationMap, Map<TConsensusGroupId, Integer> regionLeaderMap, Map<Integer, NodeStatistics> dataNodeStatisticsMap, Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap) { initialize( databaseRegionGroupMap, - regionReplicaSetMap, + regionLocationMap, regionLeaderMap, dataNodeStatisticsMap, regionStatisticsMap); @@ -57,12 +56,11 @@ public class GreedyLeaderBalancer extends AbstractLeaderBalancer { private Map<TConsensusGroupId, Integer> constructGreedyDistribution() { Map<Integer, Integer> leaderCounter = new TreeMap<>(); - regionReplicaSetMap.forEach( - (regionGroupId, regionGroup) -> { + regionLocationMap.forEach( + (regionGroupId, dataNodeIds) -> { int minCount = Integer.MAX_VALUE, leaderId = regionLeaderMap.getOrDefault(regionGroupId, -1); - for (TDataNodeLocation dataNodeLocation : regionGroup.getDataNodeLocations()) { - int dataNodeId = dataNodeLocation.getDataNodeId(); + for (int dataNodeId : dataNodeIds) { if (isDataNodeAvailable(dataNodeId) && isRegionAvailable(regionGroupId, dataNodeId)) { // Select the DataNode with the minimal leader count as the new leader int count = leaderCounter.getOrDefault(dataNodeId, 0); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java index daf1d15f27c..d348d0e1458 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java @@ -20,8 +20,6 @@ package org.apache.iotdb.confignode.manager.load.balancer.router.leader; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; @@ -31,7 +29,9 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -82,13 +82,13 @@ public class MinCostFlowLeaderBalancer extends AbstractLeaderBalancer { @Override public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution( Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, + Map<TConsensusGroupId, Set<Integer>> regionLocationMap, Map<TConsensusGroupId, Integer> regionLeaderMap, Map<Integer, NodeStatistics> dataNodeStatisticsMap, Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap) { initialize( databaseRegionGroupMap, - regionReplicaSetMap, + regionLocationMap, regionLeaderMap, dataNodeStatisticsMap, regionStatisticsMap); @@ -129,21 +129,22 @@ public class MinCostFlowLeaderBalancer extends AbstractLeaderBalancer { List<TConsensusGroupId> regionGroupIds = databaseEntry.getValue(); for (TConsensusGroupId regionGroupId : regionGroupIds) { rNodeMap.put(regionGroupId, maxNode++); - for (TDataNodeLocation dataNodeLocation : - regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) { - int dataNodeId = dataNodeLocation.getDataNodeId(); - if (isDataNodeAvailable(dataNodeId)) { - if (!sDNodeMap.get(database).containsKey(dataNodeId)) { - sDNodeMap.get(database).put(dataNodeId, maxNode); - sDNodeReflect.get(database).put(maxNode, dataNodeId); - maxNode += 1; - } - if (!tDNodeMap.containsKey(dataNodeId)) { - tDNodeMap.put(dataNodeId, maxNode); - maxNode += 1; - } - } - } + regionLocationMap + .get(regionGroupId) + .forEach( + dataNodeId -> { + if (isDataNodeAvailable(dataNodeId)) { + if (!sDNodeMap.get(database).containsKey(dataNodeId)) { + sDNodeMap.get(database).put(dataNodeId, maxNode); + sDNodeReflect.get(database).put(maxNode, dataNodeId); + maxNode += 1; + } + if (!tDNodeMap.containsKey(dataNodeId)) { + tDNodeMap.put(dataNodeId, maxNode); + maxNode += 1; + } + } + }); } } @@ -166,17 +167,23 @@ public class MinCostFlowLeaderBalancer extends AbstractLeaderBalancer { String database = databaseEntry.getKey(); for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) { int rNode = rNodeMap.get(regionGroupId); - for (TDataNodeLocation dataNodeLocation : - regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) { - int dataNodeId = dataNodeLocation.getDataNodeId(); - if (isDataNodeAvailable(dataNodeId) && isRegionAvailable(regionGroupId, dataNodeId)) { - int sDNode = sDNodeMap.get(database).get(dataNodeId); - // Capacity: 1, Cost: 1 if sDNode is the current leader of the rNode, 0 otherwise. - // Therefore, the RegionGroup will keep the leader as constant as possible. - int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == dataNodeId ? 0 : 1; - addAdjacentEdges(rNode, sDNode, 1, cost); - } - } + regionLocationMap + .get(regionGroupId) + .forEach( + dataNodeId -> { + if (isDataNodeAvailable(dataNodeId) + && isRegionAvailable(regionGroupId, dataNodeId)) { + int sDNode = sDNodeMap.get(database).get(dataNodeId); + // Capacity: 1, Cost: 1 if sDNode is the current leader of the rNode, 0 + // otherwise. + // Therefore, the RegionGroup will keep the leader as constant as possible. + int cost = + Objects.equals(regionLeaderMap.getOrDefault(regionGroupId, -1), dataNodeId) + ? 0 + : 1; + addAdjacentEdges(rNode, sDNode, 1, cost); + } + }); } } @@ -187,19 +194,21 @@ public class MinCostFlowLeaderBalancer extends AbstractLeaderBalancer { // Map<DataNodeId, leader number> Map<Integer, Integer> leaderCounter = new TreeMap<>(); for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) { - for (TDataNodeLocation dataNodeLocation : - regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) { - int dataNodeId = dataNodeLocation.getDataNodeId(); - if (isDataNodeAvailable(dataNodeId)) { - int sDNode = sDNodeMap.get(database).get(dataNodeId); - int tDNode = tDNodeMap.get(dataNodeId); - int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum); - // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode. - // Thus, the leader distribution will be as balance as possible within each Database - // based on the Jensen's-Inequality. - addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount); - } - } + regionLocationMap + .get(regionGroupId) + .forEach( + dataNodeId -> { + if (isDataNodeAvailable(dataNodeId)) { + int sDNode = sDNodeMap.get(database).get(dataNodeId); + int tDNode = tDNodeMap.get(dataNodeId); + int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum); + // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode. + // Thus, the leader distribution will be as balance as possible within each + // Database + // based on the Jensen's-Inequality. + addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount); + } + }); } } @@ -207,19 +216,20 @@ public class MinCostFlowLeaderBalancer extends AbstractLeaderBalancer { // Map<DataNodeId, possible maximum leader> // Count the possible maximum number of leader in each DataNode Map<Integer, Integer> maxLeaderCounter = new TreeMap<>(); - for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) { - for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { - int dataNodeId = dataNodeLocation.getDataNodeId(); - if (isDataNodeAvailable(dataNodeId)) { - int tDNode = tDNodeMap.get(dataNodeId); - int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum); - // Cost: x^2 for the x-th edge at the current dNode. - // Thus, the leader distribution will be as balance as possible within the cluster - // Based on the Jensen's-Inequality. - addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount); - } - } - } + regionLocationMap.forEach( + (regionGroupId, dataNodeIds) -> + dataNodeIds.forEach( + dataNodeId -> { + if (isDataNodeAvailable(dataNodeId)) { + int tDNode = tDNodeMap.get(dataNodeId); + int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum); + // Cost: x^2 for the x-th edge at the current dNode. + // Thus, the leader distribution will be as balance as possible within the + // cluster + // Based on the Jensen's-Inequality. + addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount); + } + })); } private void addAdjacentEdges(int fromNode, int destNode, int capacity, int cost) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index be561937b9b..5fa56866b89 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -23,9 +23,11 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.ProcedureManager; @@ -46,6 +48,7 @@ import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -94,7 +97,12 @@ public class LoadCache { initNodeHeartbeatCache( configManager.getNodeManager().getRegisteredConfigNodes(), configManager.getNodeManager().getRegisteredDataNodes()); - initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets()); + initRegionGroupHeartbeatCache( + configManager.getClusterSchemaManager().getDatabaseNames().stream() + .collect( + Collectors.toMap( + database -> database, + database -> configManager.getPartitionManager().getAllReplicaSets(database)))); } /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched. */ @@ -111,8 +119,7 @@ public class LoadCache { configNodeLocation -> { int configNodeId = configNodeLocation.getConfigNodeId(); if (configNodeId != CURRENT_NODE_ID) { - nodeCacheMap.put(configNodeId, new ConfigNodeHeartbeatCache(configNodeId)); - heartbeatProcessingMap.put(configNodeId, new AtomicBoolean(false)); + createNodeHeartbeatCache(NodeType.ConfigNode, configNodeId); } }); // Force set itself and never update @@ -125,8 +132,7 @@ public class LoadCache { registeredDataNodes.forEach( dataNodeConfiguration -> { int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId(); - nodeCacheMap.put(dataNodeId, new DataNodeHeartbeatCache(dataNodeId)); - heartbeatProcessingMap.put(dataNodeId, new AtomicBoolean(false)); + createNodeHeartbeatCache(NodeType.DataNode, dataNodeId); }); } @@ -134,15 +140,24 @@ public class LoadCache { * Initialize the regionGroupCacheMap and regionRouteCacheMap when the ConfigNode-Leader is * switched. */ - private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) { + private void initRegionGroupHeartbeatCache( + Map<String, List<TRegionReplicaSet>> regionReplicaMap) { regionGroupCacheMap.clear(); consensusGroupCacheMap.clear(); - regionReplicaSets.forEach( - regionReplicaSet -> { - TConsensusGroupId consensusGroupId = regionReplicaSet.getRegionId(); - regionGroupCacheMap.put(consensusGroupId, new RegionGroupCache()); - consensusGroupCacheMap.put(consensusGroupId, new ConsensusGroupCache()); - }); + regionReplicaMap.forEach( + (database, regionReplicaSets) -> + regionReplicaSets.forEach( + regionReplicaSet -> { + TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId(); + regionGroupCacheMap.put( + regionGroupId, + new RegionGroupCache( + database, + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet()))); + consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); + })); } public void clearHeartbeatCache() { @@ -164,6 +179,25 @@ public class LoadCache { .getAndSet(true); } + /** + * Create a new NodeHeartbeatCache for the specified Node. + * + * @param nodeType The specified NodeType + * @param nodeId The specified NodeId + */ + public void createNodeHeartbeatCache(NodeType nodeType, int nodeId) { + switch (nodeType) { + case ConfigNode: + nodeCacheMap.put(nodeId, new ConfigNodeHeartbeatCache(nodeId)); + break; + case DataNode: + default: + nodeCacheMap.put(nodeId, new DataNodeHeartbeatCache(nodeId)); + break; + } + heartbeatProcessingMap.put(nodeId, new AtomicBoolean(false)); + } + /** * Cache the latest heartbeat sample of a ConfigNode. * @@ -171,9 +205,10 @@ public class LoadCache { * @param sample the latest heartbeat sample */ public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) { - nodeCacheMap - .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId)) - .cacheHeartbeatSample(sample); + if (nodeCacheMap.containsKey(nodeId)) { + // Only cache sample when the corresponding loadCache exists + nodeCacheMap.get(nodeId).cacheHeartbeatSample(sample); + } heartbeatProcessingMap.get(nodeId).set(false); } @@ -184,9 +219,10 @@ public class LoadCache { * @param sample the latest heartbeat sample */ public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) { - nodeCacheMap - .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId)) - .cacheHeartbeatSample(sample); + if (nodeCacheMap.containsKey(nodeId)) { + // Only cache sample when the corresponding loadCache exists + nodeCacheMap.get(nodeId).cacheHeartbeatSample(sample); + } heartbeatProcessingMap.get(nodeId).set(false); } @@ -194,6 +230,39 @@ public class LoadCache { heartbeatProcessingMap.get(nodeId).set(false); } + /** + * Remove the NodeHeartbeatCache of the specified Node. + * + * @param nodeId the index of the specified Node + */ + public void removeNodeCache(int nodeId) { + nodeCacheMap.remove(nodeId); + heartbeatProcessingMap.remove(nodeId); + } + + /** + * Create a new RegionGroupCache and a new ConsensusGroupCache for the specified RegionGroup. + * + * @param database the Database where the RegionGroup belonged + * @param regionGroupId the index of the RegionGroup + * @param dataNodeIds the index of the DataNodes where the Regions resided + */ + public void createRegionGroupHeartbeatCache( + String database, TConsensusGroupId regionGroupId, Set<Integer> dataNodeIds) { + regionGroupCacheMap.put(regionGroupId, new RegionGroupCache(database, dataNodeIds)); + consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); + } + + /** + * Create a new RegionCache for the specified Region in the specified RegionGroup. + * + * @param regionGroupId the index of the RegionGroup + * @param dataNodeId the index of the DataNode where the Region resides + */ + public void createRegionCache(TConsensusGroupId regionGroupId, int dataNodeId) { + regionGroupCacheMap.get(regionGroupId).createRegionCache(dataNodeId); + } + /** * Cache the latest heartbeat sample of a RegionGroup. * @@ -206,9 +275,10 @@ public class LoadCache { int nodeId, RegionHeartbeatSample sample, boolean overwrite) { - regionGroupCacheMap - .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache()) - .cacheHeartbeatSample(nodeId, sample, overwrite); + if (regionGroupCacheMap.containsKey(regionGroupId)) { + // Only cache sample when the corresponding loadCache exists + regionGroupCacheMap.get(regionGroupId).cacheHeartbeatSample(nodeId, sample, overwrite); + } } /** @@ -229,9 +299,10 @@ public class LoadCache { */ public void cacheConsensusSample( TConsensusGroupId regionGroupId, ConsensusGroupHeartbeatSample sample) { - consensusGroupCacheMap - .computeIfAbsent(regionGroupId, empty -> new ConsensusGroupCache()) - .cacheHeartbeatSample(sample); + if (consensusGroupCacheMap.containsKey(regionGroupId)) { + // Only cache sample when the corresponding loadCache exists + consensusGroupCacheMap.get(regionGroupId).cacheHeartbeatSample(sample); + } } /** Update the NodeStatistics of all Nodes. */ @@ -278,6 +349,44 @@ public class LoadCache { return dataNodeStatisticsMap; } + /** + * Get a map of cached RegionGroups of all Databases. + * + * @param type SchemaRegion or DataRegion + * @return Map<Database, List<RegionGroupId>> + */ + public Map<String, List<TConsensusGroupId>> getCurrentDatabaseRegionGroupMap( + TConsensusGroupType type) { + Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new TreeMap<>(); + regionGroupCacheMap.forEach( + (regionGroupId, regionGroupCache) -> { + if (type.equals(regionGroupId.getType())) { + databaseRegionGroupMap + .computeIfAbsent(regionGroupCache.getDatabase(), empty -> new ArrayList<>()) + .add(regionGroupId); + } + }); + return databaseRegionGroupMap; + } + + /** + * Get a map of cached RegionGroups + * + * @param type SchemaRegion or DataRegion + * @return Map<RegionGroupId, Set<DataNodeId>> + */ + public Map<TConsensusGroupId, Set<Integer>> getCurrentRegionLocationMap( + TConsensusGroupType type) { + Map<TConsensusGroupId, Set<Integer>> regionGroupIdsMap = new TreeMap<>(); + regionGroupCacheMap.forEach( + (regionGroupId, regionGroupCache) -> { + if (type.equals(regionGroupId.getType())) { + regionGroupIdsMap.put(regionGroupId, regionGroupCache.getRegionLocations()); + } + }); + return regionGroupIdsMap; + } + /** * Get the RegionGroupStatistics of all RegionGroups. * @@ -335,22 +444,6 @@ public class LoadCache { return nodeCache == null ? NodeStatus.Unknown : nodeCache.getNodeStatus(); } - /** - * Get all DataNodes' NodeStatus - * - * @return Map<DataNodeId, NodeStatus> - */ - public Map<Integer, NodeStatus> getDataNodeStatus() { - Map<Integer, NodeStatus> nodeStatusMap = new TreeMap<>(); - nodeCacheMap.forEach( - (nodeId, nodeCache) -> { - if (nodeCache instanceof DataNodeHeartbeatCache) { - nodeStatusMap.put(nodeId, nodeCache.getNodeStatus()); - } - }); - return nodeStatusMap; - } - /** * Safely get the specified Node's current status with reason. * @@ -464,11 +557,6 @@ public class LoadCache { .orElse(-1); } - /** Remove the specified Node's cache. */ - public void removeNodeCache(int nodeId) { - nodeCacheMap.remove(nodeId); - } - /** * Safely get RegionStatus. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java index 2e68d370519..8facf40ba5a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java @@ -58,7 +58,8 @@ public class RegionCache extends AbstractLoadCache { return (RegionStatistics) currentStatistics.get(); } - public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample, boolean overwrite) { + public synchronized void cacheHeartbeatSample( + RegionHeartbeatSample newHeartbeatSample, boolean overwrite) { if (overwrite || getLastSample() == null) { super.cacheHeartbeatSample(newHeartbeatSample); return; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java index ae922dc0028..547e3cea55e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; @@ -35,14 +36,17 @@ import java.util.concurrent.atomic.AtomicReference; */ public class RegionGroupCache { + private final String database; // Map<DataNodeId(where a RegionReplica resides in), RegionCache> private final Map<Integer, RegionCache> regionCacheMap; // The current RegionGroupStatistics, used for providing statistics to other services private final AtomicReference<RegionGroupStatistics> currentStatistics; /** Constructor for create RegionGroupCache with default RegionGroupStatistics. */ - public RegionGroupCache() { + public RegionGroupCache(String database, Set<Integer> dataNodeIds) { + this.database = database; this.regionCacheMap = new ConcurrentHashMap<>(); + dataNodeIds.forEach(dataNodeId -> regionCacheMap.put(dataNodeId, new RegionCache())); this.currentStatistics = new AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics()); } @@ -56,9 +60,10 @@ public class RegionGroupCache { */ public void cacheHeartbeatSample( int dataNodeId, RegionHeartbeatSample newHeartbeatSample, boolean overwrite) { - regionCacheMap - .computeIfAbsent(dataNodeId, empty -> new RegionCache()) - .cacheHeartbeatSample(newHeartbeatSample, overwrite); + if (regionCacheMap.containsKey(dataNodeId)) { + // Only cache sample when the corresponding loadCache exists + regionCacheMap.get(dataNodeId).cacheHeartbeatSample(newHeartbeatSample, overwrite); + } } @TestOnly @@ -66,6 +71,15 @@ public class RegionGroupCache { cacheHeartbeatSample(dataNodeId, newHeartbeatSample, false); } + /** + * Create the cache of the specified Region. + * + * @param dataNodeId the specified DataNode + */ + public void createRegionCache(int dataNodeId) { + regionCacheMap.put(dataNodeId, new RegionCache()); + } + /** * Remove the cache of the specified Region in the specified RegionGroup. * @@ -131,4 +145,12 @@ public class RegionGroupCache { public RegionGroupStatistics getCurrentStatistics() { return currentStatistics.get(); } + + public String getDatabase() { + return database; + } + + public Set<Integer> getRegionLocations() { + return regionCacheMap.keySet(); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 5afd20c2965..cedc2f86c9e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.commons.cluster.RegionRoleType; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -271,7 +272,11 @@ public class NodeManager { return resp; } + // Create a new DataNodeHeartbeatCache and force update NodeStatus int dataNodeId = nodeInfo.generateNextNodeId(); + getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, dataNodeId); + // TODO: invoke a force heartbeat to update new DataNode's status immediately + RegisterDataNodePlan registerDataNodePlan = new RegisterDataNodePlan(req.getDataNodeConfiguration()); // Register new DataNode @@ -298,8 +303,6 @@ public class NodeManager { // Adjust the maximum RegionGroup number of each Database getClusterSchemaManager().adjustMaxRegionGroupNum(); - // TODO: Add a force heartbeat to update LoadCache immediately - resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION); resp.setDataNodeId( registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 0509b6f0e37..0f5b7f87621 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -366,6 +366,16 @@ public class ConfigNodeProcedureEnv { ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS); } + /** + * Create a new ConfigNodeHeartbeatCache + * + * @param nodeId the index of the new ConfigNode + */ + public void createConfigNodeHeartbeatCache(int nodeId) { + getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.ConfigNode, nodeId); + // TODO: invoke a force heartbeat to update new ConfigNode's status immediately + } + /** * Mark the given datanode as removing status to avoid read or write request routing to this node. * @@ -554,13 +564,33 @@ public class ConfigNodeProcedureEnv { * Force activating RegionGroup by setting status to Running, therefore the ConfigNode-leader can * select leader for it and use it to allocate new Partitions * - * @param activateRegionGroupMap Map<RegionGroupId, Map<DataNodeId, activate heartbeat sample>> + * @param activateRegionGroupMap Map<Database, Map<RegionGroupId, Map<DataNodeId, activate + * heartbeat sample>>> */ public void activateRegionGroup( - Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>> activateRegionGroupMap) { - getLoadManager().forceUpdateRegionGroupCache(activateRegionGroupMap); + Map<String, Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>> + activateRegionGroupMap) { + // Create RegionGroup heartbeat Caches + activateRegionGroupMap.forEach( + (database, regionGroupSampleMap) -> + regionGroupSampleMap.forEach( + (regionGroupId, regionSampleMap) -> + getLoadManager() + .getLoadCache() + .createRegionGroupHeartbeatCache( + database, regionGroupId, regionSampleMap.keySet()))); + // Force update first heartbeat samples + getLoadManager() + .forceUpdateRegionGroupCache( + activateRegionGroupMap.values().stream() + .flatMap(innerMap -> innerMap.entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> b))); // Wait for leader and priority redistribution - getLoadManager().waitForRegionGroupReady(new ArrayList<>(activateRegionGroupMap.keySet())); + getLoadManager() + .waitForRegionGroupReady( + activateRegionGroupMap.values().stream() + .flatMap(innterMap -> innterMap.keySet().stream()) + .collect(Collectors.toList())); } public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java index caf6482a090..500391922f0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java @@ -417,8 +417,7 @@ public class RegionMaintainHandler { return report; } - public void addRegionLocation( - TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus regionStatus) { + public void addRegionLocation(TConsensusGroupId regionId, TDataNodeLocation newLocation) { AddRegionLocationPlan req = new AddRegionLocationPlan(regionId, newLocation); TSStatus status = configManager.getPartitionManager().addRegionLocation(req); LOGGER.info( @@ -428,14 +427,15 @@ public class RegionMaintainHandler { status); configManager .getLoadManager() - .forceAddRegionCache(regionId, newLocation.getDataNodeId(), regionStatus); + .getLoadCache() + .createRegionCache(regionId, newLocation.getDataNodeId()); } - public void updateRegionCache( + public void forceUpdateRegionCache( TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus regionStatus) { configManager .getLoadManager() - .forceAddRegionCache(regionId, newLocation.getDataNodeId(), regionStatus); + .forceUpdateRegionCache(regionId, newLocation.getDataNodeId(), regionStatus); } public void removeRegionLocation( @@ -447,9 +447,7 @@ public class RegionMaintainHandler { regionId, getIdWithRpcEndpoint(deprecatedLocation), status); - configManager - .getLoadManager() - .forceRemoveRegionCache(regionId, deprecatedLocation.getDataNodeId()); + configManager.getLoadManager().removeRegionCache(regionId, deprecatedLocation.getDataNodeId()); configManager.getLoadManager().getRouteBalancer().balanceRegionLeaderAndPriority(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java index 160551b6847..b38696a10ed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java @@ -81,8 +81,8 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS break; case REGISTER_SUCCESS: env.notifyRegisterSuccess(tConfigNodeLocation); + env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId()); env.applyConfigNode(tConfigNodeLocation, versionInfo); - // TODO: Add a force heartbeat to update LoadCache immediately LOG.info("The ConfigNode: {} is successfully added to the cluster", tConfigNodeLocation); return Flow.NO_MORE_STATE; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java index bebf03f6792..99efcf860e5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java @@ -91,7 +91,7 @@ public class AddRegionPeerProcedure getProcId(), consensusGroupId.getId(), destDataNode.getDataNodeId()); - handler.addRegionLocation(consensusGroupId, destDataNode, RegionStatus.Adding); + handler.addRegionLocation(consensusGroupId, destDataNode); TSStatus status = handler.createNewRegionPeer(consensusGroupId, destDataNode); setKillPoint(state); if (status.getCode() != SUCCESS_STATUS.getStatusCode()) { @@ -100,7 +100,7 @@ public class AddRegionPeerProcedure setNextState(AddRegionPeerState.DO_ADD_REGION_PEER); break; case DO_ADD_REGION_PEER: - handler.updateRegionCache(consensusGroupId, destDataNode, RegionStatus.Adding); + handler.forceUpdateRegionCache(consensusGroupId, destDataNode, RegionStatus.Adding); // We don't want to re-submit AddRegionPeerTask when leader change or ConfigNode reboot if (!this.isStateDeserialized()) { TSStatus tsStatus = @@ -140,7 +140,7 @@ public class AddRegionPeerProcedure throw new UnsupportedOperationException(msg); } case UPDATE_REGION_LOCATION_CACHE: - handler.updateRegionCache(consensusGroupId, destDataNode, RegionStatus.Running); + handler.forceUpdateRegionCache(consensusGroupId, destDataNode, RegionStatus.Running); setKillPoint(state); LOGGER.info("[pid{}][AddRegion] state {} complete", getProcId(), state); LOGGER.info( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java index 8498cf49c0a..95ec7bda351 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java @@ -178,8 +178,8 @@ public class CreateRegionGroupsProcedure case ACTIVATE_REGION_GROUPS: long currentTime = System.nanoTime(); // Build RegionGroupCache immediately to make these successfully built RegionGroup available - Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>> activateRegionGroupMap = - new TreeMap<>(); + Map<String, Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>> + activateRegionGroupMap = new TreeMap<>(); createRegionGroupsPlan .getRegionGroupMap() .forEach( @@ -211,8 +211,9 @@ public class CreateRegionGroupsProcedure ? RegionStatus.Unknown : RegionStatus.Running)); }); - activateRegionGroupMap.put( - regionReplicaSet.getRegionId(), activateSampleMap); + activateRegionGroupMap + .computeIfAbsent(database, empty -> new TreeMap<>()) + .put(regionReplicaSet.getRegionId(), activateSampleMap); } })); env.activateRegionGroup(activateRegionGroupMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java index 17fdc903295..5788b79bd83 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java @@ -89,13 +89,12 @@ public class RemoveRegionPeerProcedure getProcId(), consensusGroupId.getId(), targetDataNode.getDataNodeId()); - handler.updateRegionCache(consensusGroupId, targetDataNode, RegionStatus.Removing); + handler.forceUpdateRegionCache(consensusGroupId, targetDataNode, RegionStatus.Removing); handler.transferRegionLeader(consensusGroupId, targetDataNode); KillPoint.setKillPoint(state); setNextState(REMOVE_REGION_PEER); break; case REMOVE_REGION_PEER: - handler.updateRegionCache(consensusGroupId, targetDataNode, RegionStatus.Removing); tsStatus = handler.submitRemoveRegionPeerTask( this.getProcId(), targetDataNode, consensusGroupId, coordinator); @@ -119,7 +118,6 @@ public class RemoveRegionPeerProcedure setNextState(DELETE_OLD_REGION_PEER); break; case DELETE_OLD_REGION_PEER: - handler.updateRegionCache(consensusGroupId, targetDataNode, RegionStatus.Removing); tsStatus = handler.submitDeleteOldRegionPeerTask( this.getProcId(), targetDataNode, consensusGroupId); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java index 7cc1f50c28c..f2f51e044ff 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java @@ -39,9 +39,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class CFDLeaderBalancerTest { @@ -101,10 +103,14 @@ public class CFDLeaderBalancerTest { // Prepare input parameters Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new TreeMap<>(); databaseRegionGroupMap.put(DATABASE, regionGroupIds); - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); + Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>(); regionReplicaSets.forEach( regionReplicaSet -> - regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet)); + regionReplicaSetMap.put( + regionReplicaSet.getRegionId(), + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet()))); Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); regionReplicaSets.forEach( regionReplicaSet -> regionLeaderMap.put(regionReplicaSet.getRegionId(), 0)); @@ -147,8 +153,12 @@ public class CFDLeaderBalancerTest { // Prepare input parameters Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new TreeMap<>(); databaseRegionGroupMap.put(DATABASE, Collections.singletonList(regionReplicaSet.getRegionId())); - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); - regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet); + Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>(); + regionReplicaSetMap.put( + regionReplicaSet.getRegionId(), + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet())); Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); regionLeaderMap.put(regionReplicaSet.getRegionId(), 1); Map<Integer, NodeStatistics> nodeStatisticsMap = new TreeMap<>(); @@ -190,7 +200,7 @@ public class CFDLeaderBalancerTest { nodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running)); // Prepare RegionGroups Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new TreeMap<>(); - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); + Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>(); Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); for (int i = 0; i < 5; i++) { @@ -204,7 +214,11 @@ public class CFDLeaderBalancerTest { Arrays.asList( new TDataNodeLocation().setDataNodeId(0), new TDataNodeLocation().setDataNodeId(1))); - regionReplicaSetMap.put(regionGroupId, regionReplicaSet); + regionReplicaSetMap.put( + regionGroupId, + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet())); regionLeaderMap.put(regionGroupId, 0); // Assuming all Regions are migrating from DataNode-1 to DataNode-2 Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); @@ -255,7 +269,7 @@ public class CFDLeaderBalancerTest { Random random = new Random(); Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new TreeMap<>(); databaseRegionGroupMap.put(DATABASE, new ArrayList<>()); - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); + Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>(); Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); for (int i = 0; i < regionGroupNum; i++) { @@ -273,7 +287,11 @@ public class CFDLeaderBalancerTest { regionStatisticsMap.put(regionGroupId, regionStatistics); databaseRegionGroupMap.get(DATABASE).add(regionGroupId); - regionReplicaSetMap.put(regionGroupId, regionReplicaSet); + regionReplicaSetMap.put( + regionGroupId, + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet())); regionLeaderMap.put(regionGroupId, leaderId); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java index 6b97e1b385b..b531896d37c 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java @@ -35,9 +35,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class GreedyLeaderBalancerTest { @@ -45,7 +47,7 @@ public class GreedyLeaderBalancerTest { @Test public void optimalLeaderDistributionTest() { - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); + Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>(); Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>(); Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); @@ -67,7 +69,11 @@ public class GreedyLeaderBalancerTest { regionStatistics.put(j, new RegionStatistics(RegionStatus.Running)); } TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(regionGroupId, dataNodeLocations); - regionReplicaSetMap.put(regionGroupId, regionReplicaSet); + regionReplicaSetMap.put( + regionGroupId, + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet())); regionLeaderMap.put(regionGroupId, random.nextInt(3)); regionStatisticsMap.put(regionGroupId, regionStatistics); } @@ -83,7 +89,11 @@ public class GreedyLeaderBalancerTest { regionStatistics.put(j, new RegionStatistics(RegionStatus.Running)); } TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(regionGroupId, dataNodeLocations); - regionReplicaSetMap.put(regionGroupId, regionReplicaSet); + regionReplicaSetMap.put( + regionGroupId, + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet())); regionLeaderMap.put(regionGroupId, 3 + random.nextInt(3)); regionStatisticsMap.put(regionGroupId, regionStatistics); } @@ -110,7 +120,7 @@ public class GreedyLeaderBalancerTest { @Test public void disableTest() { - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new TreeMap<>(); + Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>(); Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>(); Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>(); Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap = new TreeMap<>(); @@ -135,7 +145,11 @@ public class GreedyLeaderBalancerTest { j, new RegionStatistics(j == 1 ? RegionStatus.Unknown : RegionStatus.Running)); } TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(regionGroupId, dataNodeLocations); - regionReplicaSetMap.put(regionGroupId, regionReplicaSet); + regionReplicaSetMap.put( + regionGroupId, + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet())); regionLeaderMap.put(regionGroupId, 1); regionStatisticsMap.put(regionGroupId, regionStatistics); } @@ -152,7 +166,11 @@ public class GreedyLeaderBalancerTest { j, new RegionStatistics(j == 4 ? RegionStatus.ReadOnly : RegionStatus.Running)); } TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(regionGroupId, dataNodeLocations); - regionReplicaSetMap.put(regionGroupId, regionReplicaSet); + regionReplicaSetMap.put( + regionGroupId, + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet())); regionLeaderMap.put(regionGroupId, 4); regionStatisticsMap.put(regionGroupId, regionStatistics); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java index fca962a2e56..885c97989da 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java @@ -46,6 +46,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class LeaderBalancerComparisonTest { @@ -81,7 +82,7 @@ public class LeaderBalancerComparisonTest { // Simulate each DataNode has 16 CPU cores // and each RegionGroup has 3 replicas int regionGroupNum = TEST_CPU_CORE_NUM * dataNodeNum / TEST_REPLICA_NUM; - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>(); + Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new HashMap<>(); Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>(); generateTestData(dataNodeNum, regionGroupNum, regionReplicaSetMap, regionLeaderMap); @@ -101,13 +102,9 @@ public class LeaderBalancerComparisonTest { regionReplicaSetMap.forEach( (regionGroupId, regionReplicaSet) -> { Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); - regionReplicaSet - .getDataNodeLocations() - .forEach( - dataNodeLocation -> - regionStatistics.put( - dataNodeLocation.getDataNodeId(), - new RegionStatistics(RegionStatus.Running))); + regionReplicaSet.forEach( + dataNodeId -> + regionStatistics.put(dataNodeId, new RegionStatistics(RegionStatus.Running))); allRunningRegionStatistics.put(regionGroupId, regionStatistics); }); Statistics greedyStatistics = @@ -163,15 +160,13 @@ public class LeaderBalancerComparisonTest { regionReplicaSetMap.forEach( (regionGroupId, regionReplicaSet) -> { Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); - regionReplicaSet - .getDataNodeLocations() - .forEach( - dataNodeLocation -> - regionStatistics.put( - dataNodeLocation.getDataNodeId(), - disabledDataNodeSet.contains(dataNodeLocation.getDataNodeId()) - ? new RegionStatistics(RegionStatus.Unknown) - : new RegionStatistics(RegionStatus.Running))); + regionReplicaSet.forEach( + dataNodeId -> + regionStatistics.put( + dataNodeId, + disabledDataNodeSet.contains(dataNodeId) + ? new RegionStatistics(RegionStatus.Unknown) + : new RegionStatistics(RegionStatus.Running))); disabledRegionStatistics.put(regionGroupId, regionStatistics); }); greedyStatistics = @@ -238,7 +233,7 @@ public class LeaderBalancerComparisonTest { private void generateTestData( int dataNodeNum, int regionGroupNum, - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, + Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap, Map<TConsensusGroupId, Integer> regionLeaderMap) { Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>(); @@ -292,7 +287,11 @@ public class LeaderBalancerComparisonTest { randomNum -= 1; } - regionReplicaSetMap.put(regionGroupId, regionReplicaSet); + regionReplicaSetMap.put( + regionGroupId, + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet())); regionReplicaSet .getDataNodeLocations() .forEach( @@ -307,7 +306,7 @@ public class LeaderBalancerComparisonTest { int dataNodeNum, int regionGroupNum, AbstractLeaderBalancer leaderBalancer, - Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, + Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap, Map<TConsensusGroupId, Integer> regionLeaderMap, Map<Integer, NodeStatistics> nodeStatisticsMap, Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap, diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java index a961eb310e7..e340c347e11 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java @@ -26,12 +26,18 @@ import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.junit.Assert; import org.junit.Test; +import java.util.stream.Collectors; +import java.util.stream.Stream; + public class RegionGroupCacheTest { + private static final String DATABASE = "root.db"; + @Test public void getRegionStatusTest() { long currentTime = System.nanoTime(); - RegionGroupCache regionGroupCache = new RegionGroupCache(); + RegionGroupCache regionGroupCache = + new RegionGroupCache(DATABASE, Stream.of(0, 1, 2, 3).collect(Collectors.toSet())); regionGroupCache.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); regionGroupCache.cacheHeartbeatSample( @@ -55,7 +61,8 @@ public class RegionGroupCacheTest { @Test public void getRegionGroupStatusTest() { long currentTime = System.nanoTime(); - RegionGroupCache runningRegionGroup = new RegionGroupCache(); + RegionGroupCache runningRegionGroup = + new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); runningRegionGroup.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); runningRegionGroup.cacheHeartbeatSample( @@ -67,7 +74,8 @@ public class RegionGroupCacheTest { RegionGroupStatus.Running, runningRegionGroup.getCurrentStatistics().getRegionGroupStatus()); - RegionGroupCache availableRegionGroup = new RegionGroupCache(); + RegionGroupCache availableRegionGroup = + new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); availableRegionGroup.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); availableRegionGroup.cacheHeartbeatSample( @@ -79,7 +87,8 @@ public class RegionGroupCacheTest { RegionGroupStatus.Available, availableRegionGroup.getCurrentStatistics().getRegionGroupStatus()); - RegionGroupCache disabledRegionGroup0 = new RegionGroupCache(); + RegionGroupCache disabledRegionGroup0 = + new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); disabledRegionGroup0.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); disabledRegionGroup0.cacheHeartbeatSample( @@ -91,7 +100,8 @@ public class RegionGroupCacheTest { RegionGroupStatus.Discouraged, disabledRegionGroup0.getCurrentStatistics().getRegionGroupStatus()); - RegionGroupCache disabledRegionGroup1 = new RegionGroupCache(); + RegionGroupCache disabledRegionGroup1 = + new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); disabledRegionGroup1.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); disabledRegionGroup1.cacheHeartbeatSample( @@ -103,7 +113,8 @@ public class RegionGroupCacheTest { RegionGroupStatus.Disabled, disabledRegionGroup1.getCurrentStatistics().getRegionGroupStatus()); - RegionGroupCache disabledRegionGroup2 = new RegionGroupCache(); + RegionGroupCache disabledRegionGroup2 = + new RegionGroupCache(DATABASE, Stream.of(0, 1, 2).collect(Collectors.toSet())); disabledRegionGroup2.cacheHeartbeatSample( 0, new RegionHeartbeatSample(currentTime, RegionStatus.Running)); disabledRegionGroup2.cacheHeartbeatSample(
