This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch region-multi-database
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/region-multi-database by this
push:
new 33991ace4b3 compare algorithms
33991ace4b3 is described below
commit 33991ace4b31c49173f6396cb40fb9f1c8f8b535
Author: YongzaoDan <[email protected]>
AuthorDate: Fri Mar 8 10:56:09 2024 +0800
compare algorithms
---
.../region/CopySetRegionGroupAllocator.java | 106 ++++++++++++
.../region/GreedyCopySetRegionGroupAllocator.java | 7 +-
.../region/TieredReplicationAllocator.java | 167 +++++++++++++++++++
.../router/leader/GreedyLeaderBalancer.java | 183 +++++++++++----------
...orAndLeaderBalancerCombinatorialManualTest.java | 44 ++---
5 files changed, 400 insertions(+), 107 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionGroupAllocator.java
new file mode 100644
index 00000000000..7580a398e72
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionGroupAllocator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+public class CopySetRegionGroupAllocator implements IRegionGroupAllocator {
+
+ private final Random RANDOM = new Random();
+ private final Map<Integer, List<List<Integer>>> COPY_SETS = new TreeMap<>();
+
+ private final int dataNodeNum;
+
+ public CopySetRegionGroupAllocator(int dataNodeNum, int replicationFactor,
int loadFactor) {
+ this.dataNodeNum = dataNodeNum;
+ BitSet bitSet = new BitSet(dataNodeNum + 1);
+ for (int p = 0; p < loadFactor || bitSet.cardinality() < dataNodeNum; p++)
{
+ List<Integer> permutation = new ArrayList<>();
+ for (int i = 1; i <= dataNodeNum; i++) {
+ permutation.add(i);
+ }
+ for (int i = 1; i < dataNodeNum; i++) {
+ int pos = RANDOM.nextInt(i);
+ int tmp = permutation.get(i);
+ permutation.set(i, permutation.get(pos));
+ permutation.set(pos, tmp);
+ }
+ for (int i = 0; i + replicationFactor < permutation.size(); i +=
replicationFactor) {
+ List<Integer> copySet = new ArrayList<>();
+ for (int j = 0; j < replicationFactor; j++) {
+ int e = permutation.get(i + j);
+ copySet.add(e);
+ bitSet.set(e);
+ }
+ for (int c : copySet) {
+ COPY_SETS.computeIfAbsent(c, k -> new ArrayList<>()).add(copySet);
+ }
+ }
+ }
+ }
+
+ @Override
+ public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+ int replicationFactor,
+ TConsensusGroupId consensusGroupId) {
+ TRegionReplicaSet result = new TRegionReplicaSet();
+ Map<Integer, Integer> regionCounter = new TreeMap<>();
+ for (int i = 1; i <= dataNodeNum; i++) {
+ regionCounter.put(i, 0);
+ }
+ allocatedRegionGroups.forEach(
+ regionGroup ->
+ regionGroup
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ regionCounter.merge(dataNodeLocation.getDataNodeId(),
1, Integer::sum)));
+ int firstRegion = -1, minCount = Integer.MAX_VALUE;
+ for (Map.Entry<Integer, Integer> counterEntry : regionCounter.entrySet()) {
+ int dataNodeId = counterEntry.getKey();
+ int regionCount = counterEntry.getValue();
+ if (regionCount < minCount) {
+ minCount = regionCount;
+ firstRegion = dataNodeId;
+ } else if (regionCount == minCount && RANDOM.nextBoolean()) {
+ firstRegion = dataNodeId;
+ }
+ }
+ List<Integer> copySet =
+
COPY_SETS.get(firstRegion).get(RANDOM.nextInt(COPY_SETS.get(firstRegion).size()));
+ for (int dataNodeId : copySet) {
+
result.addToDataNodeLocations(availableDataNodeMap.get(dataNodeId).getLocation());
+ }
+ return result.setRegionId(consensusGroupId);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
index 2f0afa0065f..6092fa48f0c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -59,7 +59,7 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
// RegionGroups is minimal
int optimalCombinationSum;
List<int[]> optimalReplicaSets;
- private static final int MAX_OPTIMAL_PLAN_NUM = 100;
+ private static final int MAX_OPTIMAL_PLAN_NUM = 1000;
private static class DataNodeEntry {
@@ -126,6 +126,11 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
for (int i = 0; i < replicationFactor; i++) {
result.addToDataNodeLocations(availableDataNodeMap.get(optimalReplicaSet[i]).getLocation());
}
+
+ if (optimalCombinationSum > 0) {
+ System.out.println("The optimal combination sum is " +
optimalCombinationSum);
+ }
+
return result;
} finally {
clear();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/TieredReplicationAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/TieredReplicationAllocator.java
new file mode 100644
index 00000000000..ab349e6f215
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/TieredReplicationAllocator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TieredReplicationAllocator implements IRegionGroupAllocator {
+
+ private final int dataNodeNum;
+ private final Random RANDOM = new Random();
+ private final Map<Integer, List<List<Integer>>> COPY_SETS = new TreeMap<>();
+
+ private static class DataNodeEntry {
+
+ private final int dataNodeId;
+ private final int scatterWidth;
+
+ public DataNodeEntry(int dataNodeId, int scatterWidth) {
+ this.dataNodeId = dataNodeId;
+ this.scatterWidth = scatterWidth;
+ }
+
+ public int getDataNodeId() {
+ return dataNodeId;
+ }
+
+ public int compare(DataNodeEntry other) {
+ return Integer.compare(scatterWidth, other.scatterWidth);
+ }
+ }
+
+ public TieredReplicationAllocator(int dataNodeNum, int replicationFactor,
int loadFactor) {
+ this.dataNodeNum = dataNodeNum;
+ Map<Integer, BitSet> scatterWidthMap = new TreeMap<>();
+ for (int i = 1; i <= dataNodeNum; i++) {
+ scatterWidthMap.put(i, new BitSet(dataNodeNum + 1));
+ }
+ int targetScatterWidth = loadFactor * (replicationFactor - 1);
+ while (existScatterWidthUnsatisfied(scatterWidthMap, targetScatterWidth)) {
+ for (int firstRegion = 1; firstRegion <= dataNodeNum; firstRegion++) {
+ if (scatterWidthMap.get(firstRegion).cardinality() <
targetScatterWidth) {
+ List<Integer> copySet = new ArrayList<>();
+ copySet.add(firstRegion);
+ List<DataNodeEntry> otherDataNodes = new ArrayList<>();
+ for (int i = 1; i <= dataNodeNum; i++) {
+ if (i != firstRegion) {
+ otherDataNodes.add(new DataNodeEntry(i,
scatterWidthMap.get(i).cardinality()));
+ }
+ }
+ otherDataNodes.sort(DataNodeEntry::compare);
+ for (DataNodeEntry entry : otherDataNodes) {
+ boolean accepted = true;
+ int secondRegion = entry.getDataNodeId();
+ for (int e : copySet) {
+ if (scatterWidthMap.get(e).get(secondRegion)) {
+ accepted = false;
+ break;
+ }
+ }
+ if (accepted) {
+ copySet.add(secondRegion);
+ }
+ if (copySet.size() == replicationFactor) {
+ break;
+ }
+ }
+
+ while (copySet.size() < replicationFactor) {
+ int secondRegion = RANDOM.nextInt(dataNodeNum) + 1;
+ while (copySet.contains(secondRegion)) {
+ secondRegion = RANDOM.nextInt(dataNodeNum) + 1;
+ }
+ copySet.add(secondRegion);
+ }
+
+ for (int i = 0; i < copySet.size(); i++) {
+ for (int j = i + 1; j < copySet.size(); j++) {
+ scatterWidthMap.get(copySet.get(i)).set(copySet.get(j));
+ scatterWidthMap.get(copySet.get(j)).set(copySet.get(i));
+ }
+ }
+ for (int e : copySet) {
+ COPY_SETS.computeIfAbsent(e, k -> new ArrayList<>()).add(copySet);
+ }
+ }
+ }
+ }
+ }
+
+ private boolean existScatterWidthUnsatisfied(
+ Map<Integer, BitSet> scatterWidthMap, int targetScatterWidth) {
+ AtomicBoolean result = new AtomicBoolean(false);
+ scatterWidthMap.forEach(
+ (k, v) -> {
+ if (v.cardinality() < targetScatterWidth) {
+ result.set(true);
+ }
+ });
+ return result.get();
+ }
+
+ @Override
+ public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+ int replicationFactor,
+ TConsensusGroupId consensusGroupId) {
+ TRegionReplicaSet result = new TRegionReplicaSet();
+ Map<Integer, Integer> regionCounter = new TreeMap<>();
+ for (int i = 1; i <= dataNodeNum; i++) {
+ regionCounter.put(i, 0);
+ }
+ allocatedRegionGroups.forEach(
+ regionGroup ->
+ regionGroup
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ regionCounter.merge(dataNodeLocation.getDataNodeId(),
1, Integer::sum)));
+ int firstRegion = -1, minCount = Integer.MAX_VALUE;
+ for (Map.Entry<Integer, Integer> counterEntry : regionCounter.entrySet()) {
+ int dataNodeId = counterEntry.getKey();
+ int regionCount = counterEntry.getValue();
+ if (regionCount < minCount) {
+ minCount = regionCount;
+ firstRegion = dataNodeId;
+ } else if (regionCount == minCount && RANDOM.nextBoolean()) {
+ firstRegion = dataNodeId;
+ }
+ }
+ List<Integer> copySet =
+
COPY_SETS.get(firstRegion).get(RANDOM.nextInt(COPY_SETS.get(firstRegion).size()));
+ for (int dataNodeId : copySet) {
+
result.addToDataNodeLocations(availableDataNodeMap.get(dataNodeId).getLocation());
+ }
+ return result.setRegionId(consensusGroupId);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 5619312f4e7..5ad15c5a422 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -16,21 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
/** Leader distribution balancer that uses greedy algorithm */
public class GreedyLeaderBalancer implements ILeaderBalancer {
@@ -75,92 +75,103 @@ public class GreedyLeaderBalancer implements
ILeaderBalancer {
}
private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
- /* Count the number of leaders that each DataNode have */
- // Map<DataNodeId, leader count>
- Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+ regionLeaderMap.clear();
+ Map<Integer, Integer> leaderCounter = new TreeMap<>();
regionReplicaSetMap.forEach(
- (regionGroupId, regionReplicaSet) ->
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation ->
- leaderCounter.putIfAbsent(
- dataNodeLocation.getDataNodeId(), new
AtomicInteger(0))));
- regionLeaderMap.forEach(
- (regionGroupId, leaderId) ->
leaderCounter.get(leaderId).getAndIncrement());
-
- /* Ensure all RegionGroups' leader are not inside disabled DataNodes */
- for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet()) {
- int leaderId = regionLeaderMap.get(regionGroupId);
- if (disabledDataNodeSet.contains(leaderId)) {
- int newLeaderId = -1;
- int newLeaderWeight = Integer.MAX_VALUE;
- for (TDataNodeLocation candidate :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int candidateId = candidate.getDataNodeId();
- int candidateWeight = leaderCounter.get(candidateId).get();
- // Select the available DataNode with the fewest leaders
- if (!disabledDataNodeSet.contains(candidateId) && candidateWeight <
newLeaderWeight) {
- newLeaderId = candidateId;
- newLeaderWeight = candidateWeight;
+ (regionGroupId, regionGroup) -> {
+ int minCount = Integer.MAX_VALUE, leaderId = -1;
+ for (TDataNodeLocation dataNodeLocation :
regionGroup.getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ int count = leaderCounter.getOrDefault(dataNodeId, 0);
+ if (count < minCount) {
+ minCount = count;
+ leaderId = dataNodeId;
+ }
}
- }
-
- if (newLeaderId != -1) {
- leaderCounter.get(leaderId).getAndDecrement();
- leaderCounter.get(newLeaderId).getAndIncrement();
- regionLeaderMap.replace(regionGroupId, newLeaderId);
- }
- }
- }
-
- /* Double keyword sort */
- List<WeightEntry> weightList = new ArrayList<>();
- for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet()) {
- int leaderId = regionLeaderMap.get(regionGroupId);
- int leaderWeight =
leaderCounter.get(regionLeaderMap.get(regionGroupId)).get();
-
- int followerWeight = Integer.MAX_VALUE;
- for (TDataNodeLocation follower :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int followerId = follower.getDataNodeId();
- if (followerId != leaderId) {
- followerWeight = Math.min(followerWeight,
leaderCounter.get(followerId).get());
- }
- }
-
- weightList.add(new WeightEntry(regionGroupId, leaderWeight,
followerWeight));
- }
- weightList.sort(WeightEntry.COMPARATOR);
-
- /* Greedy distribution */
- for (WeightEntry weightEntry : weightList) {
- TConsensusGroupId regionGroupId = weightEntry.regionGroupId;
- int leaderId = regionLeaderMap.get(regionGroupId);
- int leaderWeight =
leaderCounter.get(regionLeaderMap.get(regionGroupId)).get();
-
- int newLeaderId = -1;
- int newLeaderWeight = Integer.MAX_VALUE;
- for (TDataNodeLocation candidate :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int candidateId = candidate.getDataNodeId();
- int candidateWeight = leaderCounter.get(candidateId).get();
- if (!disabledDataNodeSet.contains(candidateId)
- && candidateId != leaderId
- && candidateWeight < newLeaderWeight) {
- newLeaderId = candidateId;
- newLeaderWeight = candidateWeight;
- }
- }
-
- // Redistribution takes effect only when leaderWeight - newLeaderWeight
> 1.
- // i.e. Redistribution can reduce the range of the number of leaders
that each DataNode owns.
- if (leaderWeight - newLeaderWeight > 1) {
- leaderCounter.get(leaderId).getAndDecrement();
- leaderCounter.get(newLeaderId).getAndIncrement();
- regionLeaderMap.replace(regionGroupId, newLeaderId);
- }
- }
+ regionLeaderMap.put(regionGroupId, leaderId);
+ leaderCounter.merge(leaderId, 1, Integer::sum);
+ });
+
+ // /* Count the number of leaders that each DataNode have */
+ // // Map<DataNodeId, leader count>
+ // Map<Integer, Integer> leaderCounter = new ConcurrentHashMap<>();
+ // regionLeaderMap.forEach(
+ // (regionGroupId, leaderId) -> leaderCounter.merge(leaderId, 1,
Integer::sum));
+ //
+ // /* Ensure all RegionGroups' leader are not inside disabled DataNodes
*/
+ // for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet())
{
+ // int leaderId = regionLeaderMap.get(regionGroupId);
+ // if (disabledDataNodeSet.contains(leaderId)) {
+ // int newLeaderId = -1;
+ // int newLeaderWeight = Integer.MAX_VALUE;
+ // for (TDataNodeLocation candidate :
+ //
regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ // int candidateId = candidate.getDataNodeId();
+ // int candidateWeight = leaderCounter.get(candidateId);
+ // // Select the available DataNode with the fewest leaders
+ // if (!disabledDataNodeSet.contains(candidateId) &&
candidateWeight < newLeaderWeight)
+ // {
+ // newLeaderId = candidateId;
+ // newLeaderWeight = candidateWeight;
+ // }
+ // }
+ //
+ // if (newLeaderId != -1) {
+ // leaderCounter.merge(leaderId, -1, Integer::sum);
+ // leaderCounter.merge(newLeaderId, 1, Integer::sum);
+ // regionLeaderMap.replace(regionGroupId, newLeaderId);
+ // }
+ // }
+ // }
+ //
+ // /* Double keyword sort */
+ // List<WeightEntry> weightList = new ArrayList<>();
+ // for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet())
{
+ // int leaderId = regionLeaderMap.get(regionGroupId);
+ // int leaderWeight =
leaderCounter.get(regionLeaderMap.get(regionGroupId));
+ //
+ // int followerWeight = Integer.MAX_VALUE;
+ // for (TDataNodeLocation follower :
+ // regionReplicaSetMap.get(regionGroupId).getDataNodeLocations())
{
+ // int followerId = follower.getDataNodeId();
+ // if (followerId != leaderId) {
+ // followerWeight = Math.min(followerWeight,
leaderCounter.get(followerId));
+ // }
+ // }
+ //
+ // weightList.add(new WeightEntry(regionGroupId, leaderWeight,
followerWeight));
+ // }
+ // weightList.sort(WeightEntry.COMPARATOR);
+ //
+ // /* Greedy distribution */
+ // for (WeightEntry weightEntry : weightList) {
+ // TConsensusGroupId regionGroupId = weightEntry.regionGroupId;
+ // int leaderId = regionLeaderMap.get(regionGroupId);
+ // int leaderWeight =
leaderCounter.get(regionLeaderMap.get(regionGroupId));
+ //
+ // int newLeaderId = -1;
+ // int newLeaderWeight = Integer.MAX_VALUE;
+ // for (TDataNodeLocation candidate :
+ // regionReplicaSetMap.get(regionGroupId).getDataNodeLocations())
{
+ // int candidateId = candidate.getDataNodeId();
+ // int candidateWeight = leaderCounter.get(candidateId);
+ // if (!disabledDataNodeSet.contains(candidateId)
+ // && candidateId != leaderId
+ // && candidateWeight < newLeaderWeight) {
+ // newLeaderId = candidateId;
+ // newLeaderWeight = candidateWeight;
+ // }
+ // }
+ //
+ // // Redistribution takes effect only when leaderWeight -
newLeaderWeight > 1.
+ // // i.e. Redistribution can reduce the range of the number of
leaders that each DataNode
+ // owns.
+ // if (leaderWeight - newLeaderWeight > 1) {
+ // leaderCounter.merge(leaderId, -1, Integer::sum);
+ // leaderCounter.merge(newLeaderId, 1, Integer::sum);
+ // regionLeaderMap.replace(regionGroupId, newLeaderId);
+ // }
+ // }
return new ConcurrentHashMap<>(regionLeaderMap);
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
index 0067ae19eb1..07fab2d10d6 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java
@@ -30,11 +30,8 @@ public class
RegionAllocatorAndLeaderBalancerCombinatorialManualTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(RegionAllocatorAndLeaderBalancerCombinatorialManualTest.class);
- private static final IRegionGroupAllocator ALLOCATOR = new
GreedyCopySetRegionGroupAllocator();
- private static final ILeaderBalancer BALANCER = new
MinCostFlowLeaderBalancer();
-
- private static final int TEST_LOOP = 100;
- private static final int TEST_DATA_NODE_NUM = 10;
+ private static final int TEST_LOOP = 10;
+ private static final int TEST_DATA_NODE_NUM = 12;
private static final int DATA_REGION_PER_DATA_NODE = 4;
private static final int DATA_REPLICATION_FACTOR = 3;
private static final String DATABASE = "root.db";
@@ -43,6 +40,11 @@ public class
RegionAllocatorAndLeaderBalancerCombinatorialManualTest {
new TreeMap<>();
private static final Map<Integer, Double> FREE_SPACE_MAP = new TreeMap<>();
+ private static final IRegionGroupAllocator ALLOCATOR = new
GreedyCopySetRegionGroupAllocator();
+ // new TieredReplicationAllocator(
+ // TEST_DATA_NODE_NUM, DATA_REPLICATION_FACTOR,
DATA_REGION_PER_DATA_NODE);
+ private static final ILeaderBalancer BALANCER = new
MinCostFlowLeaderBalancer();
+
@BeforeClass
public static void setUp() {
// Construct TEST_DATA_NODE_NUM DataNodes
@@ -106,25 +108,27 @@ public class
RegionAllocatorAndLeaderBalancerCombinatorialManualTest {
int minScatterWidth = Integer.MAX_VALUE;
int maxScatterWidth = Integer.MIN_VALUE;
for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
- int scatterWidth = scatterWidthMap.get(i).cardinality();
+ int scatterWidth =
+ scatterWidthMap.containsKey(i) ?
scatterWidthMap.get(i).cardinality() : 0;
scatterWidthSum += scatterWidth;
minScatterWidth = Math.min(minScatterWidth, scatterWidth);
maxScatterWidth = Math.max(maxScatterWidth, scatterWidth);
- regionCountList.add(regionCounter.get(i));
+ regionCountList.add(regionCounter.getOrDefault(i, 0));
scatterWidthList.add(scatterWidth);
}
- LOGGER.info(
- "Loop: {}, Test :{}, {}",
- loop,
- ALLOCATOR.getClass().getSimpleName(),
- BALANCER.getClass().getSimpleName());
- LOGGER.info(
- "Allocate {} DataRegionGroups for {} DataNodes", dataRegionGroupNum,
TEST_DATA_NODE_NUM);
- LOGGER.info(
- "Scatter width avg: {}, min: {}, max: {}",
- (double) scatterWidthSum / TEST_DATA_NODE_NUM,
- minScatterWidth,
- maxScatterWidth);
+ // LOGGER.info(
+ // "Loop: {}, Test :{}, {}",
+ // loop,
+ // ALLOCATOR.getClass().getSimpleName(),
+ // BALANCER.getClass().getSimpleName());
+ // LOGGER.info(
+ // "Allocate {} DataRegionGroups for {} DataNodes",
dataRegionGroupNum,
+ // TEST_DATA_NODE_NUM);
+ // LOGGER.info(
+ // "Scatter width avg: {}, min: {}, max: {}",
+ // (double) scatterWidthSum / TEST_DATA_NODE_NUM,
+ // minScatterWidth,
+ // maxScatterWidth);
/* Balance Leader */
Map<String, List<TConsensusGroupId>> databaseRegionGroupMap =
@@ -145,7 +149,7 @@ public class
RegionAllocatorAndLeaderBalancerCombinatorialManualTest {
int minLeaderCount =
leaderCounter.values().stream().min(Integer::compareTo).orElse(0);
int maxLeaderCount =
leaderCounter.values().stream().max(Integer::compareTo).orElse(0);
leaderCounter.forEach((dataNodeId, leaderCount) ->
leaderCountList.add(leaderCount));
- LOGGER.info("Leader count min: {}, max: {}", minLeaderCount,
maxLeaderCount);
+ // LOGGER.info("Leader count min: {}, max: {}", minLeaderCount,
maxLeaderCount);
}
LOGGER.info("All tests done.");