This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 85f47e09d2e Normalize LoadCache interfaces (#12421)
85f47e09d2e is described below
commit 85f47e09d2e73cb1aeda9973728a247743ce4548
Author: Yongzao <[email protected]>
AuthorDate: Sun Apr 28 20:15:16 2024 +0800
Normalize LoadCache interfaces (#12421)
* update framework
* add LoadManager UT
* spotless
* broadcast after updation
* fix
Signed-off-by: OneSizeFitQuorum <[email protected]>
---------
Signed-off-by: OneSizeFitQuorum <[email protected]>
Co-authored-by: OneSizeFitQuorum <[email protected]>
---
.../iotdb/confignode/manager/load/LoadManager.java | 46 +++-
.../manager/load/balancer/RouteBalancer.java | 4 +-
.../router/leader/AbstractLeaderBalancer.java | 18 +-
.../router/leader/GreedyLeaderBalancer.java | 14 +-
.../router/leader/MinCostFlowLeaderBalancer.java | 122 +++++----
.../confignode/manager/load/cache/LoadCache.java | 178 +++++++++----
.../consensus/ConsensusGroupHeartbeatSample.java | 7 +
.../cache/consensus/ConsensusGroupStatistics.java | 7 +
.../manager/load/cache/node/NodeStatistics.java | 4 +-
.../manager/load/cache/region/RegionCache.java | 3 +-
.../load/cache/region/RegionGroupCache.java | 30 ++-
.../load/cache/region/RegionHeartbeatSample.java | 7 +
.../manager/load/service/EventService.java | 133 ++++++----
.../iotdb/confignode/manager/node/NodeManager.java | 7 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 38 ++-
.../procedure/env/RegionMaintainHandler.java | 14 +-
.../impl/node/AddConfigNodeProcedure.java | 2 +-
.../impl/region/AddRegionPeerProcedure.java | 7 +-
.../impl/region/CreateRegionGroupsProcedure.java | 9 +-
.../impl/region/RemoveRegionPeerProcedure.java | 6 +-
.../confignode/manager/load/FakeSubscriber.java | 92 +++++++
.../confignode/manager/load/LoadManagerTest.java | 295 +++++++++++++++++++++
.../router/leader/CFDLeaderBalancerTest.java | 34 ++-
.../router/leader/GreedyLeaderBalancerTest.java | 30 ++-
.../leader/LeaderBalancerComparisonTest.java | 39 ++-
.../manager/load/cache/RegionGroupCacheTest.java | 23 +-
26 files changed, 911 insertions(+), 258 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b115aee68aa..551a1f4ea08 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
@@ -265,9 +266,16 @@ public class LoadManager {
eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
}
- /** Remove the specified Node's cache. */
+ /**
+ * Remove the NodeHeartbeatCache of the specified Node, update statistics
and broadcast statistics
+ * change event if necessary.
+ *
+ * @param nodeId the index of the specified Node
+ */
public void removeNodeCache(int nodeId) {
loadCache.removeNodeCache(nodeId);
+ loadCache.updateNodeStatistics();
+ eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
}
/**
@@ -324,7 +332,8 @@ public class LoadManager {
}
/**
- * Force update the specified RegionGroups' cache.
+ * Force update the specified RegionGroups' cache, update statistics and
broadcast statistics
+ * change event if necessary.
*
* @param heartbeatSampleMap Map<RegionGroupId, Map<DataNodeId,
RegionHeartbeatSample>>
*/
@@ -335,18 +344,20 @@ public class LoadManager {
regionHeartbeatSampleMap.forEach(
(dataNodeId, regionHeartbeatSample) ->
loadCache.cacheRegionHeartbeatSample(
- regionGroupId, dataNodeId, regionHeartbeatSample,
false)));
+ regionGroupId, dataNodeId, regionHeartbeatSample,
true)));
loadCache.updateRegionGroupStatistics();
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
}
/**
- * Add the cache of the specified Region in the specified RegionGroup.
+ * Force update the specified Region's cache, update statistics and
broadcast statistics change
+ * event if necessary.
*
- * @param regionGroupId the specified RegionGroup
- * @param dataNodeId the specified DataNode
+ * @param regionGroupId The specified RegionGroup
+ * @param dataNodeId The DataNodeId where the specified Region is located
+ * @param regionStatus The specified RegionStatus
*/
- public void forceAddRegionCache(
+ public void forceUpdateRegionCache(
TConsensusGroupId regionGroupId, int dataNodeId, RegionStatus
regionStatus) {
loadCache.cacheRegionHeartbeatSample(
regionGroupId,
@@ -358,21 +369,31 @@ public class LoadManager {
}
/**
- * Remove the cache of the specified Region in the specified RegionGroup.
+ * Remove the cache of the specified Region in the specified RegionGroup,
update statistics and
+ * broadcast statistics change event if necessary.
*
* @param regionGroupId the specified RegionGroup
* @param dataNodeId the specified DataNode
*/
- public void forceRemoveRegionCache(TConsensusGroupId regionGroupId, int
dataNodeId) {
+ public void removeRegionCache(TConsensusGroupId regionGroupId, int
dataNodeId) {
loadCache.removeRegionCache(regionGroupId, dataNodeId);
loadCache.updateRegionGroupStatistics();
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
}
- /** Remove the specified RegionGroup's cache. */
+ /**
+ * Remove the specified RegionGroup's related cache, update statistics and
broadcast statistics
+ * change event if necessary.
+ *
+ * @param consensusGroupId The specified RegionGroup
+ */
public void removeRegionGroupRelatedCache(TConsensusGroupId
consensusGroupId) {
loadCache.removeRegionGroupCache(consensusGroupId);
routeBalancer.removeRegionPriority(consensusGroupId);
+ loadCache.updateRegionGroupStatistics();
+ loadCache.updateConsensusGroupStatistics();
+
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
+
eventService.checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary();
}
/**
@@ -441,4 +462,9 @@ public class LoadManager {
public RouteBalancer getRouteBalancer() {
return routeBalancer;
}
+
+ @TestOnly
+ public EventService getEventService() {
+ return eventService;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 79ca434e1d3..a24098c3b92 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -154,8 +154,8 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
Map<TConsensusGroupId, Integer> currentLeaderMap =
getLoadManager().getRegionLeaderMap();
Map<TConsensusGroupId, Integer> optimalLeaderMap =
leaderBalancer.generateOptimalLeaderDistribution(
- getPartitionManager().getAllRegionGroupIdMap(regionGroupType),
- getPartitionManager().getAllReplicaSetsMap(regionGroupType),
+
getLoadManager().getLoadCache().getCurrentDatabaseRegionGroupMap(regionGroupType),
+
getLoadManager().getLoadCache().getCurrentRegionLocationMap(regionGroupType),
currentLeaderMap,
getLoadManager().getLoadCache().getCurrentDataNodeStatisticsMap(),
getLoadManager().getLoadCache().getCurrentRegionStatisticsMap(regionGroupType));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
index b82b3a0ca48..32eebf1586d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
@@ -28,6 +27,7 @@ import
org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
public abstract class AbstractLeaderBalancer {
@@ -37,8 +37,8 @@ public abstract class AbstractLeaderBalancer {
// Map<Database, List<RegionGroup>>
protected final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
- // Map<RegionGroupId, RegionGroup>
- protected final Map<TConsensusGroupId, TRegionReplicaSet>
regionReplicaSetMap;
+ // Map<RegionGroupId, Set<DataNodeId>>
+ protected final Map<TConsensusGroupId, Set<Integer>> regionLocationMap;
// Map<RegionGroupId, leaderId>
protected final Map<TConsensusGroupId, Integer> regionLeaderMap;
// Map<DataNodeId, NodeStatistics>
@@ -48,7 +48,7 @@ public abstract class AbstractLeaderBalancer {
protected AbstractLeaderBalancer() {
this.databaseRegionGroupMap = new TreeMap<>();
- this.regionReplicaSetMap = new TreeMap<>();
+ this.regionLocationMap = new TreeMap<>();
this.regionLeaderMap = new TreeMap<>();
this.dataNodeStatisticsMap = new TreeMap<>();
this.regionStatisticsMap = new TreeMap<>();
@@ -56,12 +56,12 @@ public abstract class AbstractLeaderBalancer {
protected void initialize(
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> dataNodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
- this.regionReplicaSetMap.putAll(regionReplicaSetMap);
+ this.regionLocationMap.putAll(regionLocationMap);
this.regionLeaderMap.putAll(regionLeaderMap);
this.dataNodeStatisticsMap.putAll(dataNodeStatisticsMap);
this.regionStatisticsMap.putAll(regionStatisticsMap);
@@ -83,7 +83,7 @@ public abstract class AbstractLeaderBalancer {
protected void clear() {
this.databaseRegionGroupMap.clear();
- this.regionReplicaSetMap.clear();
+ this.regionLocationMap.clear();
this.regionLeaderMap.clear();
this.dataNodeStatisticsMap.clear();
this.regionStatisticsMap.clear();
@@ -93,7 +93,7 @@ public abstract class AbstractLeaderBalancer {
* Generate an optimal leader distribution.
*
* @param databaseRegionGroupMap RegionGroup held by each Database
- * @param regionReplicaSetMap All RegionGroups the cluster currently have
+ * @param regionLocationMap All RegionGroups the cluster currently have
* @param regionLeaderMap The current leader distribution of each RegionGroup
* @param dataNodeStatisticsMap The current statistics of each DataNode
* @param regionStatisticsMap The current statistics of each Region
@@ -101,7 +101,7 @@ public abstract class AbstractLeaderBalancer {
*/
public abstract Map<TConsensusGroupId, Integer>
generateOptimalLeaderDistribution(
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> dataNodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 5a6098b60f9..101ddf35e71 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -20,13 +20,12 @@
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -40,13 +39,13 @@ public class GreedyLeaderBalancer extends
AbstractLeaderBalancer {
@Override
public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> dataNodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
initialize(
databaseRegionGroupMap,
- regionReplicaSetMap,
+ regionLocationMap,
regionLeaderMap,
dataNodeStatisticsMap,
regionStatisticsMap);
@@ -57,12 +56,11 @@ public class GreedyLeaderBalancer extends
AbstractLeaderBalancer {
private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
Map<Integer, Integer> leaderCounter = new TreeMap<>();
- regionReplicaSetMap.forEach(
- (regionGroupId, regionGroup) -> {
+ regionLocationMap.forEach(
+ (regionGroupId, dataNodeIds) -> {
int minCount = Integer.MAX_VALUE,
leaderId = regionLeaderMap.getOrDefault(regionGroupId, -1);
- for (TDataNodeLocation dataNodeLocation :
regionGroup.getDataNodeLocations()) {
- int dataNodeId = dataNodeLocation.getDataNodeId();
+ for (int dataNodeId : dataNodeIds) {
if (isDataNodeAvailable(dataNodeId) &&
isRegionAvailable(regionGroupId, dataNodeId)) {
// Select the DataNode with the minimal leader count as the new
leader
int count = leaderCounter.getOrDefault(dataNodeId, 0);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index daf1d15f27c..d348d0e1458 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -20,8 +20,6 @@
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
@@ -31,7 +29,9 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Queue;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -82,13 +82,13 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
@Override
public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> dataNodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
initialize(
databaseRegionGroupMap,
- regionReplicaSetMap,
+ regionLocationMap,
regionLeaderMap,
dataNodeStatisticsMap,
regionStatisticsMap);
@@ -129,21 +129,22 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
List<TConsensusGroupId> regionGroupIds = databaseEntry.getValue();
for (TConsensusGroupId regionGroupId : regionGroupIds) {
rNodeMap.put(regionGroupId, maxNode++);
- for (TDataNodeLocation dataNodeLocation :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int dataNodeId = dataNodeLocation.getDataNodeId();
- if (isDataNodeAvailable(dataNodeId)) {
- if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
- sDNodeMap.get(database).put(dataNodeId, maxNode);
- sDNodeReflect.get(database).put(maxNode, dataNodeId);
- maxNode += 1;
- }
- if (!tDNodeMap.containsKey(dataNodeId)) {
- tDNodeMap.put(dataNodeId, maxNode);
- maxNode += 1;
- }
- }
- }
+ regionLocationMap
+ .get(regionGroupId)
+ .forEach(
+ dataNodeId -> {
+ if (isDataNodeAvailable(dataNodeId)) {
+ if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
+ sDNodeMap.get(database).put(dataNodeId, maxNode);
+ sDNodeReflect.get(database).put(maxNode, dataNodeId);
+ maxNode += 1;
+ }
+ if (!tDNodeMap.containsKey(dataNodeId)) {
+ tDNodeMap.put(dataNodeId, maxNode);
+ maxNode += 1;
+ }
+ }
+ });
}
}
@@ -166,17 +167,23 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
String database = databaseEntry.getKey();
for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
int rNode = rNodeMap.get(regionGroupId);
- for (TDataNodeLocation dataNodeLocation :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int dataNodeId = dataNodeLocation.getDataNodeId();
- if (isDataNodeAvailable(dataNodeId) &&
isRegionAvailable(regionGroupId, dataNodeId)) {
- int sDNode = sDNodeMap.get(database).get(dataNodeId);
- // Capacity: 1, Cost: 1 if sDNode is the current leader of the
rNode, 0 otherwise.
- // Therefore, the RegionGroup will keep the leader as constant as
possible.
- int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) ==
dataNodeId ? 0 : 1;
- addAdjacentEdges(rNode, sDNode, 1, cost);
- }
- }
+ regionLocationMap
+ .get(regionGroupId)
+ .forEach(
+ dataNodeId -> {
+ if (isDataNodeAvailable(dataNodeId)
+ && isRegionAvailable(regionGroupId, dataNodeId)) {
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ // Capacity: 1, Cost: 1 if sDNode is the current leader of
the rNode, 0
+ // otherwise.
+ // Therefore, the RegionGroup will keep the leader as
constant as possible.
+ int cost =
+
Objects.equals(regionLeaderMap.getOrDefault(regionGroupId, -1), dataNodeId)
+ ? 0
+ : 1;
+ addAdjacentEdges(rNode, sDNode, 1, cost);
+ }
+ });
}
}
@@ -187,19 +194,21 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
// Map<DataNodeId, leader number>
Map<Integer, Integer> leaderCounter = new TreeMap<>();
for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
- for (TDataNodeLocation dataNodeLocation :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int dataNodeId = dataNodeLocation.getDataNodeId();
- if (isDataNodeAvailable(dataNodeId)) {
- int sDNode = sDNodeMap.get(database).get(dataNodeId);
- int tDNode = tDNodeMap.get(dataNodeId);
- int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
- // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
- // Thus, the leader distribution will be as balance as possible
within each Database
- // based on the Jensen's-Inequality.
- addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
- }
- }
+ regionLocationMap
+ .get(regionGroupId)
+ .forEach(
+ dataNodeId -> {
+ if (isDataNodeAvailable(dataNodeId)) {
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = leaderCounter.merge(dataNodeId, 1,
Integer::sum);
+ // Capacity: 1, Cost: x^2 for the x-th edge at the current
sDNode.
+ // Thus, the leader distribution will be as balance as
possible within each
+ // Database
+ // based on the Jensen's-Inequality.
+ addAdjacentEdges(sDNode, tDNode, 1, leaderCount *
leaderCount);
+ }
+ });
}
}
@@ -207,19 +216,20 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
// Map<DataNodeId, possible maximum leader>
// Count the possible maximum number of leader in each DataNode
Map<Integer, Integer> maxLeaderCounter = new TreeMap<>();
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
- for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
- int dataNodeId = dataNodeLocation.getDataNodeId();
- if (isDataNodeAvailable(dataNodeId)) {
- int tDNode = tDNodeMap.get(dataNodeId);
- int leaderCount = maxLeaderCounter.merge(dataNodeId, 1,
Integer::sum);
- // Cost: x^2 for the x-th edge at the current dNode.
- // Thus, the leader distribution will be as balance as possible
within the cluster
- // Based on the Jensen's-Inequality.
- addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
- }
- }
- }
+ regionLocationMap.forEach(
+ (regionGroupId, dataNodeIds) ->
+ dataNodeIds.forEach(
+ dataNodeId -> {
+ if (isDataNodeAvailable(dataNodeId)) {
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = maxLeaderCounter.merge(dataNodeId, 1,
Integer::sum);
+ // Cost: x^2 for the x-th edge at the current dNode.
+ // Thus, the leader distribution will be as balance as
possible within the
+ // cluster
+ // Based on the Jensen's-Inequality.
+ addAdjacentEdges(tDNode, T_NODE, 1, leaderCount *
leaderCount);
+ }
+ }));
}
private void addAdjacentEdges(int fromNode, int destNode, int capacity, int
cost) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 1ad90a2b02a..78723d3873d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -23,9 +23,11 @@ import
org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
@@ -46,6 +48,7 @@ import
org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -95,7 +98,12 @@ public class LoadCache {
initNodeHeartbeatCache(
configManager.getNodeManager().getRegisteredConfigNodes(),
configManager.getNodeManager().getRegisteredDataNodes());
-
initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets());
+ initRegionGroupHeartbeatCache(
+ configManager.getClusterSchemaManager().getDatabaseNames().stream()
+ .collect(
+ Collectors.toMap(
+ database -> database,
+ database ->
configManager.getPartitionManager().getAllReplicaSets(database))));
}
/** Initialize the nodeCacheMap when the ConfigNode-Leader is switched. */
@@ -112,8 +120,7 @@ public class LoadCache {
configNodeLocation -> {
int configNodeId = configNodeLocation.getConfigNodeId();
if (configNodeId != CURRENT_NODE_ID) {
- nodeCacheMap.put(configNodeId, new
ConfigNodeHeartbeatCache(configNodeId));
- heartbeatProcessingMap.put(configNodeId, new AtomicBoolean(false));
+ createNodeHeartbeatCache(NodeType.ConfigNode, configNodeId);
}
});
// Force set itself and never update
@@ -126,8 +133,7 @@ public class LoadCache {
registeredDataNodes.forEach(
dataNodeConfiguration -> {
int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
- nodeCacheMap.put(dataNodeId, new DataNodeHeartbeatCache(dataNodeId));
- heartbeatProcessingMap.put(dataNodeId, new AtomicBoolean(false));
+ createNodeHeartbeatCache(NodeType.DataNode, dataNodeId);
});
}
@@ -135,15 +141,24 @@ public class LoadCache {
* Initialize the regionGroupCacheMap and regionRouteCacheMap when the
ConfigNode-Leader is
* switched.
*/
- private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet>
regionReplicaSets) {
+ private void initRegionGroupHeartbeatCache(
+ Map<String, List<TRegionReplicaSet>> regionReplicaMap) {
regionGroupCacheMap.clear();
consensusGroupCacheMap.clear();
- regionReplicaSets.forEach(
- regionReplicaSet -> {
- TConsensusGroupId consensusGroupId = regionReplicaSet.getRegionId();
- regionGroupCacheMap.put(consensusGroupId, new RegionGroupCache());
- consensusGroupCacheMap.put(consensusGroupId, new
ConsensusGroupCache());
- });
+ regionReplicaMap.forEach(
+ (database, regionReplicaSets) ->
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ TConsensusGroupId regionGroupId =
regionReplicaSet.getRegionId();
+ regionGroupCacheMap.put(
+ regionGroupId,
+ new RegionGroupCache(
+ database,
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet())));
+ consensusGroupCacheMap.put(regionGroupId, new
ConsensusGroupCache());
+ }));
}
public void clearHeartbeatCache() {
@@ -165,6 +180,25 @@ public class LoadCache {
.getAndSet(true);
}
+ /**
+ * Create a new NodeHeartbeatCache for the specified Node.
+ *
+ * @param nodeType The specified NodeType
+ * @param nodeId The specified NodeId
+ */
+ public void createNodeHeartbeatCache(NodeType nodeType, int nodeId) {
+ switch (nodeType) {
+ case ConfigNode:
+ nodeCacheMap.put(nodeId, new ConfigNodeHeartbeatCache(nodeId));
+ break;
+ case DataNode:
+ default:
+ nodeCacheMap.put(nodeId, new DataNodeHeartbeatCache(nodeId));
+ break;
+ }
+ heartbeatProcessingMap.put(nodeId, new AtomicBoolean(false));
+ }
+
/**
* Cache the latest heartbeat sample of a ConfigNode.
*
@@ -172,10 +206,10 @@ public class LoadCache {
* @param sample the latest heartbeat sample
*/
public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample
sample) {
- nodeCacheMap
- .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
- .cacheHeartbeatSample(sample);
- heartbeatProcessingMap.get(nodeId).set(false);
+ // Only cache sample when the corresponding loadCache exists
+ Optional.ofNullable(nodeCacheMap.get(nodeId))
+ .ifPresent(node -> node.cacheHeartbeatSample(sample));
+ Optional.ofNullable(heartbeatProcessingMap.get(nodeId)).ifPresent(node ->
node.set(false));
}
/**
@@ -185,16 +219,49 @@ public class LoadCache {
* @param sample the latest heartbeat sample
*/
public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample
sample) {
- nodeCacheMap
- .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId))
- .cacheHeartbeatSample(sample);
- heartbeatProcessingMap.get(nodeId).set(false);
+ // Only cache sample when the corresponding loadCache exists
+ Optional.ofNullable(nodeCacheMap.get(nodeId))
+ .ifPresent(node -> node.cacheHeartbeatSample(sample));
+ Optional.ofNullable(heartbeatProcessingMap.get(nodeId)).ifPresent(node ->
node.set(false));
}
public void resetHeartbeatProcessing(int nodeId) {
heartbeatProcessingMap.get(nodeId).set(false);
}
+ /**
+ * Remove the NodeHeartbeatCache of the specified Node.
+ *
+ * @param nodeId the index of the specified Node
+ */
+ public void removeNodeCache(int nodeId) {
+ nodeCacheMap.remove(nodeId);
+ heartbeatProcessingMap.remove(nodeId);
+ }
+
+ /**
+ * Create a new RegionGroupCache and a new ConsensusGroupCache for the
specified RegionGroup.
+ *
+ * @param database the Database where the RegionGroup belonged
+ * @param regionGroupId the index of the RegionGroup
+ * @param dataNodeIds the index of the DataNodes where the Regions resided
+ */
+ public void createRegionGroupHeartbeatCache(
+ String database, TConsensusGroupId regionGroupId, Set<Integer>
dataNodeIds) {
+ regionGroupCacheMap.put(regionGroupId, new RegionGroupCache(database,
dataNodeIds));
+ consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache());
+ }
+
+ /**
+ * Create a new RegionCache for the specified Region in the specified
RegionGroup.
+ *
+ * @param regionGroupId the index of the RegionGroup
+ * @param dataNodeId the index of the DataNode where the Region resides
+ */
+ public void createRegionCache(TConsensusGroupId regionGroupId, int
dataNodeId) {
+ regionGroupCacheMap.get(regionGroupId).createRegionCache(dataNodeId);
+ }
+
/**
* Cache the latest heartbeat sample of a RegionGroup.
*
@@ -207,9 +274,9 @@ public class LoadCache {
int nodeId,
RegionHeartbeatSample sample,
boolean overwrite) {
- regionGroupCacheMap
- .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache())
- .cacheHeartbeatSample(nodeId, sample, overwrite);
+ // Only cache sample when the corresponding loadCache exists
+ Optional.ofNullable(regionGroupCacheMap.get(regionGroupId))
+ .ifPresent(group -> group.cacheHeartbeatSample(nodeId, sample,
overwrite));
}
/**
@@ -231,9 +298,9 @@ public class LoadCache {
*/
public void cacheConsensusSample(
TConsensusGroupId regionGroupId, ConsensusGroupHeartbeatSample sample) {
- consensusGroupCacheMap
- .computeIfAbsent(regionGroupId, empty -> new ConsensusGroupCache())
- .cacheHeartbeatSample(sample);
+ // Only cache sample when the corresponding loadCache exists
+ Optional.ofNullable(consensusGroupCacheMap.get(regionGroupId))
+ .ifPresent(group -> group.cacheHeartbeatSample(sample));
}
/** Update the NodeStatistics of all Nodes. */
@@ -280,6 +347,44 @@ public class LoadCache {
return dataNodeStatisticsMap;
}
+ /**
+ * Get a map of cached RegionGroups of all Databases.
+ *
+ * @param type SchemaRegion or DataRegion
+ * @return Map<Database, List<RegionGroupId>>
+ */
+ public Map<String, List<TConsensusGroupId>> getCurrentDatabaseRegionGroupMap(
+ TConsensusGroupType type) {
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
+ regionGroupCacheMap.forEach(
+ (regionGroupId, regionGroupCache) -> {
+ if (type.equals(regionGroupId.getType())) {
+ databaseRegionGroupMap
+ .computeIfAbsent(regionGroupCache.getDatabase(), empty -> new
ArrayList<>())
+ .add(regionGroupId);
+ }
+ });
+ return databaseRegionGroupMap;
+ }
+
+ /**
+ * Get a map of cached RegionGroups
+ *
+ * @param type SchemaRegion or DataRegion
+ * @return Map<RegionGroupId, Set<DataNodeId>>
+ */
+ public Map<TConsensusGroupId, Set<Integer>> getCurrentRegionLocationMap(
+ TConsensusGroupType type) {
+ Map<TConsensusGroupId, Set<Integer>> regionGroupIdsMap = new TreeMap<>();
+ regionGroupCacheMap.forEach(
+ (regionGroupId, regionGroupCache) -> {
+ if (type.equals(regionGroupId.getType())) {
+ regionGroupIdsMap.put(regionGroupId,
regionGroupCache.getRegionLocations());
+ }
+ });
+ return regionGroupIdsMap;
+ }
+
/**
* Get the RegionGroupStatistics of all RegionGroups.
*
@@ -337,22 +442,6 @@ public class LoadCache {
return nodeCache == null ? NodeStatus.Unknown : nodeCache.getNodeStatus();
}
- /**
- * Get all DataNodes' NodeStatus
- *
- * @return Map<DataNodeId, NodeStatus>
- */
- public Map<Integer, NodeStatus> getDataNodeStatus() {
- Map<Integer, NodeStatus> nodeStatusMap = new TreeMap<>();
- nodeCacheMap.forEach(
- (nodeId, nodeCache) -> {
- if (nodeCache instanceof DataNodeHeartbeatCache) {
- nodeStatusMap.put(nodeId, nodeCache.getNodeStatus());
- }
- });
- return nodeStatusMap;
- }
-
/**
* Safely get the specified Node's current status with reason.
*
@@ -466,11 +555,6 @@ public class LoadCache {
.orElse(-1);
}
- /** Remove the specified Node's cache. */
- public void removeNodeCache(int nodeId) {
- nodeCacheMap.remove(nodeId);
- }
-
/**
* Safely get RegionStatus.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupHeartbeatSample.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupHeartbeatSample.java
index 90f313881ff..7fb18b0cdc8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupHeartbeatSample.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupHeartbeatSample.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.manager.load.cache.consensus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
/** ConsensusGroupHeartbeatSample records the heartbeat sample of a consensus
group. */
@@ -31,6 +32,12 @@ public class ConsensusGroupHeartbeatSample extends
AbstractHeartbeatSample {
this.leaderId = leaderId;
}
+ @TestOnly
+ public ConsensusGroupHeartbeatSample(int leaderId) {
+ super(System.nanoTime());
+ this.leaderId = leaderId;
+ }
+
public int getLeaderId() {
return leaderId;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupStatistics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupStatistics.java
index bf55731abeb..733b806c851 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupStatistics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupStatistics.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.manager.load.cache.consensus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.AbstractStatistics;
import java.util.Objects;
@@ -33,6 +34,12 @@ public class ConsensusGroupStatistics extends
AbstractStatistics {
this.leaderId = leaderId;
}
+ @TestOnly
+ public ConsensusGroupStatistics(int leaderId) {
+ super(System.nanoTime());
+ this.leaderId = leaderId;
+ }
+
public static ConsensusGroupStatistics
generateDefaultConsensusGroupStatistics() {
return new ConsensusGroupStatistics(0,
ConsensusGroupCache.UN_READY_LEADER_ID);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
index 24bee71ad0e..12c554d51e8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
@@ -76,9 +76,7 @@ public class NodeStatistics extends AbstractStatistics {
return false;
}
NodeStatistics that = (NodeStatistics) o;
- return loadScore == that.loadScore
- && status == that.status
- && Objects.equals(statusReason, that.statusReason);
+ return status == that.status && Objects.equals(statusReason,
that.statusReason);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index 2e68d370519..8facf40ba5a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -58,7 +58,8 @@ public class RegionCache extends AbstractLoadCache {
return (RegionStatistics) currentStatistics.get();
}
- public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample,
boolean overwrite) {
+ public synchronized void cacheHeartbeatSample(
+ RegionHeartbeatSample newHeartbeatSample, boolean overwrite) {
if (overwrite || getLastSample() == null) {
super.cacheHeartbeatSample(newHeartbeatSample);
return;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index ae922dc0028..d166d4bceae 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -35,14 +37,17 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RegionGroupCache {
+ private final String database;
// Map<DataNodeId(where a RegionReplica resides in), RegionCache>
private final Map<Integer, RegionCache> regionCacheMap;
// The current RegionGroupStatistics, used for providing statistics to other
services
private final AtomicReference<RegionGroupStatistics> currentStatistics;
/** Constructor for create RegionGroupCache with default
RegionGroupStatistics. */
- public RegionGroupCache() {
+ public RegionGroupCache(String database, Set<Integer> dataNodeIds) {
+ this.database = database;
this.regionCacheMap = new ConcurrentHashMap<>();
+ dataNodeIds.forEach(dataNodeId -> regionCacheMap.put(dataNodeId, new
RegionCache()));
this.currentStatistics =
new
AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics());
}
@@ -56,9 +61,9 @@ public class RegionGroupCache {
*/
public void cacheHeartbeatSample(
int dataNodeId, RegionHeartbeatSample newHeartbeatSample, boolean
overwrite) {
- regionCacheMap
- .computeIfAbsent(dataNodeId, empty -> new RegionCache())
- .cacheHeartbeatSample(newHeartbeatSample, overwrite);
+ // Only cache sample when the corresponding loadCache exists
+ Optional.ofNullable(regionCacheMap.get(dataNodeId))
+ .ifPresent(region -> region.cacheHeartbeatSample(newHeartbeatSample,
overwrite));
}
@TestOnly
@@ -66,6 +71,15 @@ public class RegionGroupCache {
cacheHeartbeatSample(dataNodeId, newHeartbeatSample, false);
}
+ /**
+ * Create the cache of the specified Region.
+ *
+ * @param dataNodeId the specified DataNode
+ */
+ public void createRegionCache(int dataNodeId) {
+ regionCacheMap.put(dataNodeId, new RegionCache());
+ }
+
/**
* Remove the cache of the specified Region in the specified RegionGroup.
*
@@ -131,4 +145,12 @@ public class RegionGroupCache {
public RegionGroupStatistics getCurrentStatistics() {
return currentStatistics.get();
}
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public Set<Integer> getRegionLocations() {
+ return regionCacheMap.keySet();
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
index 34b4d9f0b0d..fdce5e71def 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
/** RegionHeartbeatSample records the heartbeat sample of a Region. */
@@ -32,6 +33,12 @@ public class RegionHeartbeatSample extends
AbstractHeartbeatSample {
this.status = status;
}
+ @TestOnly
+ public RegionHeartbeatSample(RegionStatus status) {
+ super(System.nanoTime());
+ this.status = status;
+ }
+
public RegionStatus getStatus() {
return status;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
index ff9ca15d1e8..0d885f90524 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
@@ -40,6 +41,7 @@ import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -126,16 +128,22 @@ public class EventService {
Map<Integer, Pair<NodeStatistics, NodeStatistics>>
differentNodeStatisticsMap = new TreeMap<>();
currentNodeStatisticsMap.forEach(
(nodeId, currentNodeStatistics) -> {
- NodeStatistics previousNodeStatistics =
- previousNodeStatisticsMap.getOrDefault(
- nodeId, NodeStatistics.generateDefaultNodeStatistics());
- if (currentNodeStatistics.isNewerThan(previousNodeStatistics)
- && !currentNodeStatistics.equals(previousNodeStatistics)) {
+ NodeStatistics previousNodeStatistics =
previousNodeStatisticsMap.get(nodeId);
+ if (previousNodeStatistics == null
+ || (currentNodeStatistics.isNewerThan(previousNodeStatistics)
+ && !currentNodeStatistics.equals(previousNodeStatistics))) {
differentNodeStatisticsMap.put(
nodeId, new Pair<>(previousNodeStatistics,
currentNodeStatistics));
previousNodeStatisticsMap.put(nodeId, currentNodeStatistics);
}
});
+ previousNodeStatisticsMap.forEach(
+ (nodeId, previousNodeStatistics) -> {
+ if (!currentNodeStatisticsMap.containsKey(nodeId)) {
+ differentNodeStatisticsMap.put(nodeId, new
Pair<>(previousNodeStatistics, null));
+ }
+ });
+
previousNodeStatisticsMap.keySet().retainAll(currentNodeStatisticsMap.keySet());
if (!differentNodeStatisticsMap.isEmpty()) {
eventPublisher.post(new
NodeStatisticsChangeEvent(differentNodeStatisticsMap));
recordNodeStatistics(differentNodeStatisticsMap);
@@ -147,14 +155,11 @@ public class EventService {
LOGGER.info("[NodeStatistics] NodeStatisticsMap: ");
for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>>
nodeCacheEntry :
differentNodeStatisticsMap.entrySet()) {
- if (!Objects.equals(
- nodeCacheEntry.getValue().getRight(),
nodeCacheEntry.getValue().getLeft())) {
- LOGGER.info(
- "[NodeStatistics]\t {}: {}->{}",
- "nodeId{" + nodeCacheEntry.getKey() + "}",
- nodeCacheEntry.getValue().getLeft(),
- nodeCacheEntry.getValue().getRight());
- }
+ LOGGER.info(
+ "[NodeStatistics]\t {}: {} -> {}",
+ "nodeId{" + nodeCacheEntry.getKey() + "}",
+ nodeCacheEntry.getValue().getLeft(),
+ nodeCacheEntry.getValue().getRight());
}
}
@@ -166,16 +171,24 @@ public class EventService {
currentRegionGroupStatisticsMap.forEach(
(regionGroupId, currentRegionGroupStatistics) -> {
RegionGroupStatistics previousRegionGroupStatistics =
- previousRegionGroupStatisticsMap.getOrDefault(
- regionGroupId,
RegionGroupStatistics.generateDefaultRegionGroupStatistics());
- if
(currentRegionGroupStatistics.isNewerThan(previousRegionGroupStatistics)
- &&
!currentRegionGroupStatistics.equals(previousRegionGroupStatistics)) {
+ previousRegionGroupStatisticsMap.get(regionGroupId);
+ if (previousRegionGroupStatistics == null
+ ||
(currentRegionGroupStatistics.isNewerThan(previousRegionGroupStatistics)
+ &&
!currentRegionGroupStatistics.equals(previousRegionGroupStatistics))) {
differentRegionGroupStatisticsMap.put(
regionGroupId,
new Pair<>(previousRegionGroupStatistics,
currentRegionGroupStatistics));
previousRegionGroupStatisticsMap.put(regionGroupId,
currentRegionGroupStatistics);
}
});
+ previousRegionGroupStatisticsMap.forEach(
+ (regionGroupId, previousRegionGroupStatistics) -> {
+ if (!currentRegionGroupStatisticsMap.containsKey(regionGroupId)) {
+ differentRegionGroupStatisticsMap.put(
+ regionGroupId, new Pair<>(previousRegionGroupStatistics,
null));
+ }
+ });
+
previousRegionGroupStatisticsMap.keySet().retainAll(currentRegionGroupStatisticsMap.keySet());
if (!differentRegionGroupStatisticsMap.isEmpty()) {
eventPublisher.post(new
RegionGroupStatisticsChangeEvent(differentRegionGroupStatisticsMap));
recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
@@ -188,38 +201,38 @@ public class EventService {
LOGGER.info("[RegionGroupStatistics] RegionGroupStatisticsMap: ");
for (Map.Entry<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
regionGroupStatisticsEntry :
differentRegionGroupStatisticsMap.entrySet()) {
- if (!Objects.equals(
- regionGroupStatisticsEntry.getValue().getRight(),
- regionGroupStatisticsEntry.getValue().getLeft())) {
- LOGGER.info(
- "[RegionGroupStatistics]\t RegionGroup {}: {} -> {}",
- regionGroupStatisticsEntry.getKey(),
-
regionGroupStatisticsEntry.getValue().getLeft().getRegionGroupStatus(),
-
regionGroupStatisticsEntry.getValue().getRight().getRegionGroupStatus());
+ RegionGroupStatistics previousStatistics =
regionGroupStatisticsEntry.getValue().getLeft();
+ RegionGroupStatistics currentStatistics =
regionGroupStatisticsEntry.getValue().getRight();
+ LOGGER.info(
+ "[RegionGroupStatistics]\t RegionGroup {}: {} -> {}",
+ regionGroupStatisticsEntry.getKey(),
+ previousStatistics == null ? null :
previousStatistics.getRegionGroupStatus(),
+ currentStatistics == null ? null :
currentStatistics.getRegionGroupStatus());
- List<Integer> leftIds =
regionGroupStatisticsEntry.getValue().getLeft().getRegionIds();
- List<Integer> rightIds =
regionGroupStatisticsEntry.getValue().getRight().getRegionIds();
- for (Integer leftId : leftIds) {
- if (!rightIds.contains(leftId)) {
- LOGGER.info(
- "[RegionGroupStatistics]\t Region in DataNode {}: {} -> null",
- leftId,
-
regionGroupStatisticsEntry.getValue().getLeft().getRegionStatus(leftId));
- } else {
- LOGGER.info(
- "[RegionGroupStatistics]\t Region in DataNode {}: {} -> {}",
- leftId,
-
regionGroupStatisticsEntry.getValue().getLeft().getRegionStatus(leftId),
-
regionGroupStatisticsEntry.getValue().getRight().getRegionStatus(leftId));
- }
+ List<Integer> leftIds =
+ previousStatistics == null ? Collections.emptyList() :
previousStatistics.getRegionIds();
+ List<Integer> rightIds =
+ currentStatistics == null ? Collections.emptyList() :
currentStatistics.getRegionIds();
+ for (Integer leftId : leftIds) {
+ if (rightIds.contains(leftId)) {
+ LOGGER.info(
+ "[RegionGroupStatistics]\t Region in DataNode {}: {} -> {}",
+ leftId,
+ previousStatistics.getRegionStatus(leftId),
+ currentStatistics.getRegionStatus(leftId));
+ } else {
+ LOGGER.info(
+ "[RegionGroupStatistics]\t Region in DataNode {}: {} -> null",
+ leftId,
+ previousStatistics.getRegionStatus(leftId));
}
- for (Integer rightId : rightIds) {
- if (!leftIds.contains(rightId)) {
- LOGGER.info(
- "[RegionGroupStatistics]\t Region in DataNode {}: null -> {}",
- rightId,
-
regionGroupStatisticsEntry.getValue().getRight().getRegionStatus(rightId));
- }
+ }
+ for (Integer rightId : rightIds) {
+ if (!leftIds.contains(rightId)) {
+ LOGGER.info(
+ "[RegionGroupStatistics]\t Region in DataNode {}: null -> {}",
+ rightId,
+ currentStatistics.getRegionStatus(rightId));
}
}
}
@@ -233,11 +246,10 @@ public class EventService {
currentConsensusGroupStatisticsMap.forEach(
(consensusGroupId, currentConsensusGroupStatistics) -> {
ConsensusGroupStatistics previousConsensusGroupStatistics =
- previousConsensusGroupStatisticsMap.getOrDefault(
- consensusGroupId,
-
ConsensusGroupStatistics.generateDefaultConsensusGroupStatistics());
- if
(currentConsensusGroupStatistics.isNewerThan(previousConsensusGroupStatistics)
- &&
!currentConsensusGroupStatistics.equals(previousConsensusGroupStatistics)) {
+ previousConsensusGroupStatisticsMap.get(consensusGroupId);
+ if (previousConsensusGroupStatistics == null
+ ||
(currentConsensusGroupStatistics.isNewerThan(previousConsensusGroupStatistics)
+ &&
!currentConsensusGroupStatistics.equals(previousConsensusGroupStatistics))) {
differentConsensusGroupStatisticsMap.put(
consensusGroupId,
new Pair<>(previousConsensusGroupStatistics,
currentConsensusGroupStatistics));
@@ -245,6 +257,16 @@ public class EventService {
consensusGroupId, currentConsensusGroupStatistics);
}
});
+ previousConsensusGroupStatisticsMap.forEach(
+ (consensusGroupId, previousConsensusGroupStatistics) -> {
+ if
(!currentConsensusGroupStatisticsMap.containsKey(consensusGroupId)) {
+ differentConsensusGroupStatisticsMap.put(
+ consensusGroupId, new Pair<>(previousConsensusGroupStatistics,
null));
+ }
+ });
+ previousConsensusGroupStatisticsMap
+ .keySet()
+ .retainAll(currentConsensusGroupStatisticsMap.keySet());
if (!differentConsensusGroupStatisticsMap.isEmpty()) {
eventPublisher.post(
new
ConsensusGroupStatisticsChangeEvent(differentConsensusGroupStatisticsMap));
@@ -262,11 +284,16 @@ public class EventService {
consensusGroupStatisticsEntry.getValue().getRight(),
consensusGroupStatisticsEntry.getValue().getLeft())) {
LOGGER.info(
- "[ConsensusGroupStatistics]\t {}: {}->{}",
+ "[ConsensusGroupStatistics]\t {}: {} -> {}",
consensusGroupStatisticsEntry.getKey(),
consensusGroupStatisticsEntry.getValue().getLeft(),
consensusGroupStatisticsEntry.getValue().getRight());
}
}
}
+
+ @TestOnly
+ public EventBus getEventPublisher() {
+ return eventPublisher;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 5afd20c2965..cedc2f86c9e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -271,7 +272,11 @@ public class NodeManager {
return resp;
}
+ // Create a new DataNodeHeartbeatCache and force update NodeStatus
int dataNodeId = nodeInfo.generateNextNodeId();
+
getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode,
dataNodeId);
+ // TODO: invoke a force heartbeat to update new DataNode's status
immediately
+
RegisterDataNodePlan registerDataNodePlan =
new RegisterDataNodePlan(req.getDataNodeConfiguration());
// Register new DataNode
@@ -298,8 +303,6 @@ public class NodeManager {
// Adjust the maximum RegionGroup number of each Database
getClusterSchemaManager().adjustMaxRegionGroupNum();
- // TODO: Add a force heartbeat to update LoadCache immediately
-
resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
resp.setDataNodeId(
registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 0509b6f0e37..0f5b7f87621 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -366,6 +366,16 @@ public class ConfigNodeProcedureEnv {
ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
}
+ /**
+ * Create a new ConfigNodeHeartbeatCache
+ *
+ * @param nodeId the index of the new ConfigNode
+ */
+ public void createConfigNodeHeartbeatCache(int nodeId) {
+
getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.ConfigNode,
nodeId);
+ // TODO: invoke a force heartbeat to update new ConfigNode's status
immediately
+ }
+
/**
* Mark the given datanode as removing status to avoid read or write request
routing to this node.
*
@@ -554,13 +564,33 @@ public class ConfigNodeProcedureEnv {
* Force activating RegionGroup by setting status to Running, therefore the
ConfigNode-leader can
* select leader for it and use it to allocate new Partitions
*
- * @param activateRegionGroupMap Map<RegionGroupId, Map<DataNodeId, activate
heartbeat sample>>
+ * @param activateRegionGroupMap Map<Database, Map<RegionGroupId,
Map<DataNodeId, activate
+ * heartbeat sample>>>
*/
public void activateRegionGroup(
- Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>
activateRegionGroupMap) {
- getLoadManager().forceUpdateRegionGroupCache(activateRegionGroupMap);
+ Map<String, Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>>
+ activateRegionGroupMap) {
+ // Create RegionGroup heartbeat Caches
+ activateRegionGroupMap.forEach(
+ (database, regionGroupSampleMap) ->
+ regionGroupSampleMap.forEach(
+ (regionGroupId, regionSampleMap) ->
+ getLoadManager()
+ .getLoadCache()
+ .createRegionGroupHeartbeatCache(
+ database, regionGroupId,
regionSampleMap.keySet())));
+ // Force update first heartbeat samples
+ getLoadManager()
+ .forceUpdateRegionGroupCache(
+ activateRegionGroupMap.values().stream()
+ .flatMap(innerMap -> innerMap.entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue, (a, b) -> b)));
// Wait for leader and priority redistribution
- getLoadManager().waitForRegionGroupReady(new
ArrayList<>(activateRegionGroupMap.keySet()));
+ getLoadManager()
+ .waitForRegionGroupReady(
+ activateRegionGroupMap.values().stream()
+ .flatMap(innterMap -> innterMap.keySet().stream())
+ .collect(Collectors.toList()));
}
public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index e946c4dbc78..4dac3f311cb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -417,8 +417,7 @@ public class RegionMaintainHandler {
return report;
}
- public void addRegionLocation(
- TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus
regionStatus) {
+ public void addRegionLocation(TConsensusGroupId regionId, TDataNodeLocation
newLocation) {
AddRegionLocationPlan req = new AddRegionLocationPlan(regionId,
newLocation);
TSStatus status =
configManager.getPartitionManager().addRegionLocation(req);
LOGGER.info(
@@ -428,14 +427,15 @@ public class RegionMaintainHandler {
status);
configManager
.getLoadManager()
- .forceAddRegionCache(regionId, newLocation.getDataNodeId(),
regionStatus);
+ .getLoadCache()
+ .createRegionCache(regionId, newLocation.getDataNodeId());
}
- public void updateRegionCache(
+ public void forceUpdateRegionCache(
TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus
regionStatus) {
configManager
.getLoadManager()
- .forceAddRegionCache(regionId, newLocation.getDataNodeId(),
regionStatus);
+ .forceUpdateRegionCache(regionId, newLocation.getDataNodeId(),
regionStatus);
}
public void removeRegionLocation(
@@ -447,9 +447,7 @@ public class RegionMaintainHandler {
regionId,
getIdWithRpcEndpoint(deprecatedLocation),
status);
- configManager
- .getLoadManager()
- .forceRemoveRegionCache(regionId, deprecatedLocation.getDataNodeId());
+ configManager.getLoadManager().removeRegionCache(regionId,
deprecatedLocation.getDataNodeId());
configManager.getLoadManager().getRouteBalancer().balanceRegionLeaderAndPriority();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 160551b6847..b38696a10ed 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -81,8 +81,8 @@ public class AddConfigNodeProcedure extends
AbstractNodeProcedure<AddConfigNodeS
break;
case REGISTER_SUCCESS:
env.notifyRegisterSuccess(tConfigNodeLocation);
+
env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
env.applyConfigNode(tConfigNodeLocation, versionInfo);
- // TODO: Add a force heartbeat to update LoadCache immediately
LOG.info("The ConfigNode: {} is successfully added to the cluster",
tConfigNodeLocation);
return Flow.NO_MORE_STATE;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index b8dd6075431..3450fd27221 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -91,7 +91,8 @@ public class AddRegionPeerProcedure
getProcId(),
consensusGroupId.getId(),
destDataNode.getDataNodeId());
- handler.addRegionLocation(consensusGroupId, destDataNode,
RegionStatus.Adding);
+ handler.addRegionLocation(consensusGroupId, destDataNode);
+ handler.forceUpdateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Adding);
TSStatus status = handler.createNewRegionPeer(consensusGroupId,
destDataNode);
setKillPoint(state);
if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
@@ -101,7 +102,7 @@ public class AddRegionPeerProcedure
setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
break;
case DO_ADD_REGION_PEER:
- handler.updateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Adding);
+ handler.forceUpdateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Adding);
// We don't want to re-submit AddRegionPeerTask when leader change
or ConfigNode reboot
if (!this.isStateDeserialized()) {
TSStatus tsStatus =
@@ -141,7 +142,7 @@ public class AddRegionPeerProcedure
throw new UnsupportedOperationException(msg);
}
case UPDATE_REGION_LOCATION_CACHE:
- handler.updateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Running);
+ handler.forceUpdateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Running);
setKillPoint(state);
LOGGER.info("[pid{}][AddRegion] state {} complete", getProcId(),
state);
LOGGER.info(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
index 8498cf49c0a..95ec7bda351 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
@@ -178,8 +178,8 @@ public class CreateRegionGroupsProcedure
case ACTIVATE_REGION_GROUPS:
long currentTime = System.nanoTime();
// Build RegionGroupCache immediately to make these successfully built
RegionGroup available
- Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>
activateRegionGroupMap =
- new TreeMap<>();
+ Map<String, Map<TConsensusGroupId, Map<Integer,
RegionHeartbeatSample>>>
+ activateRegionGroupMap = new TreeMap<>();
createRegionGroupsPlan
.getRegionGroupMap()
.forEach(
@@ -211,8 +211,9 @@ public class CreateRegionGroupsProcedure
? RegionStatus.Unknown
: RegionStatus.Running));
});
- activateRegionGroupMap.put(
- regionReplicaSet.getRegionId(),
activateSampleMap);
+ activateRegionGroupMap
+ .computeIfAbsent(database, empty -> new
TreeMap<>())
+ .put(regionReplicaSet.getRegionId(),
activateSampleMap);
}
}));
env.activateRegionGroup(activateRegionGroupMap);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index 1bdf473c9eb..ed4de51ecf8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -89,13 +89,13 @@ public class RemoveRegionPeerProcedure
getProcId(),
consensusGroupId.getId(),
targetDataNode.getDataNodeId());
- handler.updateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
+ handler.forceUpdateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
handler.transferRegionLeader(consensusGroupId, targetDataNode);
KillPoint.setKillPoint(state);
setNextState(REMOVE_REGION_PEER);
break;
case REMOVE_REGION_PEER:
- handler.updateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
+ handler.forceUpdateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
tsStatus =
handler.submitRemoveRegionPeerTask(
this.getProcId(), targetDataNode, consensusGroupId,
coordinator);
@@ -121,7 +121,7 @@ public class RemoveRegionPeerProcedure
setNextState(DELETE_OLD_REGION_PEER);
break;
case DELETE_OLD_REGION_PEER:
- handler.updateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
+ handler.forceUpdateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
tsStatus =
handler.submitDeleteOldRegionPeerTask(
this.getProcId(), targetDataNode, consensusGroupId);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/FakeSubscriber.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/FakeSubscriber.java
new file mode 100644
index 00000000000..147f6aa7f9b
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/FakeSubscriber.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
+import
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
+import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+import
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Semaphore;
+
+public class FakeSubscriber implements IClusterStatusSubscriber {
+
+ private final Semaphore nodeSemaphore;
+ private final Map<Integer, Pair<NodeStatistics, NodeStatistics>>
differentNodeStatisticsMap;
+ private final Semaphore regionGroupSemaphore;
+ private final Map<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
+ differentRegionGroupStatisticsMap;
+ private final Semaphore consensusGroupSemaphore;
+ private final Map<TConsensusGroupId, Pair<ConsensusGroupStatistics,
ConsensusGroupStatistics>>
+ differentConsensusGroupStatisticsMap;
+
+ public FakeSubscriber(
+ Semaphore nodeSemaphore, Semaphore regionGroupSemaphore, Semaphore
consensusGroupSemaphore) {
+ this.nodeSemaphore = nodeSemaphore;
+ this.regionGroupSemaphore = regionGroupSemaphore;
+ this.consensusGroupSemaphore = consensusGroupSemaphore;
+ differentNodeStatisticsMap = new TreeMap<>();
+ differentRegionGroupStatisticsMap = new TreeMap<>();
+ differentConsensusGroupStatisticsMap = new TreeMap<>();
+ }
+
+ @Override
+ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
+ differentNodeStatisticsMap.clear();
+ differentNodeStatisticsMap.putAll(event.getDifferentNodeStatisticsMap());
+ nodeSemaphore.release();
+ }
+
+ @Override
+ public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent
event) {
+ differentRegionGroupStatisticsMap.clear();
+
differentRegionGroupStatisticsMap.putAll(event.getDifferentRegionGroupStatisticsMap());
+ regionGroupSemaphore.release();
+ }
+
+ @Override
+ public void
onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {
+ differentConsensusGroupStatisticsMap.clear();
+
differentConsensusGroupStatisticsMap.putAll(event.getDifferentConsensusGroupStatisticsMap());
+ consensusGroupSemaphore.release();
+ }
+
+ public Map<Integer, Pair<NodeStatistics, NodeStatistics>>
getDifferentNodeStatisticsMap() {
+ return differentNodeStatisticsMap;
+ }
+
+ public Map<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
+ getDifferentRegionGroupStatisticsMap() {
+ return differentRegionGroupStatisticsMap;
+ }
+
+ public Map<TConsensusGroupId, Pair<ConsensusGroupStatistics,
ConsensusGroupStatistics>>
+ getDifferentConsensusGroupStatisticsMap() {
+ return differentConsensusGroupStatisticsMap;
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
new file mode 100644
index 00000000000..12b3dbb8233
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
+import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
+
+import org.apache.tsfile.utils.Pair;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Semaphore;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class LoadManagerTest {
+ private static LoadManager LOAD_MANAGER;
+ private static LoadCache LOAD_CACHE;
+ private static FakeSubscriber FAKE_SUBSCRIBER;
+ private static Semaphore NODE_SEMAPHORE;
+ private static Semaphore REGION_GROUP_SEMAPHORE;
+ private static Semaphore CONSENSUS_GROUP_SEMAPHORE;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ IManager CONFIG_MANAGER = new ConfigManager();
+ LOAD_MANAGER = CONFIG_MANAGER.getLoadManager();
+ LOAD_CACHE = LOAD_MANAGER.getLoadCache();
+ }
+
+ @Before
+ public void renewFakeSubscriber() {
+ NODE_SEMAPHORE = new Semaphore(0);
+ REGION_GROUP_SEMAPHORE = new Semaphore(0);
+ CONSENSUS_GROUP_SEMAPHORE = new Semaphore(0);
+ FAKE_SUBSCRIBER =
+ new FakeSubscriber(NODE_SEMAPHORE, REGION_GROUP_SEMAPHORE,
CONSENSUS_GROUP_SEMAPHORE);
+
LOAD_MANAGER.getEventService().getEventPublisher().register(FAKE_SUBSCRIBER);
+ }
+
+ @Test
+ public void testNodeCache() throws InterruptedException {
+ // Create 1 ConfigNode and 1 DataNode heartbeat cache
+ LOAD_CACHE.createNodeHeartbeatCache(NodeType.ConfigNode, 0);
+ LOAD_CACHE.createNodeHeartbeatCache(NodeType.DataNode, 1);
+
+ // Default NodeStatus is Unknown
+ Assert.assertEquals(NodeStatus.Unknown, LOAD_CACHE.getNodeStatus(0));
+ Assert.assertEquals(NodeStatus.Unknown, LOAD_CACHE.getNodeStatus(1));
+
+ // Simulate update to Running status
+ LOAD_CACHE.cacheConfigNodeHeartbeatSample(0, new
NodeHeartbeatSample(NodeStatus.Running));
+ LOAD_CACHE.cacheDataNodeHeartbeatSample(1, new
NodeHeartbeatSample(NodeStatus.Running));
+ LOAD_CACHE.updateNodeStatistics();
+
LOAD_MANAGER.getEventService().checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
+ NODE_SEMAPHORE.acquire();
+ Assert.assertEquals(NodeStatus.Running, LOAD_CACHE.getNodeStatus(0));
+ Assert.assertEquals(NodeStatus.Running, LOAD_CACHE.getNodeStatus(1));
+ Map<Integer, Pair<NodeStatistics, NodeStatistics>>
differentNodeStatisticsMap =
+ FAKE_SUBSCRIBER.getDifferentNodeStatisticsMap();
+ Assert.assertEquals(
+ new Pair<>(null, new NodeStatistics(NodeStatus.Running)),
+ differentNodeStatisticsMap.get(0));
+ Assert.assertEquals(
+ new Pair<>(null, new NodeStatistics(NodeStatus.Running)),
+ differentNodeStatisticsMap.get(1));
+
+ // Force update to Removing status
+ LOAD_MANAGER.forceUpdateNodeCache(
+ NodeType.DataNode, 1, new NodeHeartbeatSample(NodeStatus.Removing));
+ NODE_SEMAPHORE.acquire();
+ Assert.assertEquals(NodeStatus.Removing, LOAD_CACHE.getNodeStatus(1));
+ differentNodeStatisticsMap =
FAKE_SUBSCRIBER.getDifferentNodeStatisticsMap();
+ // Only DataNode 1 is updated
+ Assert.assertEquals(1, differentNodeStatisticsMap.size());
+ Assert.assertEquals(
+ new Pair<>(new NodeStatistics(NodeStatus.Running), new
NodeStatistics(NodeStatus.Removing)),
+ differentNodeStatisticsMap.get(1));
+
+ // Removing status can't be updated to any other status automatically
+ LOAD_CACHE.cacheDataNodeHeartbeatSample(1, new
NodeHeartbeatSample(NodeStatus.ReadOnly));
+ LOAD_CACHE.updateNodeStatistics();
+
LOAD_MANAGER.getEventService().checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
+ Assert.assertEquals(NodeStatus.Removing, LOAD_CACHE.getNodeStatus(1));
+
+ // Remove NodeCache
+ LOAD_MANAGER.removeNodeCache(1);
+ NODE_SEMAPHORE.acquire();
+ differentNodeStatisticsMap =
FAKE_SUBSCRIBER.getDifferentNodeStatisticsMap();
+ // Only DataNode 1 is updated
+ Assert.assertEquals(1, differentNodeStatisticsMap.size());
+ Assert.assertEquals(
+ new Pair<>(new NodeStatistics(NodeStatus.Removing), null),
+ differentNodeStatisticsMap.get(1));
+ }
+
+ @Test
+ public void testRegionGroupCache() throws InterruptedException {
+ // Create 1 RegionGroup heartbeat cache
+ TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 0);
+ Set<Integer> dataNodeIds = Stream.of(0, 1, 2).collect(Collectors.toSet());
+ LOAD_CACHE.createRegionGroupHeartbeatCache("root.db", regionGroupId,
dataNodeIds);
+
+ // Default RegionGroupStatus is Disabled and RegionStatus are Unknown
+ Assert.assertEquals(RegionGroupStatus.Disabled,
LOAD_CACHE.getRegionGroupStatus(regionGroupId));
+ dataNodeIds.forEach(
+ dataNodeId ->
+ Assert.assertEquals(
+ RegionStatus.Unknown,
LOAD_CACHE.getRegionStatus(regionGroupId, dataNodeId)));
+
+ // Simulate update Regions to Running status
+ dataNodeIds.forEach(
+ dataNodeId ->
+ LOAD_CACHE.cacheRegionHeartbeatSample(
+ regionGroupId, dataNodeId, new
RegionHeartbeatSample(RegionStatus.Running), false));
+ LOAD_CACHE.updateRegionGroupStatistics();
+
LOAD_MANAGER.getEventService().checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
+ REGION_GROUP_SEMAPHORE.acquire();
+ Assert.assertEquals(RegionGroupStatus.Running,
LOAD_CACHE.getRegionGroupStatus(regionGroupId));
+ dataNodeIds.forEach(
+ dataNodeId ->
+ Assert.assertEquals(
+ RegionStatus.Running,
LOAD_CACHE.getRegionStatus(regionGroupId, dataNodeId)));
+ Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+ differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
+ Map<Integer, RegionStatistics> allRunningRegionStatisticsMap =
+ dataNodeIds.stream()
+ .collect(
+ Collectors.toMap(
+ dataNodeId -> dataNodeId,
+ dataNodeId -> new RegionStatistics(RegionStatus.Running)));
+ Assert.assertEquals(
+ new Pair<>(
+ null,
+ new RegionGroupStatistics(RegionGroupStatus.Running,
allRunningRegionStatisticsMap)),
+ differentRegionGroupStatisticsMap.get(regionGroupId));
+
+ // Simulate Region migration from [0, 1, 2] to [1, 2, 3]
+ int removeDataNodeId = 0;
+ LOAD_MANAGER.forceUpdateRegionCache(regionGroupId, removeDataNodeId,
RegionStatus.Removing);
+ REGION_GROUP_SEMAPHORE.acquire();
+ // Mark Region 0 as Removing
+ Assert.assertEquals(
+ RegionStatus.Removing, LOAD_CACHE.getRegionStatus(regionGroupId,
removeDataNodeId));
+ differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
+ Map<Integer, RegionStatistics> oneRemovingRegionStatisticsMap =
+ new TreeMap<>(allRunningRegionStatisticsMap);
+ oneRemovingRegionStatisticsMap.replace(
+ removeDataNodeId, new RegionStatistics(RegionStatus.Removing));
+ Assert.assertEquals(
+ new Pair<>(
+ new RegionGroupStatistics(RegionGroupStatus.Running,
allRunningRegionStatisticsMap),
+ new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneRemovingRegionStatisticsMap)),
+ differentRegionGroupStatisticsMap.get(regionGroupId));
+ // Add and mark Region 3 as Adding
+ int addDataNodeId = 3;
+ LOAD_CACHE.createRegionCache(regionGroupId, addDataNodeId);
+ LOAD_MANAGER.forceUpdateRegionCache(regionGroupId, addDataNodeId,
RegionStatus.Adding);
+ REGION_GROUP_SEMAPHORE.acquire();
+ Assert.assertEquals(
+ RegionStatus.Adding, LOAD_CACHE.getRegionStatus(regionGroupId,
addDataNodeId));
+ differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
+ Map<Integer, RegionStatistics> oneAddingRegionStatisticsMap =
+ new TreeMap<>(oneRemovingRegionStatisticsMap);
+ oneAddingRegionStatisticsMap.put(addDataNodeId, new
RegionStatistics(RegionStatus.Adding));
+ Assert.assertEquals(
+ new Pair<>(
+ new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneRemovingRegionStatisticsMap),
+ new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneAddingRegionStatisticsMap)),
+ differentRegionGroupStatisticsMap.get(regionGroupId));
+ // Both Region 0 and 3 can't be updated
+ LOAD_CACHE.cacheRegionHeartbeatSample(
+ regionGroupId, removeDataNodeId, new
RegionHeartbeatSample(RegionStatus.Unknown), false);
+ LOAD_CACHE.cacheRegionHeartbeatSample(
+ regionGroupId, addDataNodeId, new
RegionHeartbeatSample(RegionStatus.ReadOnly), false);
+ LOAD_CACHE.updateRegionGroupStatistics();
+
LOAD_MANAGER.getEventService().checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
+ Assert.assertEquals(
+ RegionStatus.Removing, LOAD_CACHE.getRegionStatus(regionGroupId,
removeDataNodeId));
+ Assert.assertEquals(
+ RegionStatus.Adding, LOAD_CACHE.getRegionStatus(regionGroupId,
addDataNodeId));
+ // Adding process completed
+ LOAD_MANAGER.forceUpdateRegionCache(regionGroupId, addDataNodeId,
RegionStatus.Running);
+ REGION_GROUP_SEMAPHORE.acquire();
+ Assert.assertEquals(
+ RegionStatus.Running, LOAD_CACHE.getRegionStatus(regionGroupId,
addDataNodeId));
+ differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
+ oneRemovingRegionStatisticsMap.put(addDataNodeId, new
RegionStatistics(RegionStatus.Running));
+ Assert.assertEquals(
+ new Pair<>(
+ new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneAddingRegionStatisticsMap),
+ new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneRemovingRegionStatisticsMap)),
+ differentRegionGroupStatisticsMap.get(regionGroupId));
+ // Removing process completed
+ LOAD_MANAGER.removeRegionCache(regionGroupId, removeDataNodeId);
+ REGION_GROUP_SEMAPHORE.acquire();
+ differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
+ allRunningRegionStatisticsMap.remove(removeDataNodeId);
+ allRunningRegionStatisticsMap.put(addDataNodeId, new
RegionStatistics(RegionStatus.Running));
+ Assert.assertEquals(
+ new Pair<>(
+ new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneRemovingRegionStatisticsMap),
+ new RegionGroupStatistics(RegionGroupStatus.Running,
allRunningRegionStatisticsMap)),
+ differentRegionGroupStatisticsMap.get(regionGroupId));
+ }
+
+ @Test
+ public void testConsensusGroupCache() throws InterruptedException {
+ // Create 1 Consensus heartbeat cache
+ TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
+ Set<Integer> dataNodeIds = Stream.of(0, 1, 2).collect(Collectors.toSet());
+ LOAD_CACHE.createRegionGroupHeartbeatCache("root.db", regionGroupId,
dataNodeIds);
+
+ // Default ConsensusGroupStatus is leaderId == -1
+ Assert.assertEquals(-1, (int)
LOAD_CACHE.getRegionLeaderMap().get(regionGroupId));
+
+ // Simulate select leaderId == 1
+ int originLeaderId = 1;
+ LOAD_CACHE.cacheConsensusSample(
+ regionGroupId, new ConsensusGroupHeartbeatSample(originLeaderId));
+ LOAD_CACHE.updateConsensusGroupStatistics();
+ LOAD_MANAGER
+ .getEventService()
+ .checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary();
+ CONSENSUS_GROUP_SEMAPHORE.acquire();
+ Map<TConsensusGroupId, Pair<ConsensusGroupStatistics,
ConsensusGroupStatistics>>
+ differentConsensusGroupStatisticsMap =
+ FAKE_SUBSCRIBER.getDifferentConsensusGroupStatisticsMap();
+ Assert.assertEquals(
+ new Pair<>(null, new ConsensusGroupStatistics(originLeaderId)),
+ differentConsensusGroupStatisticsMap.get(regionGroupId));
+
+ // Force update leader to 2
+ int newLeaderId = 2;
+ LOAD_MANAGER.forceUpdateConsensusGroupCache(
+ Collections.singletonMap(regionGroupId, new
ConsensusGroupHeartbeatSample(newLeaderId)));
+ CONSENSUS_GROUP_SEMAPHORE.acquire();
+ differentConsensusGroupStatisticsMap =
+ FAKE_SUBSCRIBER.getDifferentConsensusGroupStatisticsMap();
+ Assert.assertEquals(
+ new Pair<>(
+ new ConsensusGroupStatistics(originLeaderId),
+ new ConsensusGroupStatistics(newLeaderId)),
+ differentConsensusGroupStatisticsMap.get(regionGroupId));
+
+ // Remove ConsensusGroupCache
+ LOAD_MANAGER.removeRegionGroupRelatedCache(regionGroupId);
+ CONSENSUS_GROUP_SEMAPHORE.acquire();
+ differentConsensusGroupStatisticsMap =
+ FAKE_SUBSCRIBER.getDifferentConsensusGroupStatisticsMap();
+ // Only SchemaRegionGroup 1 is updated
+ Assert.assertEquals(1, differentConsensusGroupStatisticsMap.size());
+ Assert.assertEquals(
+ new Pair<>(new ConsensusGroupStatistics(newLeaderId), null),
+ differentConsensusGroupStatisticsMap.get(regionGroupId));
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
index 7cc1f50c28c..f2f51e044ff 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
@@ -39,9 +39,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public class CFDLeaderBalancerTest {
@@ -101,10 +103,14 @@ public class CFDLeaderBalancerTest {
// Prepare input parameters
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
databaseRegionGroupMap.put(DATABASE, regionGroupIds);
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
regionReplicaSets.forEach(
regionReplicaSet ->
- regionReplicaSetMap.put(regionReplicaSet.getRegionId(),
regionReplicaSet));
+ regionReplicaSetMap.put(
+ regionReplicaSet.getRegionId(),
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet())));
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionReplicaSets.forEach(
regionReplicaSet ->
regionLeaderMap.put(regionReplicaSet.getRegionId(), 0));
@@ -147,8 +153,12 @@ public class CFDLeaderBalancerTest {
// Prepare input parameters
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
databaseRegionGroupMap.put(DATABASE,
Collections.singletonList(regionReplicaSet.getRegionId()));
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
- regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
+ Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
+ regionReplicaSetMap.put(
+ regionReplicaSet.getRegionId(),
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()));
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionLeaderMap.put(regionReplicaSet.getRegionId(), 1);
Map<Integer, NodeStatistics> nodeStatisticsMap = new TreeMap<>();
@@ -190,7 +200,7 @@ public class CFDLeaderBalancerTest {
nodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running));
// Prepare RegionGroups
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
for (int i = 0; i < 5; i++) {
@@ -204,7 +214,11 @@ public class CFDLeaderBalancerTest {
Arrays.asList(
new TDataNodeLocation().setDataNodeId(0),
new TDataNodeLocation().setDataNodeId(1)));
- regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+ regionReplicaSetMap.put(
+ regionGroupId,
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()));
regionLeaderMap.put(regionGroupId, 0);
// Assuming all Regions are migrating from DataNode-1 to DataNode-2
Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
@@ -255,7 +269,7 @@ public class CFDLeaderBalancerTest {
Random random = new Random();
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
databaseRegionGroupMap.put(DATABASE, new ArrayList<>());
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
for (int i = 0; i < regionGroupNum; i++) {
@@ -273,7 +287,11 @@ public class CFDLeaderBalancerTest {
regionStatisticsMap.put(regionGroupId, regionStatistics);
databaseRegionGroupMap.get(DATABASE).add(regionGroupId);
- regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+ regionReplicaSetMap.put(
+ regionGroupId,
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()));
regionLeaderMap.put(regionGroupId, leaderId);
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
index 6b97e1b385b..b531896d37c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
@@ -35,9 +35,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public class GreedyLeaderBalancerTest {
@@ -45,7 +47,7 @@ public class GreedyLeaderBalancerTest {
@Test
public void optimalLeaderDistributionTest() {
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
@@ -67,7 +69,11 @@ public class GreedyLeaderBalancerTest {
regionStatistics.put(j, new RegionStatistics(RegionStatus.Running));
}
TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
- regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+ regionReplicaSetMap.put(
+ regionGroupId,
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()));
regionLeaderMap.put(regionGroupId, random.nextInt(3));
regionStatisticsMap.put(regionGroupId, regionStatistics);
}
@@ -83,7 +89,11 @@ public class GreedyLeaderBalancerTest {
regionStatistics.put(j, new RegionStatistics(RegionStatus.Running));
}
TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
- regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+ regionReplicaSetMap.put(
+ regionGroupId,
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()));
regionLeaderMap.put(regionGroupId, 3 + random.nextInt(3));
regionStatisticsMap.put(regionGroupId, regionStatistics);
}
@@ -110,7 +120,7 @@ public class GreedyLeaderBalancerTest {
@Test
public void disableTest() {
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
@@ -135,7 +145,11 @@ public class GreedyLeaderBalancerTest {
j, new RegionStatistics(j == 1 ? RegionStatus.Unknown :
RegionStatus.Running));
}
TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
- regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+ regionReplicaSetMap.put(
+ regionGroupId,
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()));
regionLeaderMap.put(regionGroupId, 1);
regionStatisticsMap.put(regionGroupId, regionStatistics);
}
@@ -152,7 +166,11 @@ public class GreedyLeaderBalancerTest {
j, new RegionStatistics(j == 4 ? RegionStatus.ReadOnly :
RegionStatus.Running));
}
TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
- regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+ regionReplicaSetMap.put(
+ regionGroupId,
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()));
regionLeaderMap.put(regionGroupId, 4);
regionStatisticsMap.put(regionGroupId, regionStatistics);
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
index fca962a2e56..885c97989da 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
@@ -46,6 +46,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public class LeaderBalancerComparisonTest {
@@ -81,7 +82,7 @@ public class LeaderBalancerComparisonTest {
// Simulate each DataNode has 16 CPU cores
// and each RegionGroup has 3 replicas
int regionGroupNum = TEST_CPU_CORE_NUM * dataNodeNum / TEST_REPLICA_NUM;
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
+ Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new
HashMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
generateTestData(dataNodeNum, regionGroupNum, regionReplicaSetMap,
regionLeaderMap);
@@ -101,13 +102,9 @@ public class LeaderBalancerComparisonTest {
regionReplicaSetMap.forEach(
(regionGroupId, regionReplicaSet) -> {
Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation ->
- regionStatistics.put(
- dataNodeLocation.getDataNodeId(),
- new RegionStatistics(RegionStatus.Running)));
+ regionReplicaSet.forEach(
+ dataNodeId ->
+ regionStatistics.put(dataNodeId, new
RegionStatistics(RegionStatus.Running)));
allRunningRegionStatistics.put(regionGroupId, regionStatistics);
});
Statistics greedyStatistics =
@@ -163,15 +160,13 @@ public class LeaderBalancerComparisonTest {
regionReplicaSetMap.forEach(
(regionGroupId, regionReplicaSet) -> {
Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation ->
- regionStatistics.put(
- dataNodeLocation.getDataNodeId(),
-
disabledDataNodeSet.contains(dataNodeLocation.getDataNodeId())
- ? new RegionStatistics(RegionStatus.Unknown)
- : new RegionStatistics(RegionStatus.Running)));
+ regionReplicaSet.forEach(
+ dataNodeId ->
+ regionStatistics.put(
+ dataNodeId,
+ disabledDataNodeSet.contains(dataNodeId)
+ ? new RegionStatistics(RegionStatus.Unknown)
+ : new RegionStatistics(RegionStatus.Running)));
disabledRegionStatistics.put(regionGroupId, regionStatistics);
});
greedyStatistics =
@@ -238,7 +233,7 @@ public class LeaderBalancerComparisonTest {
private void generateTestData(
int dataNodeNum,
int regionGroupNum,
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap) {
Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
@@ -292,7 +287,11 @@ public class LeaderBalancerComparisonTest {
randomNum -= 1;
}
- regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+ regionReplicaSetMap.put(
+ regionGroupId,
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()));
regionReplicaSet
.getDataNodeLocations()
.forEach(
@@ -307,7 +306,7 @@ public class LeaderBalancerComparisonTest {
int dataNodeNum,
int regionGroupNum,
AbstractLeaderBalancer leaderBalancer,
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+ Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> nodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap,
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
index a961eb310e7..e340c347e11 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
@@ -26,12 +26,18 @@ import
org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import org.junit.Assert;
import org.junit.Test;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
public class RegionGroupCacheTest {
+ private static final String DATABASE = "root.db";
+
@Test
public void getRegionStatusTest() {
long currentTime = System.nanoTime();
- RegionGroupCache regionGroupCache = new RegionGroupCache();
+ RegionGroupCache regionGroupCache =
+ new RegionGroupCache(DATABASE, Stream.of(0, 1, 2,
3).collect(Collectors.toSet()));
regionGroupCache.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
regionGroupCache.cacheHeartbeatSample(
@@ -55,7 +61,8 @@ public class RegionGroupCacheTest {
@Test
public void getRegionGroupStatusTest() {
long currentTime = System.nanoTime();
- RegionGroupCache runningRegionGroup = new RegionGroupCache();
+ RegionGroupCache runningRegionGroup =
+ new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
runningRegionGroup.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
runningRegionGroup.cacheHeartbeatSample(
@@ -67,7 +74,8 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Running,
runningRegionGroup.getCurrentStatistics().getRegionGroupStatus());
- RegionGroupCache availableRegionGroup = new RegionGroupCache();
+ RegionGroupCache availableRegionGroup =
+ new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
availableRegionGroup.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
availableRegionGroup.cacheHeartbeatSample(
@@ -79,7 +87,8 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Available,
availableRegionGroup.getCurrentStatistics().getRegionGroupStatus());
- RegionGroupCache disabledRegionGroup0 = new RegionGroupCache();
+ RegionGroupCache disabledRegionGroup0 =
+ new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
disabledRegionGroup0.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
disabledRegionGroup0.cacheHeartbeatSample(
@@ -91,7 +100,8 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Discouraged,
disabledRegionGroup0.getCurrentStatistics().getRegionGroupStatus());
- RegionGroupCache disabledRegionGroup1 = new RegionGroupCache();
+ RegionGroupCache disabledRegionGroup1 =
+ new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
disabledRegionGroup1.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
disabledRegionGroup1.cacheHeartbeatSample(
@@ -103,7 +113,8 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Disabled,
disabledRegionGroup1.getCurrentStatistics().getRegionGroupStatus());
- RegionGroupCache disabledRegionGroup2 = new RegionGroupCache();
+ RegionGroupCache disabledRegionGroup2 =
+ new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
disabledRegionGroup2.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
disabledRegionGroup2.cacheHeartbeatSample(