This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5923181a068 Revert "Normalize LoadCache interfaces (#12421)" (#12440)
5923181a068 is described below
commit 5923181a0688cfa4ee5905df80f3fd51b38109fa
Author: Potato <[email protected]>
AuthorDate: Mon Apr 29 00:03:10 2024 +0800
Revert "Normalize LoadCache interfaces (#12421)" (#12440)
This reverts commit 85f47e09d2e73cb1aeda9973728a247743ce4548.
---
.../iotdb/confignode/manager/load/LoadManager.java | 46 +---
.../manager/load/balancer/RouteBalancer.java | 4 +-
.../router/leader/AbstractLeaderBalancer.java | 18 +-
.../router/leader/GreedyLeaderBalancer.java | 14 +-
.../router/leader/MinCostFlowLeaderBalancer.java | 122 ++++-----
.../confignode/manager/load/cache/LoadCache.java | 178 ++++---------
.../consensus/ConsensusGroupHeartbeatSample.java | 7 -
.../cache/consensus/ConsensusGroupStatistics.java | 7 -
.../manager/load/cache/node/NodeStatistics.java | 4 +-
.../manager/load/cache/region/RegionCache.java | 3 +-
.../load/cache/region/RegionGroupCache.java | 30 +--
.../load/cache/region/RegionHeartbeatSample.java | 7 -
.../manager/load/service/EventService.java | 133 ++++------
.../iotdb/confignode/manager/node/NodeManager.java | 7 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 38 +--
.../procedure/env/RegionMaintainHandler.java | 14 +-
.../impl/node/AddConfigNodeProcedure.java | 2 +-
.../impl/region/AddRegionPeerProcedure.java | 7 +-
.../impl/region/CreateRegionGroupsProcedure.java | 9 +-
.../impl/region/RemoveRegionPeerProcedure.java | 6 +-
.../confignode/manager/load/FakeSubscriber.java | 92 -------
.../confignode/manager/load/LoadManagerTest.java | 295 ---------------------
.../router/leader/CFDLeaderBalancerTest.java | 34 +--
.../router/leader/GreedyLeaderBalancerTest.java | 30 +--
.../leader/LeaderBalancerComparisonTest.java | 39 +--
.../manager/load/cache/RegionGroupCacheTest.java | 23 +-
26 files changed, 258 insertions(+), 911 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 551a1f4ea08..b115aee68aa 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
@@ -266,16 +265,9 @@ public class LoadManager {
eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
}
- /**
- * Remove the NodeHeartbeatCache of the specified Node, update statistics
and broadcast statistics
- * change event if necessary.
- *
- * @param nodeId the index of the specified Node
- */
+ /** Remove the specified Node's cache. */
public void removeNodeCache(int nodeId) {
loadCache.removeNodeCache(nodeId);
- loadCache.updateNodeStatistics();
- eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
}
/**
@@ -332,8 +324,7 @@ public class LoadManager {
}
/**
- * Force update the specified RegionGroups' cache, update statistics and
broadcast statistics
- * change event if necessary.
+ * Force update the specified RegionGroups' cache.
*
* @param heartbeatSampleMap Map<RegionGroupId, Map<DataNodeId,
RegionHeartbeatSample>>
*/
@@ -344,20 +335,18 @@ public class LoadManager {
regionHeartbeatSampleMap.forEach(
(dataNodeId, regionHeartbeatSample) ->
loadCache.cacheRegionHeartbeatSample(
- regionGroupId, dataNodeId, regionHeartbeatSample,
true)));
+ regionGroupId, dataNodeId, regionHeartbeatSample,
false)));
loadCache.updateRegionGroupStatistics();
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
}
/**
- * Force update the specified Region's cache, update statistics and
broadcast statistics change
- * event if necessary.
+ * Add the cache of the specified Region in the specified RegionGroup.
*
- * @param regionGroupId The specified RegionGroup
- * @param dataNodeId The DataNodeId where the specified Region is located
- * @param regionStatus The specified RegionStatus
+ * @param regionGroupId the specified RegionGroup
+ * @param dataNodeId the specified DataNode
*/
- public void forceUpdateRegionCache(
+ public void forceAddRegionCache(
TConsensusGroupId regionGroupId, int dataNodeId, RegionStatus
regionStatus) {
loadCache.cacheRegionHeartbeatSample(
regionGroupId,
@@ -369,31 +358,21 @@ public class LoadManager {
}
/**
- * Remove the cache of the specified Region in the specified RegionGroup,
update statistics and
- * broadcast statistics change event if necessary.
+ * Remove the cache of the specified Region in the specified RegionGroup.
*
* @param regionGroupId the specified RegionGroup
* @param dataNodeId the specified DataNode
*/
- public void removeRegionCache(TConsensusGroupId regionGroupId, int
dataNodeId) {
+ public void forceRemoveRegionCache(TConsensusGroupId regionGroupId, int
dataNodeId) {
loadCache.removeRegionCache(regionGroupId, dataNodeId);
loadCache.updateRegionGroupStatistics();
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
}
- /**
- * Remove the specified RegionGroup's related cache, update statistics and
broadcast statistics
- * change event if necessary.
- *
- * @param consensusGroupId The specified RegionGroup
- */
+ /** Remove the specified RegionGroup's cache. */
public void removeRegionGroupRelatedCache(TConsensusGroupId
consensusGroupId) {
loadCache.removeRegionGroupCache(consensusGroupId);
routeBalancer.removeRegionPriority(consensusGroupId);
- loadCache.updateRegionGroupStatistics();
- loadCache.updateConsensusGroupStatistics();
-
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
-
eventService.checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary();
}
/**
@@ -462,9 +441,4 @@ public class LoadManager {
public RouteBalancer getRouteBalancer() {
return routeBalancer;
}
-
- @TestOnly
- public EventService getEventService() {
- return eventService;
- }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index a24098c3b92..79ca434e1d3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -154,8 +154,8 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
Map<TConsensusGroupId, Integer> currentLeaderMap =
getLoadManager().getRegionLeaderMap();
Map<TConsensusGroupId, Integer> optimalLeaderMap =
leaderBalancer.generateOptimalLeaderDistribution(
-
getLoadManager().getLoadCache().getCurrentDatabaseRegionGroupMap(regionGroupType),
-
getLoadManager().getLoadCache().getCurrentRegionLocationMap(regionGroupType),
+ getPartitionManager().getAllRegionGroupIdMap(regionGroupType),
+ getPartitionManager().getAllReplicaSetsMap(regionGroupType),
currentLeaderMap,
getLoadManager().getLoadCache().getCurrentDataNodeStatisticsMap(),
getLoadManager().getLoadCache().getCurrentRegionStatisticsMap(regionGroupType));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
index 32eebf1586d..b82b3a0ca48 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
@@ -27,7 +28,6 @@ import
org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
public abstract class AbstractLeaderBalancer {
@@ -37,8 +37,8 @@ public abstract class AbstractLeaderBalancer {
// Map<Database, List<RegionGroup>>
protected final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
- // Map<RegionGroupId, Set<DataNodeId>>
- protected final Map<TConsensusGroupId, Set<Integer>> regionLocationMap;
+ // Map<RegionGroupId, RegionGroup>
+ protected final Map<TConsensusGroupId, TRegionReplicaSet>
regionReplicaSetMap;
// Map<RegionGroupId, leaderId>
protected final Map<TConsensusGroupId, Integer> regionLeaderMap;
// Map<DataNodeId, NodeStatistics>
@@ -48,7 +48,7 @@ public abstract class AbstractLeaderBalancer {
protected AbstractLeaderBalancer() {
this.databaseRegionGroupMap = new TreeMap<>();
- this.regionLocationMap = new TreeMap<>();
+ this.regionReplicaSetMap = new TreeMap<>();
this.regionLeaderMap = new TreeMap<>();
this.dataNodeStatisticsMap = new TreeMap<>();
this.regionStatisticsMap = new TreeMap<>();
@@ -56,12 +56,12 @@ public abstract class AbstractLeaderBalancer {
protected void initialize(
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> dataNodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
- this.regionLocationMap.putAll(regionLocationMap);
+ this.regionReplicaSetMap.putAll(regionReplicaSetMap);
this.regionLeaderMap.putAll(regionLeaderMap);
this.dataNodeStatisticsMap.putAll(dataNodeStatisticsMap);
this.regionStatisticsMap.putAll(regionStatisticsMap);
@@ -83,7 +83,7 @@ public abstract class AbstractLeaderBalancer {
protected void clear() {
this.databaseRegionGroupMap.clear();
- this.regionLocationMap.clear();
+ this.regionReplicaSetMap.clear();
this.regionLeaderMap.clear();
this.dataNodeStatisticsMap.clear();
this.regionStatisticsMap.clear();
@@ -93,7 +93,7 @@ public abstract class AbstractLeaderBalancer {
* Generate an optimal leader distribution.
*
* @param databaseRegionGroupMap RegionGroup held by each Database
- * @param regionLocationMap All RegionGroups the cluster currently have
+ * @param regionReplicaSetMap All RegionGroups the cluster currently have
* @param regionLeaderMap The current leader distribution of each RegionGroup
* @param dataNodeStatisticsMap The current statistics of each DataNode
* @param regionStatisticsMap The current statistics of each Region
@@ -101,7 +101,7 @@ public abstract class AbstractLeaderBalancer {
*/
public abstract Map<TConsensusGroupId, Integer>
generateOptimalLeaderDistribution(
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> dataNodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 101ddf35e71..5a6098b60f9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -20,12 +20,13 @@
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,13 +40,13 @@ public class GreedyLeaderBalancer extends
AbstractLeaderBalancer {
@Override
public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> dataNodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
initialize(
databaseRegionGroupMap,
- regionLocationMap,
+ regionReplicaSetMap,
regionLeaderMap,
dataNodeStatisticsMap,
regionStatisticsMap);
@@ -56,11 +57,12 @@ public class GreedyLeaderBalancer extends
AbstractLeaderBalancer {
private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
Map<Integer, Integer> leaderCounter = new TreeMap<>();
- regionLocationMap.forEach(
- (regionGroupId, dataNodeIds) -> {
+ regionReplicaSetMap.forEach(
+ (regionGroupId, regionGroup) -> {
int minCount = Integer.MAX_VALUE,
leaderId = regionLeaderMap.getOrDefault(regionGroupId, -1);
- for (int dataNodeId : dataNodeIds) {
+ for (TDataNodeLocation dataNodeLocation :
regionGroup.getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
if (isDataNodeAvailable(dataNodeId) &&
isRegionAvailable(regionGroupId, dataNodeId)) {
// Select the DataNode with the minimal leader count as the new
leader
int count = leaderCounter.getOrDefault(dataNodeId, 0);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index d348d0e1458..daf1d15f27c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
@@ -29,9 +31,7 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Queue;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -82,13 +82,13 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
@Override
public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
- Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> dataNodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
initialize(
databaseRegionGroupMap,
- regionLocationMap,
+ regionReplicaSetMap,
regionLeaderMap,
dataNodeStatisticsMap,
regionStatisticsMap);
@@ -129,22 +129,21 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
List<TConsensusGroupId> regionGroupIds = databaseEntry.getValue();
for (TConsensusGroupId regionGroupId : regionGroupIds) {
rNodeMap.put(regionGroupId, maxNode++);
- regionLocationMap
- .get(regionGroupId)
- .forEach(
- dataNodeId -> {
- if (isDataNodeAvailable(dataNodeId)) {
- if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
- sDNodeMap.get(database).put(dataNodeId, maxNode);
- sDNodeReflect.get(database).put(maxNode, dataNodeId);
- maxNode += 1;
- }
- if (!tDNodeMap.containsKey(dataNodeId)) {
- tDNodeMap.put(dataNodeId, maxNode);
- maxNode += 1;
- }
- }
- });
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (isDataNodeAvailable(dataNodeId)) {
+ if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
+ sDNodeMap.get(database).put(dataNodeId, maxNode);
+ sDNodeReflect.get(database).put(maxNode, dataNodeId);
+ maxNode += 1;
+ }
+ if (!tDNodeMap.containsKey(dataNodeId)) {
+ tDNodeMap.put(dataNodeId, maxNode);
+ maxNode += 1;
+ }
+ }
+ }
}
}
@@ -167,23 +166,17 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
String database = databaseEntry.getKey();
for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
int rNode = rNodeMap.get(regionGroupId);
- regionLocationMap
- .get(regionGroupId)
- .forEach(
- dataNodeId -> {
- if (isDataNodeAvailable(dataNodeId)
- && isRegionAvailable(regionGroupId, dataNodeId)) {
- int sDNode = sDNodeMap.get(database).get(dataNodeId);
- // Capacity: 1, Cost: 1 if sDNode is the current leader of
the rNode, 0
- // otherwise.
- // Therefore, the RegionGroup will keep the leader as
constant as possible.
- int cost =
-
Objects.equals(regionLeaderMap.getOrDefault(regionGroupId, -1), dataNodeId)
- ? 0
- : 1;
- addAdjacentEdges(rNode, sDNode, 1, cost);
- }
- });
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (isDataNodeAvailable(dataNodeId) &&
isRegionAvailable(regionGroupId, dataNodeId)) {
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ // Capacity: 1, Cost: 1 if sDNode is the current leader of the
rNode, 0 otherwise.
+ // Therefore, the RegionGroup will keep the leader as constant as
possible.
+ int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) ==
dataNodeId ? 0 : 1;
+ addAdjacentEdges(rNode, sDNode, 1, cost);
+ }
+ }
}
}
@@ -194,21 +187,19 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
// Map<DataNodeId, leader number>
Map<Integer, Integer> leaderCounter = new TreeMap<>();
for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
- regionLocationMap
- .get(regionGroupId)
- .forEach(
- dataNodeId -> {
- if (isDataNodeAvailable(dataNodeId)) {
- int sDNode = sDNodeMap.get(database).get(dataNodeId);
- int tDNode = tDNodeMap.get(dataNodeId);
- int leaderCount = leaderCounter.merge(dataNodeId, 1,
Integer::sum);
- // Capacity: 1, Cost: x^2 for the x-th edge at the current
sDNode.
- // Thus, the leader distribution will be as balance as
possible within each
- // Database
- // based on the Jensen's-Inequality.
- addAdjacentEdges(sDNode, tDNode, 1, leaderCount *
leaderCount);
- }
- });
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (isDataNodeAvailable(dataNodeId)) {
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
+ // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
+ // Thus, the leader distribution will be as balance as possible
within each Database
+ // based on the Jensen's-Inequality.
+ addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
+ }
+ }
}
}
@@ -216,20 +207,19 @@ public class MinCostFlowLeaderBalancer extends
AbstractLeaderBalancer {
// Map<DataNodeId, possible maximum leader>
// Count the possible maximum number of leader in each DataNode
Map<Integer, Integer> maxLeaderCounter = new TreeMap<>();
- regionLocationMap.forEach(
- (regionGroupId, dataNodeIds) ->
- dataNodeIds.forEach(
- dataNodeId -> {
- if (isDataNodeAvailable(dataNodeId)) {
- int tDNode = tDNodeMap.get(dataNodeId);
- int leaderCount = maxLeaderCounter.merge(dataNodeId, 1,
Integer::sum);
- // Cost: x^2 for the x-th edge at the current dNode.
- // Thus, the leader distribution will be as balance as
possible within the
- // cluster
- // Based on the Jensen's-Inequality.
- addAdjacentEdges(tDNode, T_NODE, 1, leaderCount *
leaderCount);
- }
- }));
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+ for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (isDataNodeAvailable(dataNodeId)) {
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = maxLeaderCounter.merge(dataNodeId, 1,
Integer::sum);
+ // Cost: x^2 for the x-th edge at the current dNode.
+ // Thus, the leader distribution will be as balance as possible
within the cluster
+ // Based on the Jensen's-Inequality.
+ addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
+ }
+ }
+ }
}
private void addAdjacentEdges(int fromNode, int destNode, int capacity, int
cost) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 78723d3873d..1ad90a2b02a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -23,11 +23,9 @@ import
org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
@@ -48,7 +46,6 @@ import
org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -98,12 +95,7 @@ public class LoadCache {
initNodeHeartbeatCache(
configManager.getNodeManager().getRegisteredConfigNodes(),
configManager.getNodeManager().getRegisteredDataNodes());
- initRegionGroupHeartbeatCache(
- configManager.getClusterSchemaManager().getDatabaseNames().stream()
- .collect(
- Collectors.toMap(
- database -> database,
- database ->
configManager.getPartitionManager().getAllReplicaSets(database))));
+
initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets());
}
/** Initialize the nodeCacheMap when the ConfigNode-Leader is switched. */
@@ -120,7 +112,8 @@ public class LoadCache {
configNodeLocation -> {
int configNodeId = configNodeLocation.getConfigNodeId();
if (configNodeId != CURRENT_NODE_ID) {
- createNodeHeartbeatCache(NodeType.ConfigNode, configNodeId);
+ nodeCacheMap.put(configNodeId, new
ConfigNodeHeartbeatCache(configNodeId));
+ heartbeatProcessingMap.put(configNodeId, new AtomicBoolean(false));
}
});
// Force set itself and never update
@@ -133,7 +126,8 @@ public class LoadCache {
registeredDataNodes.forEach(
dataNodeConfiguration -> {
int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
- createNodeHeartbeatCache(NodeType.DataNode, dataNodeId);
+ nodeCacheMap.put(dataNodeId, new DataNodeHeartbeatCache(dataNodeId));
+ heartbeatProcessingMap.put(dataNodeId, new AtomicBoolean(false));
});
}
@@ -141,24 +135,15 @@ public class LoadCache {
* Initialize the regionGroupCacheMap and regionRouteCacheMap when the
ConfigNode-Leader is
* switched.
*/
- private void initRegionGroupHeartbeatCache(
- Map<String, List<TRegionReplicaSet>> regionReplicaMap) {
+ private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet>
regionReplicaSets) {
regionGroupCacheMap.clear();
consensusGroupCacheMap.clear();
- regionReplicaMap.forEach(
- (database, regionReplicaSets) ->
- regionReplicaSets.forEach(
- regionReplicaSet -> {
- TConsensusGroupId regionGroupId =
regionReplicaSet.getRegionId();
- regionGroupCacheMap.put(
- regionGroupId,
- new RegionGroupCache(
- database,
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet())));
- consensusGroupCacheMap.put(regionGroupId, new
ConsensusGroupCache());
- }));
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ TConsensusGroupId consensusGroupId = regionReplicaSet.getRegionId();
+ regionGroupCacheMap.put(consensusGroupId, new RegionGroupCache());
+ consensusGroupCacheMap.put(consensusGroupId, new
ConsensusGroupCache());
+ });
}
public void clearHeartbeatCache() {
@@ -180,25 +165,6 @@ public class LoadCache {
.getAndSet(true);
}
- /**
- * Create a new NodeHeartbeatCache for the specified Node.
- *
- * @param nodeType The specified NodeType
- * @param nodeId The specified NodeId
- */
- public void createNodeHeartbeatCache(NodeType nodeType, int nodeId) {
- switch (nodeType) {
- case ConfigNode:
- nodeCacheMap.put(nodeId, new ConfigNodeHeartbeatCache(nodeId));
- break;
- case DataNode:
- default:
- nodeCacheMap.put(nodeId, new DataNodeHeartbeatCache(nodeId));
- break;
- }
- heartbeatProcessingMap.put(nodeId, new AtomicBoolean(false));
- }
-
/**
* Cache the latest heartbeat sample of a ConfigNode.
*
@@ -206,10 +172,10 @@ public class LoadCache {
* @param sample the latest heartbeat sample
*/
public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample
sample) {
- // Only cache sample when the corresponding loadCache exists
- Optional.ofNullable(nodeCacheMap.get(nodeId))
- .ifPresent(node -> node.cacheHeartbeatSample(sample));
- Optional.ofNullable(heartbeatProcessingMap.get(nodeId)).ifPresent(node ->
node.set(false));
+ nodeCacheMap
+ .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
+ .cacheHeartbeatSample(sample);
+ heartbeatProcessingMap.get(nodeId).set(false);
}
/**
@@ -219,49 +185,16 @@ public class LoadCache {
* @param sample the latest heartbeat sample
*/
public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample
sample) {
- // Only cache sample when the corresponding loadCache exists
- Optional.ofNullable(nodeCacheMap.get(nodeId))
- .ifPresent(node -> node.cacheHeartbeatSample(sample));
- Optional.ofNullable(heartbeatProcessingMap.get(nodeId)).ifPresent(node ->
node.set(false));
+ nodeCacheMap
+ .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId))
+ .cacheHeartbeatSample(sample);
+ heartbeatProcessingMap.get(nodeId).set(false);
}
public void resetHeartbeatProcessing(int nodeId) {
heartbeatProcessingMap.get(nodeId).set(false);
}
- /**
- * Remove the NodeHeartbeatCache of the specified Node.
- *
- * @param nodeId the index of the specified Node
- */
- public void removeNodeCache(int nodeId) {
- nodeCacheMap.remove(nodeId);
- heartbeatProcessingMap.remove(nodeId);
- }
-
- /**
- * Create a new RegionGroupCache and a new ConsensusGroupCache for the
specified RegionGroup.
- *
- * @param database the Database where the RegionGroup belonged
- * @param regionGroupId the index of the RegionGroup
- * @param dataNodeIds the index of the DataNodes where the Regions resided
- */
- public void createRegionGroupHeartbeatCache(
- String database, TConsensusGroupId regionGroupId, Set<Integer>
dataNodeIds) {
- regionGroupCacheMap.put(regionGroupId, new RegionGroupCache(database,
dataNodeIds));
- consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache());
- }
-
- /**
- * Create a new RegionCache for the specified Region in the specified
RegionGroup.
- *
- * @param regionGroupId the index of the RegionGroup
- * @param dataNodeId the index of the DataNode where the Region resides
- */
- public void createRegionCache(TConsensusGroupId regionGroupId, int
dataNodeId) {
- regionGroupCacheMap.get(regionGroupId).createRegionCache(dataNodeId);
- }
-
/**
* Cache the latest heartbeat sample of a RegionGroup.
*
@@ -274,9 +207,9 @@ public class LoadCache {
int nodeId,
RegionHeartbeatSample sample,
boolean overwrite) {
- // Only cache sample when the corresponding loadCache exists
- Optional.ofNullable(regionGroupCacheMap.get(regionGroupId))
- .ifPresent(group -> group.cacheHeartbeatSample(nodeId, sample,
overwrite));
+ regionGroupCacheMap
+ .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache())
+ .cacheHeartbeatSample(nodeId, sample, overwrite);
}
/**
@@ -298,9 +231,9 @@ public class LoadCache {
*/
public void cacheConsensusSample(
TConsensusGroupId regionGroupId, ConsensusGroupHeartbeatSample sample) {
- // Only cache sample when the corresponding loadCache exists
- Optional.ofNullable(consensusGroupCacheMap.get(regionGroupId))
- .ifPresent(group -> group.cacheHeartbeatSample(sample));
+ consensusGroupCacheMap
+ .computeIfAbsent(regionGroupId, empty -> new ConsensusGroupCache())
+ .cacheHeartbeatSample(sample);
}
/** Update the NodeStatistics of all Nodes. */
@@ -347,44 +280,6 @@ public class LoadCache {
return dataNodeStatisticsMap;
}
- /**
- * Get a map of cached RegionGroups of all Databases.
- *
- * @param type SchemaRegion or DataRegion
- * @return Map<Database, List<RegionGroupId>>
- */
- public Map<String, List<TConsensusGroupId>> getCurrentDatabaseRegionGroupMap(
- TConsensusGroupType type) {
- Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
- regionGroupCacheMap.forEach(
- (regionGroupId, regionGroupCache) -> {
- if (type.equals(regionGroupId.getType())) {
- databaseRegionGroupMap
- .computeIfAbsent(regionGroupCache.getDatabase(), empty -> new
ArrayList<>())
- .add(regionGroupId);
- }
- });
- return databaseRegionGroupMap;
- }
-
- /**
- * Get a map of cached RegionGroups
- *
- * @param type SchemaRegion or DataRegion
- * @return Map<RegionGroupId, Set<DataNodeId>>
- */
- public Map<TConsensusGroupId, Set<Integer>> getCurrentRegionLocationMap(
- TConsensusGroupType type) {
- Map<TConsensusGroupId, Set<Integer>> regionGroupIdsMap = new TreeMap<>();
- regionGroupCacheMap.forEach(
- (regionGroupId, regionGroupCache) -> {
- if (type.equals(regionGroupId.getType())) {
- regionGroupIdsMap.put(regionGroupId,
regionGroupCache.getRegionLocations());
- }
- });
- return regionGroupIdsMap;
- }
-
/**
* Get the RegionGroupStatistics of all RegionGroups.
*
@@ -442,6 +337,22 @@ public class LoadCache {
return nodeCache == null ? NodeStatus.Unknown : nodeCache.getNodeStatus();
}
+ /**
+ * Get all DataNodes' NodeStatus
+ *
+ * @return Map<DataNodeId, NodeStatus>
+ */
+ public Map<Integer, NodeStatus> getDataNodeStatus() {
+ Map<Integer, NodeStatus> nodeStatusMap = new TreeMap<>();
+ nodeCacheMap.forEach(
+ (nodeId, nodeCache) -> {
+ if (nodeCache instanceof DataNodeHeartbeatCache) {
+ nodeStatusMap.put(nodeId, nodeCache.getNodeStatus());
+ }
+ });
+ return nodeStatusMap;
+ }
+
/**
* Safely get the specified Node's current status with reason.
*
@@ -555,6 +466,11 @@ public class LoadCache {
.orElse(-1);
}
+ /** Remove the specified Node's cache. */
+ public void removeNodeCache(int nodeId) {
+ nodeCacheMap.remove(nodeId);
+ }
+
/**
* Safely get RegionStatus.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupHeartbeatSample.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupHeartbeatSample.java
index 7fb18b0cdc8..90f313881ff 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupHeartbeatSample.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupHeartbeatSample.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.manager.load.cache.consensus;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
/** ConsensusGroupHeartbeatSample records the heartbeat sample of a consensus
group. */
@@ -32,12 +31,6 @@ public class ConsensusGroupHeartbeatSample extends
AbstractHeartbeatSample {
this.leaderId = leaderId;
}
- @TestOnly
- public ConsensusGroupHeartbeatSample(int leaderId) {
- super(System.nanoTime());
- this.leaderId = leaderId;
- }
-
public int getLeaderId() {
return leaderId;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupStatistics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupStatistics.java
index 733b806c851..bf55731abeb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupStatistics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupStatistics.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.manager.load.cache.consensus;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.AbstractStatistics;
import java.util.Objects;
@@ -34,12 +33,6 @@ public class ConsensusGroupStatistics extends
AbstractStatistics {
this.leaderId = leaderId;
}
- @TestOnly
- public ConsensusGroupStatistics(int leaderId) {
- super(System.nanoTime());
- this.leaderId = leaderId;
- }
-
public static ConsensusGroupStatistics
generateDefaultConsensusGroupStatistics() {
return new ConsensusGroupStatistics(0,
ConsensusGroupCache.UN_READY_LEADER_ID);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
index 12c554d51e8..24bee71ad0e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
@@ -76,7 +76,9 @@ public class NodeStatistics extends AbstractStatistics {
return false;
}
NodeStatistics that = (NodeStatistics) o;
- return status == that.status && Objects.equals(statusReason,
that.statusReason);
+ return loadScore == that.loadScore
+ && status == that.status
+ && Objects.equals(statusReason, that.statusReason);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index 8facf40ba5a..2e68d370519 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -58,8 +58,7 @@ public class RegionCache extends AbstractLoadCache {
return (RegionStatistics) currentStatistics.get();
}
- public synchronized void cacheHeartbeatSample(
- RegionHeartbeatSample newHeartbeatSample, boolean overwrite) {
+ public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample,
boolean overwrite) {
if (overwrite || getLastSample() == null) {
super.cacheHeartbeatSample(newHeartbeatSample);
return;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index d166d4bceae..ae922dc0028 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -37,17 +35,14 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RegionGroupCache {
- private final String database;
// Map<DataNodeId(where a RegionReplica resides in), RegionCache>
private final Map<Integer, RegionCache> regionCacheMap;
// The current RegionGroupStatistics, used for providing statistics to other
services
private final AtomicReference<RegionGroupStatistics> currentStatistics;
/** Constructor for create RegionGroupCache with default
RegionGroupStatistics. */
- public RegionGroupCache(String database, Set<Integer> dataNodeIds) {
- this.database = database;
+ public RegionGroupCache() {
this.regionCacheMap = new ConcurrentHashMap<>();
- dataNodeIds.forEach(dataNodeId -> regionCacheMap.put(dataNodeId, new
RegionCache()));
this.currentStatistics =
new
AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics());
}
@@ -61,9 +56,9 @@ public class RegionGroupCache {
*/
public void cacheHeartbeatSample(
int dataNodeId, RegionHeartbeatSample newHeartbeatSample, boolean
overwrite) {
- // Only cache sample when the corresponding loadCache exists
- Optional.ofNullable(regionCacheMap.get(dataNodeId))
- .ifPresent(region -> region.cacheHeartbeatSample(newHeartbeatSample,
overwrite));
+ regionCacheMap
+ .computeIfAbsent(dataNodeId, empty -> new RegionCache())
+ .cacheHeartbeatSample(newHeartbeatSample, overwrite);
}
@TestOnly
@@ -71,15 +66,6 @@ public class RegionGroupCache {
cacheHeartbeatSample(dataNodeId, newHeartbeatSample, false);
}
- /**
- * Create the cache of the specified Region.
- *
- * @param dataNodeId the specified DataNode
- */
- public void createRegionCache(int dataNodeId) {
- regionCacheMap.put(dataNodeId, new RegionCache());
- }
-
/**
* Remove the cache of the specified Region in the specified RegionGroup.
*
@@ -145,12 +131,4 @@ public class RegionGroupCache {
public RegionGroupStatistics getCurrentStatistics() {
return currentStatistics.get();
}
-
- public String getDatabase() {
- return database;
- }
-
- public Set<Integer> getRegionLocations() {
- return regionCacheMap.keySet();
- }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
index fdce5e71def..34b4d9f0b0d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
/** RegionHeartbeatSample records the heartbeat sample of a Region. */
@@ -33,12 +32,6 @@ public class RegionHeartbeatSample extends
AbstractHeartbeatSample {
this.status = status;
}
- @TestOnly
- public RegionHeartbeatSample(RegionStatus status) {
- super(System.nanoTime());
- this.status = status;
- }
-
public RegionStatus getStatus() {
return status;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
index 0d885f90524..ff9ca15d1e8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
@@ -41,7 +40,6 @@ import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -128,22 +126,16 @@ public class EventService {
Map<Integer, Pair<NodeStatistics, NodeStatistics>>
differentNodeStatisticsMap = new TreeMap<>();
currentNodeStatisticsMap.forEach(
(nodeId, currentNodeStatistics) -> {
- NodeStatistics previousNodeStatistics =
previousNodeStatisticsMap.get(nodeId);
- if (previousNodeStatistics == null
- || (currentNodeStatistics.isNewerThan(previousNodeStatistics)
- && !currentNodeStatistics.equals(previousNodeStatistics))) {
+ NodeStatistics previousNodeStatistics =
+ previousNodeStatisticsMap.getOrDefault(
+ nodeId, NodeStatistics.generateDefaultNodeStatistics());
+ if (currentNodeStatistics.isNewerThan(previousNodeStatistics)
+ && !currentNodeStatistics.equals(previousNodeStatistics)) {
differentNodeStatisticsMap.put(
nodeId, new Pair<>(previousNodeStatistics,
currentNodeStatistics));
previousNodeStatisticsMap.put(nodeId, currentNodeStatistics);
}
});
- previousNodeStatisticsMap.forEach(
- (nodeId, previousNodeStatistics) -> {
- if (!currentNodeStatisticsMap.containsKey(nodeId)) {
- differentNodeStatisticsMap.put(nodeId, new
Pair<>(previousNodeStatistics, null));
- }
- });
-
previousNodeStatisticsMap.keySet().retainAll(currentNodeStatisticsMap.keySet());
if (!differentNodeStatisticsMap.isEmpty()) {
eventPublisher.post(new
NodeStatisticsChangeEvent(differentNodeStatisticsMap));
recordNodeStatistics(differentNodeStatisticsMap);
@@ -155,11 +147,14 @@ public class EventService {
LOGGER.info("[NodeStatistics] NodeStatisticsMap: ");
for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>>
nodeCacheEntry :
differentNodeStatisticsMap.entrySet()) {
- LOGGER.info(
- "[NodeStatistics]\t {}: {} -> {}",
- "nodeId{" + nodeCacheEntry.getKey() + "}",
- nodeCacheEntry.getValue().getLeft(),
- nodeCacheEntry.getValue().getRight());
+ if (!Objects.equals(
+ nodeCacheEntry.getValue().getRight(),
nodeCacheEntry.getValue().getLeft())) {
+ LOGGER.info(
+ "[NodeStatistics]\t {}: {}->{}",
+ "nodeId{" + nodeCacheEntry.getKey() + "}",
+ nodeCacheEntry.getValue().getLeft(),
+ nodeCacheEntry.getValue().getRight());
+ }
}
}
@@ -171,24 +166,16 @@ public class EventService {
currentRegionGroupStatisticsMap.forEach(
(regionGroupId, currentRegionGroupStatistics) -> {
RegionGroupStatistics previousRegionGroupStatistics =
- previousRegionGroupStatisticsMap.get(regionGroupId);
- if (previousRegionGroupStatistics == null
- ||
(currentRegionGroupStatistics.isNewerThan(previousRegionGroupStatistics)
- &&
!currentRegionGroupStatistics.equals(previousRegionGroupStatistics))) {
+ previousRegionGroupStatisticsMap.getOrDefault(
+ regionGroupId,
RegionGroupStatistics.generateDefaultRegionGroupStatistics());
+ if
(currentRegionGroupStatistics.isNewerThan(previousRegionGroupStatistics)
+ &&
!currentRegionGroupStatistics.equals(previousRegionGroupStatistics)) {
differentRegionGroupStatisticsMap.put(
regionGroupId,
new Pair<>(previousRegionGroupStatistics,
currentRegionGroupStatistics));
previousRegionGroupStatisticsMap.put(regionGroupId,
currentRegionGroupStatistics);
}
});
- previousRegionGroupStatisticsMap.forEach(
- (regionGroupId, previousRegionGroupStatistics) -> {
- if (!currentRegionGroupStatisticsMap.containsKey(regionGroupId)) {
- differentRegionGroupStatisticsMap.put(
- regionGroupId, new Pair<>(previousRegionGroupStatistics,
null));
- }
- });
-
previousRegionGroupStatisticsMap.keySet().retainAll(currentRegionGroupStatisticsMap.keySet());
if (!differentRegionGroupStatisticsMap.isEmpty()) {
eventPublisher.post(new
RegionGroupStatisticsChangeEvent(differentRegionGroupStatisticsMap));
recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
@@ -201,38 +188,38 @@ public class EventService {
LOGGER.info("[RegionGroupStatistics] RegionGroupStatisticsMap: ");
for (Map.Entry<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
regionGroupStatisticsEntry :
differentRegionGroupStatisticsMap.entrySet()) {
- RegionGroupStatistics previousStatistics =
regionGroupStatisticsEntry.getValue().getLeft();
- RegionGroupStatistics currentStatistics =
regionGroupStatisticsEntry.getValue().getRight();
- LOGGER.info(
- "[RegionGroupStatistics]\t RegionGroup {}: {} -> {}",
- regionGroupStatisticsEntry.getKey(),
- previousStatistics == null ? null :
previousStatistics.getRegionGroupStatus(),
- currentStatistics == null ? null :
currentStatistics.getRegionGroupStatus());
+ if (!Objects.equals(
+ regionGroupStatisticsEntry.getValue().getRight(),
+ regionGroupStatisticsEntry.getValue().getLeft())) {
+ LOGGER.info(
+ "[RegionGroupStatistics]\t RegionGroup {}: {} -> {}",
+ regionGroupStatisticsEntry.getKey(),
+
regionGroupStatisticsEntry.getValue().getLeft().getRegionGroupStatus(),
+
regionGroupStatisticsEntry.getValue().getRight().getRegionGroupStatus());
- List<Integer> leftIds =
- previousStatistics == null ? Collections.emptyList() :
previousStatistics.getRegionIds();
- List<Integer> rightIds =
- currentStatistics == null ? Collections.emptyList() :
currentStatistics.getRegionIds();
- for (Integer leftId : leftIds) {
- if (rightIds.contains(leftId)) {
- LOGGER.info(
- "[RegionGroupStatistics]\t Region in DataNode {}: {} -> {}",
- leftId,
- previousStatistics.getRegionStatus(leftId),
- currentStatistics.getRegionStatus(leftId));
- } else {
- LOGGER.info(
- "[RegionGroupStatistics]\t Region in DataNode {}: {} -> null",
- leftId,
- previousStatistics.getRegionStatus(leftId));
+ List<Integer> leftIds =
regionGroupStatisticsEntry.getValue().getLeft().getRegionIds();
+ List<Integer> rightIds =
regionGroupStatisticsEntry.getValue().getRight().getRegionIds();
+ for (Integer leftId : leftIds) {
+ if (!rightIds.contains(leftId)) {
+ LOGGER.info(
+ "[RegionGroupStatistics]\t Region in DataNode {}: {} -> null",
+ leftId,
+
regionGroupStatisticsEntry.getValue().getLeft().getRegionStatus(leftId));
+ } else {
+ LOGGER.info(
+ "[RegionGroupStatistics]\t Region in DataNode {}: {} -> {}",
+ leftId,
+
regionGroupStatisticsEntry.getValue().getLeft().getRegionStatus(leftId),
+
regionGroupStatisticsEntry.getValue().getRight().getRegionStatus(leftId));
+ }
}
- }
- for (Integer rightId : rightIds) {
- if (!leftIds.contains(rightId)) {
- LOGGER.info(
- "[RegionGroupStatistics]\t Region in DataNode {}: null -> {}",
- rightId,
- currentStatistics.getRegionStatus(rightId));
+ for (Integer rightId : rightIds) {
+ if (!leftIds.contains(rightId)) {
+ LOGGER.info(
+ "[RegionGroupStatistics]\t Region in DataNode {}: null -> {}",
+ rightId,
+
regionGroupStatisticsEntry.getValue().getRight().getRegionStatus(rightId));
+ }
}
}
}
@@ -246,10 +233,11 @@ public class EventService {
currentConsensusGroupStatisticsMap.forEach(
(consensusGroupId, currentConsensusGroupStatistics) -> {
ConsensusGroupStatistics previousConsensusGroupStatistics =
- previousConsensusGroupStatisticsMap.get(consensusGroupId);
- if (previousConsensusGroupStatistics == null
- ||
(currentConsensusGroupStatistics.isNewerThan(previousConsensusGroupStatistics)
- &&
!currentConsensusGroupStatistics.equals(previousConsensusGroupStatistics))) {
+ previousConsensusGroupStatisticsMap.getOrDefault(
+ consensusGroupId,
+
ConsensusGroupStatistics.generateDefaultConsensusGroupStatistics());
+ if
(currentConsensusGroupStatistics.isNewerThan(previousConsensusGroupStatistics)
+ &&
!currentConsensusGroupStatistics.equals(previousConsensusGroupStatistics)) {
differentConsensusGroupStatisticsMap.put(
consensusGroupId,
new Pair<>(previousConsensusGroupStatistics,
currentConsensusGroupStatistics));
@@ -257,16 +245,6 @@ public class EventService {
consensusGroupId, currentConsensusGroupStatistics);
}
});
- previousConsensusGroupStatisticsMap.forEach(
- (consensusGroupId, previousConsensusGroupStatistics) -> {
- if
(!currentConsensusGroupStatisticsMap.containsKey(consensusGroupId)) {
- differentConsensusGroupStatisticsMap.put(
- consensusGroupId, new Pair<>(previousConsensusGroupStatistics,
null));
- }
- });
- previousConsensusGroupStatisticsMap
- .keySet()
- .retainAll(currentConsensusGroupStatisticsMap.keySet());
if (!differentConsensusGroupStatisticsMap.isEmpty()) {
eventPublisher.post(
new
ConsensusGroupStatisticsChangeEvent(differentConsensusGroupStatisticsMap));
@@ -284,16 +262,11 @@ public class EventService {
consensusGroupStatisticsEntry.getValue().getRight(),
consensusGroupStatisticsEntry.getValue().getLeft())) {
LOGGER.info(
- "[ConsensusGroupStatistics]\t {}: {} -> {}",
+ "[ConsensusGroupStatistics]\t {}: {}->{}",
consensusGroupStatisticsEntry.getKey(),
consensusGroupStatisticsEntry.getValue().getLeft(),
consensusGroupStatisticsEntry.getValue().getRight());
}
}
}
-
- @TestOnly
- public EventBus getEventPublisher() {
- return eventPublisher;
- }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index cedc2f86c9e..5afd20c2965 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -272,11 +271,7 @@ public class NodeManager {
return resp;
}
- // Create a new DataNodeHeartbeatCache and force update NodeStatus
int dataNodeId = nodeInfo.generateNextNodeId();
-
getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode,
dataNodeId);
- // TODO: invoke a force heartbeat to update new DataNode's status
immediately
-
RegisterDataNodePlan registerDataNodePlan =
new RegisterDataNodePlan(req.getDataNodeConfiguration());
// Register new DataNode
@@ -303,6 +298,8 @@ public class NodeManager {
// Adjust the maximum RegionGroup number of each Database
getClusterSchemaManager().adjustMaxRegionGroupNum();
+ // TODO: Add a force heartbeat to update LoadCache immediately
+
resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
resp.setDataNodeId(
registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 0f5b7f87621..0509b6f0e37 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -366,16 +366,6 @@ public class ConfigNodeProcedureEnv {
ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
}
- /**
- * Create a new ConfigNodeHeartbeatCache
- *
- * @param nodeId the index of the new ConfigNode
- */
- public void createConfigNodeHeartbeatCache(int nodeId) {
-
getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.ConfigNode,
nodeId);
- // TODO: invoke a force heartbeat to update new ConfigNode's status
immediately
- }
-
/**
* Mark the given datanode as removing status to avoid read or write request
routing to this node.
*
@@ -564,33 +554,13 @@ public class ConfigNodeProcedureEnv {
* Force activating RegionGroup by setting status to Running, therefore the
ConfigNode-leader can
* select leader for it and use it to allocate new Partitions
*
- * @param activateRegionGroupMap Map<Database, Map<RegionGroupId,
Map<DataNodeId, activate
- * heartbeat sample>>>
+ * @param activateRegionGroupMap Map<RegionGroupId, Map<DataNodeId, activate
heartbeat sample>>
*/
public void activateRegionGroup(
- Map<String, Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>>
- activateRegionGroupMap) {
- // Create RegionGroup heartbeat Caches
- activateRegionGroupMap.forEach(
- (database, regionGroupSampleMap) ->
- regionGroupSampleMap.forEach(
- (regionGroupId, regionSampleMap) ->
- getLoadManager()
- .getLoadCache()
- .createRegionGroupHeartbeatCache(
- database, regionGroupId,
regionSampleMap.keySet())));
- // Force update first heartbeat samples
- getLoadManager()
- .forceUpdateRegionGroupCache(
- activateRegionGroupMap.values().stream()
- .flatMap(innerMap -> innerMap.entrySet().stream())
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue, (a, b) -> b)));
+ Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>
activateRegionGroupMap) {
+ getLoadManager().forceUpdateRegionGroupCache(activateRegionGroupMap);
// Wait for leader and priority redistribution
- getLoadManager()
- .waitForRegionGroupReady(
- activateRegionGroupMap.values().stream()
- .flatMap(innterMap -> innterMap.keySet().stream())
- .collect(Collectors.toList()));
+ getLoadManager().waitForRegionGroupReady(new
ArrayList<>(activateRegionGroupMap.keySet()));
}
public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 4dac3f311cb..e946c4dbc78 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -417,7 +417,8 @@ public class RegionMaintainHandler {
return report;
}
- public void addRegionLocation(TConsensusGroupId regionId, TDataNodeLocation
newLocation) {
+ public void addRegionLocation(
+ TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus
regionStatus) {
AddRegionLocationPlan req = new AddRegionLocationPlan(regionId,
newLocation);
TSStatus status =
configManager.getPartitionManager().addRegionLocation(req);
LOGGER.info(
@@ -427,15 +428,14 @@ public class RegionMaintainHandler {
status);
configManager
.getLoadManager()
- .getLoadCache()
- .createRegionCache(regionId, newLocation.getDataNodeId());
+ .forceAddRegionCache(regionId, newLocation.getDataNodeId(),
regionStatus);
}
- public void forceUpdateRegionCache(
+ public void updateRegionCache(
TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus
regionStatus) {
configManager
.getLoadManager()
- .forceUpdateRegionCache(regionId, newLocation.getDataNodeId(),
regionStatus);
+ .forceAddRegionCache(regionId, newLocation.getDataNodeId(),
regionStatus);
}
public void removeRegionLocation(
@@ -447,7 +447,9 @@ public class RegionMaintainHandler {
regionId,
getIdWithRpcEndpoint(deprecatedLocation),
status);
- configManager.getLoadManager().removeRegionCache(regionId,
deprecatedLocation.getDataNodeId());
+ configManager
+ .getLoadManager()
+ .forceRemoveRegionCache(regionId, deprecatedLocation.getDataNodeId());
configManager.getLoadManager().getRouteBalancer().balanceRegionLeaderAndPriority();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index b38696a10ed..160551b6847 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -81,8 +81,8 @@ public class AddConfigNodeProcedure extends
AbstractNodeProcedure<AddConfigNodeS
break;
case REGISTER_SUCCESS:
env.notifyRegisterSuccess(tConfigNodeLocation);
-
env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
env.applyConfigNode(tConfigNodeLocation, versionInfo);
+ // TODO: Add a force heartbeat to update LoadCache immediately
LOG.info("The ConfigNode: {} is successfully added to the cluster",
tConfigNodeLocation);
return Flow.NO_MORE_STATE;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index 3450fd27221..b8dd6075431 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -91,8 +91,7 @@ public class AddRegionPeerProcedure
getProcId(),
consensusGroupId.getId(),
destDataNode.getDataNodeId());
- handler.addRegionLocation(consensusGroupId, destDataNode);
- handler.forceUpdateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Adding);
+ handler.addRegionLocation(consensusGroupId, destDataNode,
RegionStatus.Adding);
TSStatus status = handler.createNewRegionPeer(consensusGroupId,
destDataNode);
setKillPoint(state);
if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
@@ -102,7 +101,7 @@ public class AddRegionPeerProcedure
setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
break;
case DO_ADD_REGION_PEER:
- handler.forceUpdateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Adding);
+ handler.updateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Adding);
// We don't want to re-submit AddRegionPeerTask when leader change
or ConfigNode reboot
if (!this.isStateDeserialized()) {
TSStatus tsStatus =
@@ -142,7 +141,7 @@ public class AddRegionPeerProcedure
throw new UnsupportedOperationException(msg);
}
case UPDATE_REGION_LOCATION_CACHE:
- handler.forceUpdateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Running);
+ handler.updateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Running);
setKillPoint(state);
LOGGER.info("[pid{}][AddRegion] state {} complete", getProcId(),
state);
LOGGER.info(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
index 95ec7bda351..8498cf49c0a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
@@ -178,8 +178,8 @@ public class CreateRegionGroupsProcedure
case ACTIVATE_REGION_GROUPS:
long currentTime = System.nanoTime();
// Build RegionGroupCache immediately to make these successfully built
RegionGroup available
- Map<String, Map<TConsensusGroupId, Map<Integer,
RegionHeartbeatSample>>>
- activateRegionGroupMap = new TreeMap<>();
+ Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>
activateRegionGroupMap =
+ new TreeMap<>();
createRegionGroupsPlan
.getRegionGroupMap()
.forEach(
@@ -211,9 +211,8 @@ public class CreateRegionGroupsProcedure
? RegionStatus.Unknown
: RegionStatus.Running));
});
- activateRegionGroupMap
- .computeIfAbsent(database, empty -> new
TreeMap<>())
- .put(regionReplicaSet.getRegionId(),
activateSampleMap);
+ activateRegionGroupMap.put(
+ regionReplicaSet.getRegionId(),
activateSampleMap);
}
}));
env.activateRegionGroup(activateRegionGroupMap);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index ed4de51ecf8..1bdf473c9eb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -89,13 +89,13 @@ public class RemoveRegionPeerProcedure
getProcId(),
consensusGroupId.getId(),
targetDataNode.getDataNodeId());
- handler.forceUpdateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
+ handler.updateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
handler.transferRegionLeader(consensusGroupId, targetDataNode);
KillPoint.setKillPoint(state);
setNextState(REMOVE_REGION_PEER);
break;
case REMOVE_REGION_PEER:
- handler.forceUpdateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
+ handler.updateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
tsStatus =
handler.submitRemoveRegionPeerTask(
this.getProcId(), targetDataNode, consensusGroupId,
coordinator);
@@ -121,7 +121,7 @@ public class RemoveRegionPeerProcedure
setNextState(DELETE_OLD_REGION_PEER);
break;
case DELETE_OLD_REGION_PEER:
- handler.forceUpdateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
+ handler.updateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
tsStatus =
handler.submitDeleteOldRegionPeerTask(
this.getProcId(), targetDataNode, consensusGroupId);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/FakeSubscriber.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/FakeSubscriber.java
deleted file mode 100644
index 147f6aa7f9b..00000000000
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/FakeSubscriber.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
-import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
-import
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
-import
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
-import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
-import
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
-import
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
-
-import org.apache.tsfile.utils.Pair;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Semaphore;
-
-public class FakeSubscriber implements IClusterStatusSubscriber {
-
- private final Semaphore nodeSemaphore;
- private final Map<Integer, Pair<NodeStatistics, NodeStatistics>>
differentNodeStatisticsMap;
- private final Semaphore regionGroupSemaphore;
- private final Map<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
- differentRegionGroupStatisticsMap;
- private final Semaphore consensusGroupSemaphore;
- private final Map<TConsensusGroupId, Pair<ConsensusGroupStatistics,
ConsensusGroupStatistics>>
- differentConsensusGroupStatisticsMap;
-
- public FakeSubscriber(
- Semaphore nodeSemaphore, Semaphore regionGroupSemaphore, Semaphore
consensusGroupSemaphore) {
- this.nodeSemaphore = nodeSemaphore;
- this.regionGroupSemaphore = regionGroupSemaphore;
- this.consensusGroupSemaphore = consensusGroupSemaphore;
- differentNodeStatisticsMap = new TreeMap<>();
- differentRegionGroupStatisticsMap = new TreeMap<>();
- differentConsensusGroupStatisticsMap = new TreeMap<>();
- }
-
- @Override
- public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
- differentNodeStatisticsMap.clear();
- differentNodeStatisticsMap.putAll(event.getDifferentNodeStatisticsMap());
- nodeSemaphore.release();
- }
-
- @Override
- public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent
event) {
- differentRegionGroupStatisticsMap.clear();
-
differentRegionGroupStatisticsMap.putAll(event.getDifferentRegionGroupStatisticsMap());
- regionGroupSemaphore.release();
- }
-
- @Override
- public void
onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {
- differentConsensusGroupStatisticsMap.clear();
-
differentConsensusGroupStatisticsMap.putAll(event.getDifferentConsensusGroupStatisticsMap());
- consensusGroupSemaphore.release();
- }
-
- public Map<Integer, Pair<NodeStatistics, NodeStatistics>>
getDifferentNodeStatisticsMap() {
- return differentNodeStatisticsMap;
- }
-
- public Map<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
- getDifferentRegionGroupStatisticsMap() {
- return differentRegionGroupStatisticsMap;
- }
-
- public Map<TConsensusGroupId, Pair<ConsensusGroupStatistics,
ConsensusGroupStatistics>>
- getDifferentConsensusGroupStatisticsMap() {
- return differentConsensusGroupStatisticsMap;
- }
-}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
deleted file mode 100644
index 12b3dbb8233..00000000000
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.cluster.NodeType;
-import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
-import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
-import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
-import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
-import
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
-import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
-import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
-import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
-
-import org.apache.tsfile.utils.Pair;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Semaphore;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-public class LoadManagerTest {
- private static LoadManager LOAD_MANAGER;
- private static LoadCache LOAD_CACHE;
- private static FakeSubscriber FAKE_SUBSCRIBER;
- private static Semaphore NODE_SEMAPHORE;
- private static Semaphore REGION_GROUP_SEMAPHORE;
- private static Semaphore CONSENSUS_GROUP_SEMAPHORE;
-
- @BeforeClass
- public static void setUp() throws IOException {
- IManager CONFIG_MANAGER = new ConfigManager();
- LOAD_MANAGER = CONFIG_MANAGER.getLoadManager();
- LOAD_CACHE = LOAD_MANAGER.getLoadCache();
- }
-
- @Before
- public void renewFakeSubscriber() {
- NODE_SEMAPHORE = new Semaphore(0);
- REGION_GROUP_SEMAPHORE = new Semaphore(0);
- CONSENSUS_GROUP_SEMAPHORE = new Semaphore(0);
- FAKE_SUBSCRIBER =
- new FakeSubscriber(NODE_SEMAPHORE, REGION_GROUP_SEMAPHORE,
CONSENSUS_GROUP_SEMAPHORE);
-
LOAD_MANAGER.getEventService().getEventPublisher().register(FAKE_SUBSCRIBER);
- }
-
- @Test
- public void testNodeCache() throws InterruptedException {
- // Create 1 ConfigNode and 1 DataNode heartbeat cache
- LOAD_CACHE.createNodeHeartbeatCache(NodeType.ConfigNode, 0);
- LOAD_CACHE.createNodeHeartbeatCache(NodeType.DataNode, 1);
-
- // Default NodeStatus is Unknown
- Assert.assertEquals(NodeStatus.Unknown, LOAD_CACHE.getNodeStatus(0));
- Assert.assertEquals(NodeStatus.Unknown, LOAD_CACHE.getNodeStatus(1));
-
- // Simulate update to Running status
- LOAD_CACHE.cacheConfigNodeHeartbeatSample(0, new
NodeHeartbeatSample(NodeStatus.Running));
- LOAD_CACHE.cacheDataNodeHeartbeatSample(1, new
NodeHeartbeatSample(NodeStatus.Running));
- LOAD_CACHE.updateNodeStatistics();
-
LOAD_MANAGER.getEventService().checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
- NODE_SEMAPHORE.acquire();
- Assert.assertEquals(NodeStatus.Running, LOAD_CACHE.getNodeStatus(0));
- Assert.assertEquals(NodeStatus.Running, LOAD_CACHE.getNodeStatus(1));
- Map<Integer, Pair<NodeStatistics, NodeStatistics>>
differentNodeStatisticsMap =
- FAKE_SUBSCRIBER.getDifferentNodeStatisticsMap();
- Assert.assertEquals(
- new Pair<>(null, new NodeStatistics(NodeStatus.Running)),
- differentNodeStatisticsMap.get(0));
- Assert.assertEquals(
- new Pair<>(null, new NodeStatistics(NodeStatus.Running)),
- differentNodeStatisticsMap.get(1));
-
- // Force update to Removing status
- LOAD_MANAGER.forceUpdateNodeCache(
- NodeType.DataNode, 1, new NodeHeartbeatSample(NodeStatus.Removing));
- NODE_SEMAPHORE.acquire();
- Assert.assertEquals(NodeStatus.Removing, LOAD_CACHE.getNodeStatus(1));
- differentNodeStatisticsMap =
FAKE_SUBSCRIBER.getDifferentNodeStatisticsMap();
- // Only DataNode 1 is updated
- Assert.assertEquals(1, differentNodeStatisticsMap.size());
- Assert.assertEquals(
- new Pair<>(new NodeStatistics(NodeStatus.Running), new
NodeStatistics(NodeStatus.Removing)),
- differentNodeStatisticsMap.get(1));
-
- // Removing status can't be updated to any other status automatically
- LOAD_CACHE.cacheDataNodeHeartbeatSample(1, new
NodeHeartbeatSample(NodeStatus.ReadOnly));
- LOAD_CACHE.updateNodeStatistics();
-
LOAD_MANAGER.getEventService().checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
- Assert.assertEquals(NodeStatus.Removing, LOAD_CACHE.getNodeStatus(1));
-
- // Remove NodeCache
- LOAD_MANAGER.removeNodeCache(1);
- NODE_SEMAPHORE.acquire();
- differentNodeStatisticsMap =
FAKE_SUBSCRIBER.getDifferentNodeStatisticsMap();
- // Only DataNode 1 is updated
- Assert.assertEquals(1, differentNodeStatisticsMap.size());
- Assert.assertEquals(
- new Pair<>(new NodeStatistics(NodeStatus.Removing), null),
- differentNodeStatisticsMap.get(1));
- }
-
- @Test
- public void testRegionGroupCache() throws InterruptedException {
- // Create 1 RegionGroup heartbeat cache
- TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 0);
- Set<Integer> dataNodeIds = Stream.of(0, 1, 2).collect(Collectors.toSet());
- LOAD_CACHE.createRegionGroupHeartbeatCache("root.db", regionGroupId,
dataNodeIds);
-
- // Default RegionGroupStatus is Disabled and RegionStatus are Unknown
- Assert.assertEquals(RegionGroupStatus.Disabled,
LOAD_CACHE.getRegionGroupStatus(regionGroupId));
- dataNodeIds.forEach(
- dataNodeId ->
- Assert.assertEquals(
- RegionStatus.Unknown,
LOAD_CACHE.getRegionStatus(regionGroupId, dataNodeId)));
-
- // Simulate update Regions to Running status
- dataNodeIds.forEach(
- dataNodeId ->
- LOAD_CACHE.cacheRegionHeartbeatSample(
- regionGroupId, dataNodeId, new
RegionHeartbeatSample(RegionStatus.Running), false));
- LOAD_CACHE.updateRegionGroupStatistics();
-
LOAD_MANAGER.getEventService().checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
- REGION_GROUP_SEMAPHORE.acquire();
- Assert.assertEquals(RegionGroupStatus.Running,
LOAD_CACHE.getRegionGroupStatus(regionGroupId));
- dataNodeIds.forEach(
- dataNodeId ->
- Assert.assertEquals(
- RegionStatus.Running,
LOAD_CACHE.getRegionStatus(regionGroupId, dataNodeId)));
- Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
- differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
- Map<Integer, RegionStatistics> allRunningRegionStatisticsMap =
- dataNodeIds.stream()
- .collect(
- Collectors.toMap(
- dataNodeId -> dataNodeId,
- dataNodeId -> new RegionStatistics(RegionStatus.Running)));
- Assert.assertEquals(
- new Pair<>(
- null,
- new RegionGroupStatistics(RegionGroupStatus.Running,
allRunningRegionStatisticsMap)),
- differentRegionGroupStatisticsMap.get(regionGroupId));
-
- // Simulate Region migration from [0, 1, 2] to [1, 2, 3]
- int removeDataNodeId = 0;
- LOAD_MANAGER.forceUpdateRegionCache(regionGroupId, removeDataNodeId,
RegionStatus.Removing);
- REGION_GROUP_SEMAPHORE.acquire();
- // Mark Region 0 as Removing
- Assert.assertEquals(
- RegionStatus.Removing, LOAD_CACHE.getRegionStatus(regionGroupId,
removeDataNodeId));
- differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
- Map<Integer, RegionStatistics> oneRemovingRegionStatisticsMap =
- new TreeMap<>(allRunningRegionStatisticsMap);
- oneRemovingRegionStatisticsMap.replace(
- removeDataNodeId, new RegionStatistics(RegionStatus.Removing));
- Assert.assertEquals(
- new Pair<>(
- new RegionGroupStatistics(RegionGroupStatus.Running,
allRunningRegionStatisticsMap),
- new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneRemovingRegionStatisticsMap)),
- differentRegionGroupStatisticsMap.get(regionGroupId));
- // Add and mark Region 3 as Adding
- int addDataNodeId = 3;
- LOAD_CACHE.createRegionCache(regionGroupId, addDataNodeId);
- LOAD_MANAGER.forceUpdateRegionCache(regionGroupId, addDataNodeId,
RegionStatus.Adding);
- REGION_GROUP_SEMAPHORE.acquire();
- Assert.assertEquals(
- RegionStatus.Adding, LOAD_CACHE.getRegionStatus(regionGroupId,
addDataNodeId));
- differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
- Map<Integer, RegionStatistics> oneAddingRegionStatisticsMap =
- new TreeMap<>(oneRemovingRegionStatisticsMap);
- oneAddingRegionStatisticsMap.put(addDataNodeId, new
RegionStatistics(RegionStatus.Adding));
- Assert.assertEquals(
- new Pair<>(
- new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneRemovingRegionStatisticsMap),
- new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneAddingRegionStatisticsMap)),
- differentRegionGroupStatisticsMap.get(regionGroupId));
- // Both Region 0 and 3 can't be updated
- LOAD_CACHE.cacheRegionHeartbeatSample(
- regionGroupId, removeDataNodeId, new
RegionHeartbeatSample(RegionStatus.Unknown), false);
- LOAD_CACHE.cacheRegionHeartbeatSample(
- regionGroupId, addDataNodeId, new
RegionHeartbeatSample(RegionStatus.ReadOnly), false);
- LOAD_CACHE.updateRegionGroupStatistics();
-
LOAD_MANAGER.getEventService().checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
- Assert.assertEquals(
- RegionStatus.Removing, LOAD_CACHE.getRegionStatus(regionGroupId,
removeDataNodeId));
- Assert.assertEquals(
- RegionStatus.Adding, LOAD_CACHE.getRegionStatus(regionGroupId,
addDataNodeId));
- // Adding process completed
- LOAD_MANAGER.forceUpdateRegionCache(regionGroupId, addDataNodeId,
RegionStatus.Running);
- REGION_GROUP_SEMAPHORE.acquire();
- Assert.assertEquals(
- RegionStatus.Running, LOAD_CACHE.getRegionStatus(regionGroupId,
addDataNodeId));
- differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
- oneRemovingRegionStatisticsMap.put(addDataNodeId, new
RegionStatistics(RegionStatus.Running));
- Assert.assertEquals(
- new Pair<>(
- new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneAddingRegionStatisticsMap),
- new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneRemovingRegionStatisticsMap)),
- differentRegionGroupStatisticsMap.get(regionGroupId));
- // Removing process completed
- LOAD_MANAGER.removeRegionCache(regionGroupId, removeDataNodeId);
- REGION_GROUP_SEMAPHORE.acquire();
- differentRegionGroupStatisticsMap =
FAKE_SUBSCRIBER.getDifferentRegionGroupStatisticsMap();
- allRunningRegionStatisticsMap.remove(removeDataNodeId);
- allRunningRegionStatisticsMap.put(addDataNodeId, new
RegionStatistics(RegionStatus.Running));
- Assert.assertEquals(
- new Pair<>(
- new RegionGroupStatistics(RegionGroupStatus.Disabled,
oneRemovingRegionStatisticsMap),
- new RegionGroupStatistics(RegionGroupStatus.Running,
allRunningRegionStatisticsMap)),
- differentRegionGroupStatisticsMap.get(regionGroupId));
- }
-
- @Test
- public void testConsensusGroupCache() throws InterruptedException {
- // Create 1 Consensus heartbeat cache
- TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
- Set<Integer> dataNodeIds = Stream.of(0, 1, 2).collect(Collectors.toSet());
- LOAD_CACHE.createRegionGroupHeartbeatCache("root.db", regionGroupId,
dataNodeIds);
-
- // Default ConsensusGroupStatus is leaderId == -1
- Assert.assertEquals(-1, (int)
LOAD_CACHE.getRegionLeaderMap().get(regionGroupId));
-
- // Simulate select leaderId == 1
- int originLeaderId = 1;
- LOAD_CACHE.cacheConsensusSample(
- regionGroupId, new ConsensusGroupHeartbeatSample(originLeaderId));
- LOAD_CACHE.updateConsensusGroupStatistics();
- LOAD_MANAGER
- .getEventService()
- .checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary();
- CONSENSUS_GROUP_SEMAPHORE.acquire();
- Map<TConsensusGroupId, Pair<ConsensusGroupStatistics,
ConsensusGroupStatistics>>
- differentConsensusGroupStatisticsMap =
- FAKE_SUBSCRIBER.getDifferentConsensusGroupStatisticsMap();
- Assert.assertEquals(
- new Pair<>(null, new ConsensusGroupStatistics(originLeaderId)),
- differentConsensusGroupStatisticsMap.get(regionGroupId));
-
- // Force update leader to 2
- int newLeaderId = 2;
- LOAD_MANAGER.forceUpdateConsensusGroupCache(
- Collections.singletonMap(regionGroupId, new
ConsensusGroupHeartbeatSample(newLeaderId)));
- CONSENSUS_GROUP_SEMAPHORE.acquire();
- differentConsensusGroupStatisticsMap =
- FAKE_SUBSCRIBER.getDifferentConsensusGroupStatisticsMap();
- Assert.assertEquals(
- new Pair<>(
- new ConsensusGroupStatistics(originLeaderId),
- new ConsensusGroupStatistics(newLeaderId)),
- differentConsensusGroupStatisticsMap.get(regionGroupId));
-
- // Remove ConsensusGroupCache
- LOAD_MANAGER.removeRegionGroupRelatedCache(regionGroupId);
- CONSENSUS_GROUP_SEMAPHORE.acquire();
- differentConsensusGroupStatisticsMap =
- FAKE_SUBSCRIBER.getDifferentConsensusGroupStatisticsMap();
- // Only SchemaRegionGroup 1 is updated
- Assert.assertEquals(1, differentConsensusGroupStatisticsMap.size());
- Assert.assertEquals(
- new Pair<>(new ConsensusGroupStatistics(newLeaderId), null),
- differentConsensusGroupStatisticsMap.get(regionGroupId));
- }
-}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
index f2f51e044ff..7cc1f50c28c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
@@ -39,11 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
public class CFDLeaderBalancerTest {
@@ -103,14 +101,10 @@ public class CFDLeaderBalancerTest {
// Prepare input parameters
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
databaseRegionGroupMap.put(DATABASE, regionGroupIds);
- Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
regionReplicaSets.forEach(
regionReplicaSet ->
- regionReplicaSetMap.put(
- regionReplicaSet.getRegionId(),
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet())));
+ regionReplicaSetMap.put(regionReplicaSet.getRegionId(),
regionReplicaSet));
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionReplicaSets.forEach(
regionReplicaSet ->
regionLeaderMap.put(regionReplicaSet.getRegionId(), 0));
@@ -153,12 +147,8 @@ public class CFDLeaderBalancerTest {
// Prepare input parameters
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
databaseRegionGroupMap.put(DATABASE,
Collections.singletonList(regionReplicaSet.getRegionId()));
- Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
- regionReplicaSetMap.put(
- regionReplicaSet.getRegionId(),
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet()));
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionLeaderMap.put(regionReplicaSet.getRegionId(), 1);
Map<Integer, NodeStatistics> nodeStatisticsMap = new TreeMap<>();
@@ -200,7 +190,7 @@ public class CFDLeaderBalancerTest {
nodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running));
// Prepare RegionGroups
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
- Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
for (int i = 0; i < 5; i++) {
@@ -214,11 +204,7 @@ public class CFDLeaderBalancerTest {
Arrays.asList(
new TDataNodeLocation().setDataNodeId(0),
new TDataNodeLocation().setDataNodeId(1)));
- regionReplicaSetMap.put(
- regionGroupId,
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet()));
+ regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, 0);
// Assuming all Regions are migrating from DataNode-1 to DataNode-2
Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
@@ -269,7 +255,7 @@ public class CFDLeaderBalancerTest {
Random random = new Random();
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
databaseRegionGroupMap.put(DATABASE, new ArrayList<>());
- Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
for (int i = 0; i < regionGroupNum; i++) {
@@ -287,11 +273,7 @@ public class CFDLeaderBalancerTest {
regionStatisticsMap.put(regionGroupId, regionStatistics);
databaseRegionGroupMap.get(DATABASE).add(regionGroupId);
- regionReplicaSetMap.put(
- regionGroupId,
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet()));
+ regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, leaderId);
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
index b531896d37c..6b97e1b385b 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
@@ -35,11 +35,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
public class GreedyLeaderBalancerTest {
@@ -47,7 +45,7 @@ public class GreedyLeaderBalancerTest {
@Test
public void optimalLeaderDistributionTest() {
- Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
@@ -69,11 +67,7 @@ public class GreedyLeaderBalancerTest {
regionStatistics.put(j, new RegionStatistics(RegionStatus.Running));
}
TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
- regionReplicaSetMap.put(
- regionGroupId,
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet()));
+ regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, random.nextInt(3));
regionStatisticsMap.put(regionGroupId, regionStatistics);
}
@@ -89,11 +83,7 @@ public class GreedyLeaderBalancerTest {
regionStatistics.put(j, new RegionStatistics(RegionStatus.Running));
}
TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
- regionReplicaSetMap.put(
- regionGroupId,
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet()));
+ regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, 3 + random.nextInt(3));
regionStatisticsMap.put(regionGroupId, regionStatistics);
}
@@ -120,7 +110,7 @@ public class GreedyLeaderBalancerTest {
@Test
public void disableTest() {
- Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new TreeMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap
= new TreeMap<>();
@@ -145,11 +135,7 @@ public class GreedyLeaderBalancerTest {
j, new RegionStatistics(j == 1 ? RegionStatus.Unknown :
RegionStatus.Running));
}
TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
- regionReplicaSetMap.put(
- regionGroupId,
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet()));
+ regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, 1);
regionStatisticsMap.put(regionGroupId, regionStatistics);
}
@@ -166,11 +152,7 @@ public class GreedyLeaderBalancerTest {
j, new RegionStatistics(j == 4 ? RegionStatus.ReadOnly :
RegionStatus.Running));
}
TRegionReplicaSet regionReplicaSet = new
TRegionReplicaSet(regionGroupId, dataNodeLocations);
- regionReplicaSetMap.put(
- regionGroupId,
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet()));
+ regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, 4);
regionStatisticsMap.put(regionGroupId, regionStatistics);
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
index 885c97989da..fca962a2e56 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
@@ -46,7 +46,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
public class LeaderBalancerComparisonTest {
@@ -82,7 +81,7 @@ public class LeaderBalancerComparisonTest {
// Simulate each DataNode has 16 CPU cores
// and each RegionGroup has 3 replicas
int regionGroupNum = TEST_CPU_CORE_NUM * dataNodeNum / TEST_REPLICA_NUM;
- Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap = new
HashMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
generateTestData(dataNodeNum, regionGroupNum, regionReplicaSetMap,
regionLeaderMap);
@@ -102,9 +101,13 @@ public class LeaderBalancerComparisonTest {
regionReplicaSetMap.forEach(
(regionGroupId, regionReplicaSet) -> {
Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
- regionReplicaSet.forEach(
- dataNodeId ->
- regionStatistics.put(dataNodeId, new
RegionStatistics(RegionStatus.Running)));
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ regionStatistics.put(
+ dataNodeLocation.getDataNodeId(),
+ new RegionStatistics(RegionStatus.Running)));
allRunningRegionStatistics.put(regionGroupId, regionStatistics);
});
Statistics greedyStatistics =
@@ -160,13 +163,15 @@ public class LeaderBalancerComparisonTest {
regionReplicaSetMap.forEach(
(regionGroupId, regionReplicaSet) -> {
Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
- regionReplicaSet.forEach(
- dataNodeId ->
- regionStatistics.put(
- dataNodeId,
- disabledDataNodeSet.contains(dataNodeId)
- ? new RegionStatistics(RegionStatus.Unknown)
- : new RegionStatistics(RegionStatus.Running)));
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ regionStatistics.put(
+ dataNodeLocation.getDataNodeId(),
+
disabledDataNodeSet.contains(dataNodeLocation.getDataNodeId())
+ ? new RegionStatistics(RegionStatus.Unknown)
+ : new RegionStatistics(RegionStatus.Running)));
disabledRegionStatistics.put(regionGroupId, regionStatistics);
});
greedyStatistics =
@@ -233,7 +238,7 @@ public class LeaderBalancerComparisonTest {
private void generateTestData(
int dataNodeNum,
int regionGroupNum,
- Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap) {
Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
@@ -287,11 +292,7 @@ public class LeaderBalancerComparisonTest {
randomNum -= 1;
}
- regionReplicaSetMap.put(
- regionGroupId,
- regionReplicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet()));
+ regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionReplicaSet
.getDataNodeLocations()
.forEach(
@@ -306,7 +307,7 @@ public class LeaderBalancerComparisonTest {
int dataNodeNum,
int regionGroupNum,
AbstractLeaderBalancer leaderBalancer,
- Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Map<Integer, NodeStatistics> nodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap,
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
index e340c347e11..a961eb310e7 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
@@ -26,18 +26,12 @@ import
org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import org.junit.Assert;
import org.junit.Test;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
public class RegionGroupCacheTest {
- private static final String DATABASE = "root.db";
-
@Test
public void getRegionStatusTest() {
long currentTime = System.nanoTime();
- RegionGroupCache regionGroupCache =
- new RegionGroupCache(DATABASE, Stream.of(0, 1, 2,
3).collect(Collectors.toSet()));
+ RegionGroupCache regionGroupCache = new RegionGroupCache();
regionGroupCache.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
regionGroupCache.cacheHeartbeatSample(
@@ -61,8 +55,7 @@ public class RegionGroupCacheTest {
@Test
public void getRegionGroupStatusTest() {
long currentTime = System.nanoTime();
- RegionGroupCache runningRegionGroup =
- new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
+ RegionGroupCache runningRegionGroup = new RegionGroupCache();
runningRegionGroup.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
runningRegionGroup.cacheHeartbeatSample(
@@ -74,8 +67,7 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Running,
runningRegionGroup.getCurrentStatistics().getRegionGroupStatus());
- RegionGroupCache availableRegionGroup =
- new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
+ RegionGroupCache availableRegionGroup = new RegionGroupCache();
availableRegionGroup.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
availableRegionGroup.cacheHeartbeatSample(
@@ -87,8 +79,7 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Available,
availableRegionGroup.getCurrentStatistics().getRegionGroupStatus());
- RegionGroupCache disabledRegionGroup0 =
- new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
+ RegionGroupCache disabledRegionGroup0 = new RegionGroupCache();
disabledRegionGroup0.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
disabledRegionGroup0.cacheHeartbeatSample(
@@ -100,8 +91,7 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Discouraged,
disabledRegionGroup0.getCurrentStatistics().getRegionGroupStatus());
- RegionGroupCache disabledRegionGroup1 =
- new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
+ RegionGroupCache disabledRegionGroup1 = new RegionGroupCache();
disabledRegionGroup1.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
disabledRegionGroup1.cacheHeartbeatSample(
@@ -113,8 +103,7 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Disabled,
disabledRegionGroup1.getCurrentStatistics().getRegionGroupStatus());
- RegionGroupCache disabledRegionGroup2 =
- new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()));
+ RegionGroupCache disabledRegionGroup2 = new RegionGroupCache();
disabledRegionGroup2.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
disabledRegionGroup2.cacheHeartbeatSample(