This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch elastic_storage in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 56ee80b6657d2fe52d522cad713def133b7c3874 Author: YongzaoDan <[email protected]> AuthorDate: Tue Jul 2 21:41:30 2024 +0800 merge metric --- .../confignode/conf/ConfigNodeDescriptor.java | 3 +- .../confignode/conf/ConfigNodeStartupCheck.java | 3 +- .../manager/load/balancer/RegionBalancer.java | 12 +- .../manager/load/balancer/RouteBalancer.java | 4 + .../region/CopySetRegionGroupAllocator.java | 138 ++++++++++++ .../region/GreedyCopySetRegionGroupAllocator.java | 91 ++++---- .../region/TieredReplicationAllocator.java | 197 +++++++++++++++++ .../router/leader/AbstractLeaderBalancer.java | 1 + .../router/leader/RandomLeaderBalancer.java | 70 ++++++ ...orAndLeaderBalancerCombinatorialManualTest.java | 237 +++++++++++++++++++++ .../region/RegionGroupAllocatorSimulation.java | 209 ++++++++++++++++++ 11 files changed, 924 insertions(+), 41 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 9ca9c26b76c..135f7614829 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -318,7 +318,8 @@ public class ConfigNodeDescriptor { .getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy()) .trim(); if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) - || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)) { + || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy) + || AbstractLeaderBalancer.RANDOM_POLICY.equals(leaderDistributionPolicy)) { conf.setLeaderDistributionPolicy(leaderDistributionPolicy); } else { throw new IOException( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java index ced2964ec5c..0a75680b26c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java @@ -156,7 +156,8 @@ public class ConfigNodeStartupCheck extends StartupChecks { // The leader distribution policy is limited if (!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy()) - && !AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) { + && !AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy()) + && !AbstractLeaderBalancer.RANDOM_POLICY.equals(CONF.getLeaderDistributionPolicy())) { throw new ConfigurationException( "leader_distribution_policy", CONF.getRoutePriorityPolicy(), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java index 9dc3bba9f6e..45d306ebc2b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java @@ -30,9 +30,11 @@ import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.load.LoadManager; +import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator; import 
org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator; +import org.apache.iotdb.confignode.manager.load.balancer.region.TieredReplicationAllocator; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; @@ -57,6 +59,12 @@ public class RegionBalancer { case GREEDY: this.regionGroupAllocator = new GreedyRegionGroupAllocator(); break; + case COPY_SET: + this.regionGroupAllocator = new CopySetRegionGroupAllocator(); + break; + case TIERED_REPLICATION: + this.regionGroupAllocator = new TieredReplicationAllocator(); + break; case GCR: default: this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator(); @@ -155,6 +163,8 @@ public class RegionBalancer { public enum RegionGroupAllocatePolicy { GREEDY, - GCR + GCR, + COPY_SET, + TIERED_REPLICATION } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 36f85afcbef..2239bfd3211 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer; +import org.apache.iotdb.confignode.manager.load.balancer.router.leader.RandomLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer; import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer; @@ -125,6 +126,9 @@ public class RouteBalancer implements IClusterStatusSubscriber { case AbstractLeaderBalancer.GREEDY_POLICY: this.leaderBalancer = new GreedyLeaderBalancer(); break; + case AbstractLeaderBalancer.RANDOM_POLICY: + this.leaderBalancer = new RandomLeaderBalancer(); + break; case AbstractLeaderBalancer.CFD_POLICY: default: this.leaderBalancer = new MinCostFlowLeaderBalancer(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionGroupAllocator.java new file mode 100644 index 00000000000..44de1dc1e15 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionGroupAllocator.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.manager.load.balancer.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; + +public class CopySetRegionGroupAllocator implements IRegionGroupAllocator { + + private final Random RANDOM = new Random(); + private final Map<TConsensusGroupType, Integer> dataNodeNumMap = new TreeMap<>(); + private final Map<TConsensusGroupType, Map<Integer, List<List<Integer>>>> COPY_SETS = + new TreeMap<>(); + + private void init( + int dataNodeNum, + int replicationFactor, + int loadFactor, + TConsensusGroupType consensusGroupType) { + this.dataNodeNumMap.put(consensusGroupType, dataNodeNum); + Map<Integer, List<List<Integer>>> copy_sets = + COPY_SETS.computeIfAbsent(consensusGroupType, k -> new TreeMap<>()); + // sum of COPY_SETS value .size() + int p = copy_sets.values().stream().mapToInt(List::size).sum(); + BitSet bitSet = new BitSet(dataNodeNum + 1); + copy_sets.values().forEach(cps -> cps.forEach(cp -> cp.forEach(bitSet::set))); + while (p < loadFactor || bitSet.cardinality() < dataNodeNum) { + List<Integer> permutation = new ArrayList<>(); + for (int i = 1; i <= dataNodeNum; i++) { + permutation.add(i); + } + for (int i = 1; i < dataNodeNum; i++) { + int pos = RANDOM.nextInt(i); + int tmp = permutation.get(i); + permutation.set(i, permutation.get(pos)); + permutation.set(pos, tmp); + } + for (int i = 0; i + replicationFactor <= permutation.size(); i += replicationFactor) { + p += 1; + List<Integer> copySet = new ArrayList<>(); + for (int j = 0; j < replicationFactor; j++) { + int e = permutation.get(i + j); + 
copySet.add(e); + bitSet.set(e); + } + for (int c : copySet) { + copy_sets.computeIfAbsent(c, k -> new ArrayList<>()).add(copySet); + } + } + } + } + + @Override + public TRegionReplicaSet generateOptimalRegionReplicasDistribution( + Map<Integer, TDataNodeConfiguration> availableDataNodeMap, + Map<Integer, Double> freeDiskSpaceMap, + List<TRegionReplicaSet> allocatedRegionGroups, + List<TRegionReplicaSet> databaseAllocatedRegionGroups, + int replicationFactor, + TConsensusGroupId consensusGroupId) { + if (this.dataNodeNumMap.getOrDefault(consensusGroupId.getType(), -1) + != availableDataNodeMap.size()) { + int regionPerDataNode; + if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion)) { + regionPerDataNode = + (int) ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode(); + } else { + regionPerDataNode = + (int) ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode(); + } + init( + availableDataNodeMap.size(), + replicationFactor, + regionPerDataNode, + consensusGroupId.getType()); + } + + TRegionReplicaSet result = new TRegionReplicaSet(); + Map<Integer, Integer> regionCounter = new TreeMap<>(); + for (int i = 1; i <= dataNodeNumMap.get(consensusGroupId.getType()); i++) { + regionCounter.put(i, 0); + } + allocatedRegionGroups.forEach( + regionGroup -> + regionGroup + .getDataNodeLocations() + .forEach( + dataNodeLocation -> + regionCounter.merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum))); + int firstRegion = -1, minCount = Integer.MAX_VALUE; + for (Map.Entry<Integer, Integer> counterEntry : regionCounter.entrySet()) { + int dataNodeId = counterEntry.getKey(); + int regionCount = counterEntry.getValue(); + if (regionCount < minCount) { + minCount = regionCount; + firstRegion = dataNodeId; + } else if (regionCount == minCount && RANDOM.nextBoolean()) { + firstRegion = dataNodeId; + } + } + List<Integer> copySet = + COPY_SETS + .get(consensusGroupId.getType()) + .get(firstRegion) + 
.get(RANDOM.nextInt(COPY_SETS.get(consensusGroupId.getType()).get(firstRegion).size())); + for (int dataNodeId : copySet) { + result.addToDataNodeLocations(availableDataNodeMap.get(dataNodeId).getLocation()); + } + return result.setRegionId(consensusGroupId); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java index 2577455d41c..c8a3584f583 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java @@ -20,9 +20,11 @@ package org.apache.iotdb.confignode.manager.load.balancer.region; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import java.util.ArrayList; import java.util.Arrays; @@ -42,6 +44,7 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 100; private int replicationFactor; + private int regionPerDataNode; // Sorted available DataNodeIds private int[] dataNodeIds; // The number of allocated Regions in each DataNode @@ -51,14 +54,14 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator // The number of 2-Region combinations in current cluster private int[][] combinationCounter; - // First Key: the sum of Regions at the DataNodes in the allocation result is minimal + // First 
Key: the sum of overlapped 2-Region combination Regions with + // other allocated RegionGroups is minimal + int optimalCombinationSum; + // Second Key: the sum of Regions at the DataNodes in the allocation result is minimal int optimalRegionSum; - // Second Key: the sum of Regions at the DataNodes within the same Database + // Third Key: the sum of Regions at the DataNodes within the same Database // in the allocation result is minimal int optimalDatabaseRegionSum; - // Third Key: the sum of overlapped 2-Region combination Regions with - // other allocated RegionGroups is minimal - int optimalCombinationSum; List<int[]> optimalReplicaSets; private static class DataNodeEntry { @@ -105,10 +108,11 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator try { prepare( replicationFactor, + consensusGroupId, availableDataNodeMap, allocatedRegionGroups, databaseAllocatedRegionGroups); - dfs(-1, 0, new int[replicationFactor], 0, 0); + dfs(-1, 0, new int[replicationFactor], 0, 0, 0); // Randomly pick one optimal plan as result Collections.shuffle(optimalReplicaSets); @@ -135,11 +139,19 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator */ private void prepare( int replicationFactor, + TConsensusGroupId consensusGroupId, Map<Integer, TDataNodeConfiguration> availableDataNodeMap, List<TRegionReplicaSet> allocatedRegionGroups, List<TRegionReplicaSet> databaseAllocatedRegionGroups) { this.replicationFactor = replicationFactor; + if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion)) { + this.regionPerDataNode = + (int) ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode(); + } else { + this.regionPerDataNode = + (int) ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode(); + } // Store the maximum DataNodeId int maxDataNodeId = Math.max( @@ -165,9 +177,11 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator 
regionCounter[dataNodeLocations.get(i).getDataNodeId()]++; for (int j = i + 1; j < dataNodeLocations.size(); j++) { combinationCounter[dataNodeLocations.get(i).getDataNodeId()][ - dataNodeLocations.get(j).getDataNodeId()]++; + dataNodeLocations.get(j).getDataNodeId()] = + 1; combinationCounter[dataNodeLocations.get(j).getDataNodeId()][ - dataNodeLocations.get(i).getDataNodeId()]++; + dataNodeLocations.get(i).getDataNodeId()] = + 1; } } } @@ -207,9 +221,9 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator .toArray(); // Reset the optimal result - optimalDatabaseRegionSum = Integer.MAX_VALUE; - optimalRegionSum = Integer.MAX_VALUE; optimalCombinationSum = Integer.MAX_VALUE; + optimalRegionSum = Integer.MAX_VALUE; + optimalDatabaseRegionSum = Integer.MAX_VALUE; optimalReplicaSets = new ArrayList<>(); } @@ -221,50 +235,41 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator * @param lastIndex last decided index in dataNodeIds * @param currentReplica current replica index * @param currentReplicaSet current allocation plan - * @param databaseRegionSum the sum of Regions at the DataNodes within the same Database in the - * current allocation plan - * @param regionSum the sum of Regions at the DataNodes in the current allocation plan */ private void dfs( int lastIndex, int currentReplica, int[] currentReplicaSet, - int databaseRegionSum, - int regionSum) { - if (regionSum > optimalRegionSum) { + int combinationSum, + int regionSum, + int databaseRegionSum) { + if (combinationSum > optimalCombinationSum) { // Pruning: no needs for further searching when the first key // is bigger than the historical optimal result return; } - if (regionSum == optimalRegionSum && databaseRegionSum > optimalDatabaseRegionSum) { + if (combinationSum == optimalCombinationSum && regionSum > optimalRegionSum) { // Pruning: no needs for further searching when the second key // is bigger than the historical optimal result return; } + if 
(combinationSum == optimalCombinationSum + && regionSum == optimalRegionSum + && databaseRegionSum > optimalDatabaseRegionSum) { + // Pruning: no needs for further searching when the Third key + // is bigger than the historical optimal result + return; + } if (currentReplica == replicationFactor) { // A complete allocation plan is found - int combinationSum = 0; - for (int i = 0; i < replicationFactor; i++) { - for (int j = i + 1; j < replicationFactor; j++) { - combinationSum += combinationCounter[currentReplicaSet[i]][currentReplicaSet[j]]; - } - } - if (regionSum == optimalRegionSum - && databaseRegionSum == optimalDatabaseRegionSum - && combinationSum > optimalCombinationSum) { - // Pruning: no needs for further searching when the third key - // is bigger than the historical optimal result - return; - } - - if (regionSum < optimalRegionSum - || databaseRegionSum < optimalDatabaseRegionSum - || combinationSum < optimalCombinationSum) { + if (combinationSum < optimalCombinationSum + || regionSum < optimalRegionSum + || databaseRegionSum < optimalDatabaseRegionSum) { // Reset the optimal result when a better one is found - optimalDatabaseRegionSum = databaseRegionSum; - optimalRegionSum = regionSum; optimalCombinationSum = combinationSum; + optimalRegionSum = regionSum; + optimalDatabaseRegionSum = databaseRegionSum; optimalReplicaSets.clear(); } optimalReplicaSets.add(Arrays.copyOf(currentReplicaSet, replicationFactor)); @@ -272,14 +277,24 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator } for (int i = lastIndex + 1; i < dataNodeIds.length; i++) { + if (regionCounter[dataNodeIds[i]] >= regionPerDataNode) { + // Pruning: no needs for further searching when the DataNode + // has already reached the maximum number of Regions + continue; + } // Decide the next DataNodeId in the allocation plan currentReplicaSet[currentReplica] = dataNodeIds[i]; + int nxtCombinationSum = combinationSum; + for (int j = 0; j < currentReplica; j++) { + 
nxtCombinationSum += combinationCounter[currentReplicaSet[j]][dataNodeIds[i]]; + } dfs( i, currentReplica + 1, currentReplicaSet, - databaseRegionSum + databaseRegionCounter[dataNodeIds[i]], - regionSum + regionCounter[dataNodeIds[i]]); + nxtCombinationSum, + regionSum + regionCounter[dataNodeIds[i]], + databaseRegionSum + databaseRegionCounter[dataNodeIds[i]]); if (optimalReplicaSets.size() == GCR_MAX_OPTIMAL_PLAN_NUM) { // Pruning: no needs for further searching when // the number of optimal plans reaches the limitation diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/TieredReplicationAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/TieredReplicationAllocator.java new file mode 100644 index 00000000000..33538162e3c --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/TieredReplicationAllocator.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.manager.load.balancer.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TieredReplicationAllocator implements IRegionGroupAllocator { + + private final Random RANDOM = new Random(); + private final Map<TConsensusGroupType, Integer> dataNodeNumMap = new TreeMap<>(); + private final Map<TConsensusGroupType, Map<Integer, List<List<Integer>>>> COPY_SETS = + new TreeMap<>(); + + private static class DataNodeEntry { + + private final int dataNodeId; + private final int scatterWidth; + + public DataNodeEntry(int dataNodeId, int scatterWidth) { + this.dataNodeId = dataNodeId; + this.scatterWidth = scatterWidth; + } + + public int getDataNodeId() { + return dataNodeId; + } + + public int compare(DataNodeEntry other) { + return Integer.compare(scatterWidth, other.scatterWidth); + } + } + + private void init( + int dataNodeNum, + int replicationFactor, + int loadFactor, + TConsensusGroupType consensusGroupType) { + this.dataNodeNumMap.put(consensusGroupType, dataNodeNum); + Map<Integer, List<List<Integer>>> copy_sets = + COPY_SETS.computeIfAbsent(consensusGroupType, k -> new TreeMap<>()); + Map<Integer, BitSet> scatterWidthMap = new TreeMap<>(); + for (int i = 1; i <= dataNodeNum; i++) { + scatterWidthMap.put(i, new BitSet(dataNodeNum + 1)); + } + int targetScatterWidth = Math.min(loadFactor * (replicationFactor - 1), dataNodeNum - 1); + while (existScatterWidthUnsatisfied(scatterWidthMap, targetScatterWidth, consensusGroupType)) { + for 
(int firstRegion = 1; firstRegion <= dataNodeNum; firstRegion++) { + if (!copy_sets.containsKey(firstRegion) + || scatterWidthMap.get(firstRegion).cardinality() < targetScatterWidth) { + List<Integer> copySet = new ArrayList<>(); + copySet.add(firstRegion); + List<DataNodeEntry> otherDataNodes = new ArrayList<>(); + for (int i = 1; i <= dataNodeNum; i++) { + if (i != firstRegion) { + otherDataNodes.add(new DataNodeEntry(i, scatterWidthMap.get(i).cardinality())); + } + } + otherDataNodes.sort(DataNodeEntry::compare); + for (DataNodeEntry entry : otherDataNodes) { + boolean accepted = true; + int secondRegion = entry.getDataNodeId(); + for (int e : copySet) { + if (scatterWidthMap.get(e).get(secondRegion)) { + accepted = false; + break; + } + } + if (accepted) { + copySet.add(secondRegion); + } + if (copySet.size() == replicationFactor) { + break; + } + } + + while (copySet.size() < replicationFactor) { + int secondRegion = RANDOM.nextInt(dataNodeNum) + 1; + while (copySet.contains(secondRegion)) { + secondRegion = RANDOM.nextInt(dataNodeNum) + 1; + } + copySet.add(secondRegion); + } + + for (int i = 0; i < copySet.size(); i++) { + for (int j = i + 1; j < copySet.size(); j++) { + scatterWidthMap.get(copySet.get(i)).set(copySet.get(j)); + scatterWidthMap.get(copySet.get(j)).set(copySet.get(i)); + } + } + for (int e : copySet) { + copy_sets.computeIfAbsent(e, k -> new ArrayList<>()).add(copySet); + } + break; + } + } + } + } + + private boolean existScatterWidthUnsatisfied( + Map<Integer, BitSet> scatterWidthMap, + int targetScatterWidth, + TConsensusGroupType consensusGroupType) { + for (int i = 1; i <= dataNodeNumMap.get(consensusGroupType); i++) { + if (!COPY_SETS.get(consensusGroupType).containsKey(i)) { + return true; + } + } + AtomicBoolean result = new AtomicBoolean(false); + scatterWidthMap.forEach( + (k, v) -> { + if (v.cardinality() < targetScatterWidth) { + result.set(true); + } + }); + return result.get(); + } + + @Override + public TRegionReplicaSet 
generateOptimalRegionReplicasDistribution( + Map<Integer, TDataNodeConfiguration> availableDataNodeMap, + Map<Integer, Double> freeDiskSpaceMap, + List<TRegionReplicaSet> allocatedRegionGroups, + List<TRegionReplicaSet> databaseAllocatedRegionGroups, + int replicationFactor, + TConsensusGroupId consensusGroupId) { + if (this.dataNodeNumMap.getOrDefault(consensusGroupId.getType(), -1) + != availableDataNodeMap.size()) { + init( + availableDataNodeMap.size(), + replicationFactor, + (int) ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode(), + consensusGroupId.getType()); + } + + TRegionReplicaSet result = new TRegionReplicaSet(); + Map<Integer, Integer> regionCounter = new TreeMap<>(); + for (int i = 1; i <= dataNodeNumMap.get(consensusGroupId.getType()); i++) { + regionCounter.put(i, 0); + } + allocatedRegionGroups.forEach( + regionGroup -> + regionGroup + .getDataNodeLocations() + .forEach( + dataNodeLocation -> + regionCounter.merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum))); + int firstRegion = -1, minCount = Integer.MAX_VALUE; + for (Map.Entry<Integer, Integer> counterEntry : regionCounter.entrySet()) { + int dataNodeId = counterEntry.getKey(); + int regionCount = counterEntry.getValue(); + if (regionCount < minCount) { + minCount = regionCount; + firstRegion = dataNodeId; + } else if (regionCount == minCount && RANDOM.nextBoolean()) { + firstRegion = dataNodeId; + } + } + List<Integer> copySet = + COPY_SETS + .get(consensusGroupId.getType()) + .get(firstRegion) + .get(RANDOM.nextInt(COPY_SETS.get(consensusGroupId.getType()).get(firstRegion).size())); + for (int dataNodeId : copySet) { + result.addToDataNodeLocations(availableDataNodeMap.get(dataNodeId).getLocation()); + } + return result.setRegionId(consensusGroupId); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java index 6ba7a97957d..7b5914de5fc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java @@ -40,6 +40,7 @@ public abstract class AbstractLeaderBalancer { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLeaderBalancer.class); public static final String GREEDY_POLICY = "GREEDY"; public static final String CFD_POLICY = "CFD"; + public static final String RANDOM_POLICY = "RANDOM"; // Set<RegionGroupId> protected final Set<TConsensusGroupId> regionGroupIntersection; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/RandomLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/RandomLeaderBalancer.java new file mode 100644 index 00000000000..3727df164f4 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/RandomLeaderBalancer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.router.leader; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class RandomLeaderBalancer extends AbstractLeaderBalancer { + + public RandomLeaderBalancer() { + super(); + } + + @Override + public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution( + Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, + Map<TConsensusGroupId, Set<Integer>> regionLocationMap, + Map<TConsensusGroupId, Integer> regionLeaderMap, + Map<Integer, NodeStatistics> dataNodeStatisticsMap, + Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap) { + initialize( + databaseRegionGroupMap, + regionLocationMap, + regionLeaderMap, + dataNodeStatisticsMap, + regionStatisticsMap); + Map<TConsensusGroupId, Integer> result = constructRandomDistribution(); + clear(); + return result; + } + + private Map<TConsensusGroupId, Integer> constructRandomDistribution() { + Random random = new Random(); + regionLocationMap.forEach( + (regionGroupId, regionGroup) -> { + int replicationFactor = regionGroup.size(); + int leaderId = regionGroup.toArray(new Integer[0])[random.nextInt(replicationFactor)]; + while 
(!dataNodeStatisticsMap.get(leaderId).getStatus().equals(NodeStatus.Running)) { + leaderId = regionGroup.toArray(new Integer[0])[random.nextInt(replicationFactor)]; + } + regionLeaderMap.put(regionGroupId, leaderId); + }); + return new ConcurrentHashMap<>(regionLeaderMap); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java new file mode 100644 index 00000000000..504676de6d3 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/RegionAllocatorAndLeaderBalancerCombinatorialManualTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.leader.RandomLeaderBalancer;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.IntSummaryStatistics;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Manual experiment: allocates DataRegionGroups with {@link CopySetRegionGroupAllocator},
+ * distributes their leaders with {@link RandomLeaderBalancer}, and logs per-DataNode region
+ * count, scatter width and leader count statistics.
+ */
+public class RegionAllocatorAndLeaderBalancerCombinatorialManualTest {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(RegionAllocatorAndLeaderBalancerCombinatorialManualTest.class);
+
+  private static final int TEST_LOOP = 1;
+  private static final int TEST_DATA_NODE_NUM = 16;
+  private static final int DATA_REGION_PER_DATA_NODE = 6;
+  private static final int DATA_REPLICATION_FACTOR = 2;
+  private static final String DATABASE = "root.db";
+
+  private static final Map<Integer, TDataNodeConfiguration> AVAILABLE_DATA_NODE_MAP =
+      new TreeMap<>();
+  private static final Map<Integer, Double> FREE_SPACE_MAP = new TreeMap<>();
+  private static final Map<Integer, NodeStatistics> DATA_NODE_STATISTICS_MAP = new TreeMap<>();
+
+  private static final IRegionGroupAllocator ALLOCATOR = new CopySetRegionGroupAllocator();
+  private static final AbstractLeaderBalancer BALANCER = new RandomLeaderBalancer();
+
+  @BeforeClass
+  public static void setUp() {
+    // Construct TEST_DATA_NODE_NUM Running DataNodes, each with a random free space value
+    Random random = new Random();
+    for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+      AVAILABLE_DATA_NODE_MAP.put(
+          i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+      FREE_SPACE_MAP.put(i, random.nextDouble());
+      DATA_NODE_STATISTICS_MAP.put(i, new NodeStatistics(NodeStatus.Running));
+    }
+  }
+
+  @Test
+  public void manualTest() {
+    final int dataRegionGroupNum =
+        DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / DATA_REPLICATION_FACTOR;
+    List<Integer> regionCountList = new ArrayList<>();
+    List<Integer> scatterWidthList = new ArrayList<>();
+    List<Integer> leaderCountList = new ArrayList<>();
+    for (int loop = 1; loop <= TEST_LOOP; loop++) {
+      /* Allocate RegionGroup */
+      List<TRegionReplicaSet> allocateResult = new ArrayList<>();
+      for (int index = 0; index < dataRegionGroupNum; index++) {
+        allocateResult.add(
+            ALLOCATOR.generateOptimalRegionReplicasDistribution(
+                AVAILABLE_DATA_NODE_MAP,
+                FREE_SPACE_MAP,
+                allocateResult,
+                allocateResult,
+                DATA_REPLICATION_FACTOR,
+                new TConsensusGroupId(TConsensusGroupType.DataRegion, index)));
+      }
+
+      /* Count Region in each DataNode */
+      // Map<DataNodeId, RegionGroup Count>
+      Map<Integer, Integer> regionCounter = new TreeMap<>();
+      allocateResult.forEach(
+          regionReplicaSet ->
+              regionReplicaSet
+                  .getDataNodeLocations()
+                  .forEach(
+                      dataNodeLocation ->
+                          regionCounter.merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum)));
+
+      /* Calculate scatter width for each DataNode */
+      // Map<DataNodeId, ScatterWidth>
+      // Bit j is set for DataNode i when DataNode j shares at least one RegionGroup with i, so j
+      // can take over RegionGroup-leaders from i and help restore i's data when i restarts.
+      // The more bits are set, the safer DataNode i is.
+      Map<Integer, BitSet> scatterWidthMap = new TreeMap<>();
+      for (TRegionReplicaSet replicaSet : allocateResult) {
+        for (int i = 0; i < DATA_REPLICATION_FACTOR; i++) {
+          for (int j = i + 1; j < DATA_REPLICATION_FACTOR; j++) {
+            int dataNodeId1 = replicaSet.getDataNodeLocations().get(i).getDataNodeId();
+            int dataNodeId2 = replicaSet.getDataNodeLocations().get(j).getDataNodeId();
+            scatterWidthMap.computeIfAbsent(dataNodeId1, empty -> new BitSet()).set(dataNodeId2);
+            scatterWidthMap.computeIfAbsent(dataNodeId2, empty -> new BitSet()).set(dataNodeId1);
+          }
+        }
+      }
+      for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+        BitSet peers = scatterWidthMap.get(i);
+        regionCountList.add(regionCounter.getOrDefault(i, 0));
+        scatterWidthList.add(peers == null ? 0 : peers.cardinality());
+      }
+
+      /* Balance Leader */
+      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap =
+          Collections.singletonMap(
+              DATABASE,
+              allocateResult.stream()
+                  .map(TRegionReplicaSet::getRegionId)
+                  .collect(Collectors.toList()));
+      Map<TConsensusGroupId, Set<Integer>> regionReplicaSetMap =
+          allocateResult.stream()
+              .collect(
+                  Collectors.toMap(
+                      TRegionReplicaSet::getRegionId,
+                      regionReplicaSet ->
+                          regionReplicaSet.getDataNodeLocations().stream()
+                              .map(TDataNodeLocation::getDataNodeId)
+                              .collect(Collectors.toSet())));
+      Map<TConsensusGroupId, Integer> optimalLeaderDistribution =
+          BALANCER.generateOptimalLeaderDistribution(
+              databaseRegionGroupMap,
+              regionReplicaSetMap,
+              new TreeMap<>(),
+              DATA_NODE_STATISTICS_MAP,
+              new TreeMap<>());
+      // Map<DataNodeId, Leader Count>
+      Map<Integer, Integer> leaderCounter = new TreeMap<>();
+      optimalLeaderDistribution.forEach(
+          (regionId, leaderId) -> leaderCounter.merge(leaderId, 1, Integer::sum));
+      // Walk every DataNode instead of leaderCounter's keys: a DataNode that received no leader
+      // has no entry there, and skipping it would hide the worst imbalance.
+      int minLeaderCount = Integer.MAX_VALUE;
+      int maxLeaderCount = Integer.MIN_VALUE;
+      for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+        int leaderCount = leaderCounter.getOrDefault(i, 0);
+        minLeaderCount = Math.min(minLeaderCount, leaderCount);
+        maxLeaderCount = Math.max(maxLeaderCount, leaderCount);
+        leaderCountList.add(leaderCount);
+      }
+      LOGGER.info("Leader count min: {}, max: {}", minLeaderCount, maxLeaderCount);
+    }
+
+    LOGGER.info("All tests done.");
+    logStatistics("Region count", regionCountList);
+    logStatistics("Scatter width", scatterWidthList);
+    logStatistics("Leader count", leaderCountList);
+  }
+
+  /**
+   * Logs the mean, population variance and range (max - min) of {@code values}, prefixed with
+   * {@code label}. An empty list is reported as all zeros instead of producing a NaN variance.
+   */
+  private static void logStatistics(String label, List<Integer> values) {
+    if (values.isEmpty()) {
+      LOGGER.info("{} avg: {}, var: {}, range: {}", label, 0.0, 0.0, 0);
+      return;
+    }
+    IntSummaryStatistics summary =
+        values.stream().mapToInt(Integer::intValue).summaryStatistics();
+    double avg = summary.getAverage();
+    double variance =
+        values.stream().mapToDouble(value -> Math.pow(value - avg, 2)).sum() / values.size();
+    LOGGER.info(
+        "{} avg: {}, var: {}, range: {}",
+        label,
+        avg,
+        variance,
+        summary.getMax() - summary.getMin());
+  }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java
new file mode 100644
index 00000000000..c2f75665d4a
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+/**
+ * Simulation that allocates DataRegionGroups with {@link TieredReplicationAllocator} for every
+ * combination of DataNode count N and DataRegions-per-DataNode W in the configured ranges. For
+ * each combination it logs the region count range and whether every DataNode reached its expected
+ * scatter width.
+ */
+public class RegionGroupAllocatorSimulation {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(RegionGroupAllocatorSimulation.class);
+
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+  private static final int TEST_LOOP = 1;
+  private static final int MIN_DATA_NODE_NUM = 1;
+  private static final int MAX_DATA_NODE_NUM = 100;
+  private static final int MIN_DATA_REGION_PER_DATA_NODE = 1;
+  private static final int MAX_DATA_REGION_PER_DATA_NODE = 10;
+  private static final int DATA_REPLICATION_FACTOR = 2;
+
+  private static final Map<Integer, TDataNodeConfiguration> AVAILABLE_DATA_NODE_MAP =
+      new TreeMap<>();
+  private static final Map<Integer, Double> FREE_SPACE_MAP = new TreeMap<>();
+
+  /** Result of one simulated (N, W) configuration. */
+  public static class DataEntry {
+    /** Number of DataNodes. */
+    public final Integer N;
+    /** DataRegions per DataNode. */
+    public final Integer W;
+    /** Smallest scatter width observed (0 when N is below the replication factor). */
+    public final Integer minScatterWidth;
+
+    private DataEntry(int N, int W, int minScatterWidth) {
+      this.N = N;
+      this.W = W;
+      this.minScatterWidth = minScatterWidth;
+    }
+  }
+
+  @Test
+  public void allocateTest() {
+    // Collected so the per-configuration results can be inspected or exported.
+    List<DataEntry> testResult = new ArrayList<>();
+    for (int dataNodeNum = MIN_DATA_NODE_NUM; dataNodeNum <= MAX_DATA_NODE_NUM; dataNodeNum++) {
+      for (int dataRegionPerDataNode = MIN_DATA_REGION_PER_DATA_NODE;
+          dataRegionPerDataNode <= Math.min(MAX_DATA_REGION_PER_DATA_NODE, dataNodeNum);
+          dataRegionPerDataNode++) {
+        // NOTE(review): CONF comes from ConfigNodeDescriptor.getInstance(), presumably a
+        // process-wide singleton; the value set here is never restored afterwards.
+        CONF.setDataRegionPerDataNode(dataRegionPerDataNode);
+        testResult.add(singleTest(dataNodeNum, dataRegionPerDataNode));
+      }
+    }
+  }
+
+  /**
+   * Runs TEST_LOOP allocation rounds on N fresh DataNodes with random free space. Logs the region
+   * count range and whether every DataNode met its expected scatter width.
+   *
+   * @param N number of DataNodes
+   * @param W DataRegions per DataNode
+   * @return the configuration together with the smallest scatter width observed
+   */
+  private DataEntry singleTest(int N, int W) {
+    if (N < DATA_REPLICATION_FACTOR) {
+      return new DataEntry(N, W, 0);
+    }
+    // Construct N DataNodes
+    Random random = new Random();
+    AVAILABLE_DATA_NODE_MAP.clear();
+    FREE_SPACE_MAP.clear();
+    for (int i = 1; i <= N; i++) {
+      AVAILABLE_DATA_NODE_MAP.put(
+          i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+      FREE_SPACE_MAP.put(i, random.nextDouble());
+    }
+
+    boolean passScatter = true;
+    final int dataRegionGroupNum = W * N / DATA_REPLICATION_FACTOR;
+    // Required number of distinct peers per Region beyond the first one (see expScatter below)
+    final int u = DATA_REPLICATION_FACTOR / 2;
+    List<Integer> regionCountList = new ArrayList<>();
+    List<Integer> scatterWidthList = new ArrayList<>();
+    for (int loop = 1; loop <= TEST_LOOP; loop++) {
+      List<TRegionReplicaSet> allocateResult = new ArrayList<>();
+      IRegionGroupAllocator allocator = new TieredReplicationAllocator();
+      for (int index = 0; index < dataRegionGroupNum; index++) {
+        allocateResult.add(
+            allocator.generateOptimalRegionReplicasDistribution(
+                AVAILABLE_DATA_NODE_MAP,
+                FREE_SPACE_MAP,
+                allocateResult,
+                allocateResult,
+                DATA_REPLICATION_FACTOR,
+                new TConsensusGroupId(TConsensusGroupType.DataRegion, index)));
+      }
+
+      /* Count Region in each DataNode */
+      // Map<DataNodeId, RegionGroup Count>
+      Map<Integer, Integer> regionCounter = new TreeMap<>();
+      allocateResult.forEach(
+          regionReplicaSet ->
+              regionReplicaSet
+                  .getDataNodeLocations()
+                  .forEach(
+                      dataNodeLocation ->
+                          regionCounter.merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum)));
+
+      /* Calculate scatter width for each DataNode */
+      // Map<DataNodeId, set of DataNodeIds sharing at least one RegionGroup with it>
+      Map<Integer, BitSet> scatterWidthMap = new TreeMap<>();
+      for (TRegionReplicaSet replicaSet : allocateResult) {
+        for (int i = 0; i < DATA_REPLICATION_FACTOR; i++) {
+          int dataNodeId1 = replicaSet.getDataNodeLocations().get(i).getDataNodeId();
+          for (int j = i + 1; j < DATA_REPLICATION_FACTOR; j++) {
+            int dataNodeId2 = replicaSet.getDataNodeLocations().get(j).getDataNodeId();
+            scatterWidthMap.computeIfAbsent(dataNodeId1, empty -> new BitSet()).set(dataNodeId2);
+            scatterWidthMap.computeIfAbsent(dataNodeId2, empty -> new BitSet()).set(dataNodeId1);
+          }
+        }
+      }
+
+      for (int i = 1; i <= N; i++) {
+        int regionCount = regionCounter.getOrDefault(i, 0);
+        int scatterWidth =
+            scatterWidthMap.containsKey(i) ? scatterWidthMap.get(i).cardinality() : 0;
+        // Expect at least u peers for every Region beyond the first, capped by the N - 1 other
+        // DataNodes that exist.
+        int expScatter = Math.min(Math.max(regionCount - 1, 0) * u, N - 1);
+        if (scatterWidth < expScatter) {
+          passScatter = false;
+        }
+        regionCountList.add(regionCount);
+        scatterWidthList.add(scatterWidth);
+      }
+    }
+
+    int regionRange =
+        regionCountList.stream().mapToInt(Integer::intValue).max().orElse(0)
+            - regionCountList.stream().mapToInt(Integer::intValue).min().orElse(0);
+    int minScatter = scatterWidthList.stream().mapToInt(Integer::intValue).min().orElse(0);
+    LOGGER.info(
+        "Test N={}, W={}, regionRange={} {}, minScatter={} {},",
+        N,
+        W,
+        regionRange,
+        regionRange <= 1,
+        minScatter,
+        passScatter);
+    return new DataEntry(N, W, minScatter);
+  }
+}
