This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch greedy-copy-set in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e6e97056d3445ba7a86dfb90389f8e5ebf443ca Author: YongzaoDan <[email protected]> AuthorDate: Sun Nov 19 12:07:55 2023 +0800 Finish --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +- .../manager/load/balancer/RegionBalancer.java | 9 +- .../region/GreedyCopySetRegionGroupAllocator.java | 183 ++++++++++++++++++ .../GreedyCopySetRegionGroupAllocatorTest.java | 209 +++++++++++++++++++++ 4 files changed, 400 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 277a3513f3c..c755eb010cc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -108,7 +108,7 @@ public class ConfigNodeConfig { /** RegionGroup allocate policy. */ private RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy = - RegionBalancer.RegionGroupAllocatePolicy.GREEDY; + RegionBalancer.RegionGroupAllocatePolicy.GREEDY_COPY_SET; /** Max concurrent client number. */ private int rpcMaxConcurrentClientNum = 65535; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java index dc2c70fc003..33ecb70c3bb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java @@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator; +import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator; import org.apache.iotdb.confignode.manager.node.NodeManager; @@ -56,8 +57,11 @@ public class RegionBalancer { this.regionGroupAllocator = new CopySetRegionGroupAllocator(); break; case GREEDY: - default: this.regionGroupAllocator = new GreedyRegionGroupAllocator(); + break; + case GREEDY_COPY_SET: + default: + this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator(); } } @@ -147,6 +151,7 @@ public class RegionBalancer { public enum RegionGroupAllocatePolicy { COPY_SET, - GREEDY + GREEDY, + GREEDY_COPY_SET } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java new file mode 100644 index 00000000000..77e4e2965e0 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** Allocate Region through Greedy and CopySet Algorithm */ +public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator { + + int replicationFactor; + // RegionGroup allocation BitSet + private List<BitSet> allocatedBitSets; + // Map<DataNodeId, RegionGroup count> + private Map<Integer, AtomicInteger> regionCounter; + // Available DataNodeIds + private Integer[] dataNodeIds; + + // First Key: the sum of Regions at the DataNodes in the allocation result is minimal + int optimalRegionSum; + // Second Key: the sum of intersected Regions with other allocated RegionGroups is minimal + int optimalIntersectionSum; + List<Integer[]> optimalReplicaSets; + + public GreedyCopySetRegionGroupAllocator() { + // Empty constructor + } + + @Override + public TRegionReplicaSet generateOptimalRegionReplicasDistribution( + Map<Integer, TDataNodeConfiguration> availableDataNodeMap, + Map<Integer, Double> freeDiskSpaceMap, + List<TRegionReplicaSet> allocatedRegionGroups, + int replicationFactor, + TConsensusGroupId consensusGroupId) { + + prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups); + dfs(-1, 0, new Integer[replicationFactor], 0, 0); + + // Randomly pick one optimal plan as result + Collections.shuffle(optimalReplicaSets); + Integer[] optimalReplicaSet = optimalReplicaSets.get(0); + TRegionReplicaSet result = new TRegionReplicaSet(); + result.setRegionId(consensusGroupId); + for (int i = 0; i < replicationFactor; i++) { + result.addToDataNodeLocations(availableDataNodeMap.get(optimalReplicaSet[i]).getLocation()); + } + return result; + } + + /** + * Prepare some statistics before dfs + * + * @param replicationFactor replication factor in the cluster + * @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor + * @param allocatedRegionGroups already allocated RegionGroups in the cluster + */ + private void prepare( + int replicationFactor, + Map<Integer, TDataNodeConfiguration> availableDataNodeMap, + List<TRegionReplicaSet> allocatedRegionGroups) { + + this.replicationFactor = replicationFactor; + int maxDataNodeId = availableDataNodeMap.keySet().stream().max(Integer::compareTo).orElse(0); + for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) { + for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { + // Store the maximum DataNodeId in this algorithm loop + maxDataNodeId = Math.max(maxDataNodeId, dataNodeLocation.getDataNodeId()); + } + } + + // Convert the allocatedRegionGroups into allocatedBitSets, + // where a true in BitSet corresponding to a DataNodeId in the RegionGroup + allocatedBitSets = new ArrayList<>(); + for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) { + BitSet bitSet = new BitSet(maxDataNodeId + 1); + for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { + bitSet.set(dataNodeLocation.getDataNodeId()); + } + allocatedBitSets.add(bitSet); + } + + // Count the number of Regions in each DataNode + regionCounter = new HashMap<>(); + for (int i = 0; i <= maxDataNodeId; i++) { + regionCounter.put(i, new AtomicInteger(0)); + } + allocatedRegionGroups.forEach( + regionGroup -> + regionGroup + .getDataNodeLocations() + .forEach( + dataNodeLocation -> + regionCounter.get(dataNodeLocation.getDataNodeId()).incrementAndGet())); + + // Reset the optimal result + dataNodeIds = new Integer[availableDataNodeMap.size()]; + availableDataNodeMap.keySet().toArray(dataNodeIds); + optimalRegionSum = Integer.MAX_VALUE; + optimalIntersectionSum = Integer.MAX_VALUE; + optimalReplicaSets = new ArrayList<>(); + } + + /** + * Dfs each possible allocation plan, and keep those with the highest priority: First Key: the sum + * of Regions at the DataNodes in the allocation result is minimal, Second Key: the sum of + * intersected Regions with other allocated RegionGroups is minimal + * + * @param lastIndex last decided index in dataNodeIds + * @param currentReplica current replica index + * @param currentReplicaSet current allocation plan + * @param regionSum the sum of Regions at the DataNodes in the current allocation plan + * @param intersectionSum the sum of intersected Regions with other allocated RegionGroups in + * current allocation plan + */ + private void dfs( + int lastIndex, + int currentReplica, + Integer[] currentReplicaSet, + int regionSum, + int intersectionSum) { + if (regionSum > optimalRegionSum || intersectionSum > optimalRegionSum) { + // Pruning: no needs for further searching when either the first key or the second key + // is bigger than the historical optimal result + return; + } + + if (currentReplica == replicationFactor) { + // A complete allocation plan is found + if (regionSum < optimalRegionSum || intersectionSum < optimalRegionSum) { + // Reset the optimal result when a better one is found + optimalRegionSum = regionSum; + optimalIntersectionSum = intersectionSum; + optimalReplicaSets.clear(); + } + optimalReplicaSets.add(Arrays.copyOf(currentReplicaSet, replicationFactor)); + return; + } + + for (int i = lastIndex + 1; i < dataNodeIds.length; i++) { + // Decide the next DataNodeId in the allocation plan + currentReplicaSet[currentReplica] = dataNodeIds[i]; + int intersectionDelta = 0; + for (BitSet bitSet : allocatedBitSets) { + intersectionDelta += bitSet.get(dataNodeIds[i]) ? 1 : 0; + } + dfs( + i, + currentReplica + 1, + currentReplicaSet, + regionSum + regionCounter.get(dataNodeIds[i]).get(), + intersectionSum + intersectionDelta); + } + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java new file mode 100644 index 00000000000..c161eb5f241 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +public class GreedyCopySetRegionGroupAllocatorTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(GreedyCopySetRegionGroupAllocatorTest.class); + + private static final GreedyRegionGroupAllocator GREEDY_ALLOCATOR = + new GreedyRegionGroupAllocator(); + private static final GreedyCopySetRegionGroupAllocator GREEDY_COPY_SET_ALLOCATOR = + new GreedyCopySetRegionGroupAllocator(); + + private static final int TEST_DATA_NODE_NUM = 21; + private static final int DATA_REGION_PER_DATA_NODE = + (int) ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode(); + private static final Map<Integer, TDataNodeConfiguration> AVAILABLE_DATA_NODE_MAP = + new HashMap<>(); + private static final Map<Integer, Double> FREE_SPACE_MAP = new HashMap<>(); + + @BeforeClass + public static void setUp() { + // Construct 21 DataNodes + Random random = new Random(); + for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) { + AVAILABLE_DATA_NODE_MAP.put( + i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i))); + FREE_SPACE_MAP.put(i, random.nextDouble()); + } + } + + @Test + public void test2Factor() { + testRegionDistributionAndScatterWidth(2); + } + + @Test + public void test3Factor() { + testRegionDistributionAndScatterWidth(3); + } + + private void testRegionDistributionAndScatterWidth(int replicationFactor) { + final int dataRegionGroupNum = + DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / replicationFactor; + + /* Allocate DataRegionGroups */ + List<TRegionReplicaSet> greedyResult = new ArrayList<>(); + List<TRegionReplicaSet> greedyCopySetResult = new ArrayList<>(); + for (int index = 0; index < dataRegionGroupNum; index++) { + greedyResult.add( + GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + greedyResult, + replicationFactor, + new TConsensusGroupId(TConsensusGroupType.DataRegion, index))); + greedyCopySetResult.add( + GREEDY_COPY_SET_ALLOCATOR.generateOptimalRegionReplicasDistribution( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + greedyCopySetResult, + replicationFactor, + new TConsensusGroupId(TConsensusGroupType.DataRegion, index))); + } + + /* Statistics result */ + // Map<DataNodeId, RegionGroup Count> for greedy algorithm + Map<Integer, AtomicInteger> greedyRegionCounter = new HashMap<>(); + greedyResult.forEach( + regionReplicaSet -> + regionReplicaSet + .getDataNodeLocations() + .forEach( + dataNodeLocation -> { + greedyRegionCounter + .computeIfAbsent( + dataNodeLocation.getDataNodeId(), empty -> new AtomicInteger(0)) + .incrementAndGet(); + })); + // Map<DataNodeId, ScatterWidth> for greedy algorithm + // where a true in the bitset denotes the corresponding DataNode can help the DataNode in + // Map-Key to + // share the RegionGroup-leader and restore data when restarting. The more true in the bitset, + // the + // more safety the cluster DataNode in Map-Key is. + Map<Integer, BitSet> greedyScatterWidth = new HashMap<>(); + for (TRegionReplicaSet replicaSet : greedyResult) { + for (int i = 0; i < replicationFactor; i++) { + for (int j = i + 1; j < replicationFactor; j++) { + int dataNodeId1 = replicaSet.getDataNodeLocations().get(i).getDataNodeId(); + int dataNodeId2 = replicaSet.getDataNodeLocations().get(j).getDataNodeId(); + greedyScatterWidth.computeIfAbsent(dataNodeId1, empty -> new BitSet()).set(dataNodeId2); + greedyScatterWidth.computeIfAbsent(dataNodeId2, empty -> new BitSet()).set(dataNodeId1); + } + } + } + + // Map<DataNodeId, RegionGroup Count> for greedy-copy-set algorithm + Map<Integer, AtomicInteger> greedyCopySetRegionCounter = new HashMap<>(); + greedyCopySetResult.forEach( + regionReplicaSet -> + regionReplicaSet + .getDataNodeLocations() + .forEach( + dataNodeLocation -> + greedyCopySetRegionCounter + .computeIfAbsent( + dataNodeLocation.getDataNodeId(), empty -> new AtomicInteger(0)) + .incrementAndGet())); + // Map<DataNodeId, ScatterWidth> for greedy-copy-set algorithm, ditto + Map<Integer, BitSet> greedyCopySetScatterWidth = new HashMap<>(); + for (TRegionReplicaSet replicaSet : greedyCopySetResult) { + for (int i = 0; i < replicationFactor; i++) { + for (int j = i + 1; j < replicationFactor; j++) { + int dataNodeId1 = replicaSet.getDataNodeLocations().get(i).getDataNodeId(); + int dataNodeId2 = replicaSet.getDataNodeLocations().get(j).getDataNodeId(); + greedyCopySetScatterWidth + .computeIfAbsent(dataNodeId1, empty -> new BitSet()) + .set(dataNodeId2); + greedyCopySetScatterWidth + .computeIfAbsent(dataNodeId2, empty -> new BitSet()) + .set(dataNodeId1); + } + } + } + + /* Check result */ + int avgDataRegionNum = dataRegionGroupNum * replicationFactor; + avgDataRegionNum = + avgDataRegionNum % replicationFactor == 0 + ? avgDataRegionNum / replicationFactor + : avgDataRegionNum / replicationFactor + 1; + int greedyScatterWidthSum = 0; + int greedyMinScatterWidth = Integer.MAX_VALUE; + int greedyMaxScatterWidth = Integer.MIN_VALUE; + int greedyCopySetScatterWidthSum = 0; + int greedyCopySetMinScatterWidth = Integer.MAX_VALUE; + int greedyCopySetMaxScatterWidth = Integer.MIN_VALUE; + for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) { + Assert.assertTrue(greedyRegionCounter.get(i).get() <= avgDataRegionNum); + Assert.assertTrue(greedyCopySetRegionCounter.get(i).get() <= avgDataRegionNum); + + int scatterWidth = greedyScatterWidth.get(i).cardinality(); + greedyScatterWidthSum += scatterWidth; + greedyMinScatterWidth = Math.min(greedyMinScatterWidth, scatterWidth); + greedyMaxScatterWidth = Math.max(greedyMaxScatterWidth, scatterWidth); + + scatterWidth = greedyCopySetScatterWidth.get(i).cardinality(); + greedyCopySetScatterWidthSum += scatterWidth; + greedyCopySetMinScatterWidth = Math.min(greedyCopySetMinScatterWidth, scatterWidth); + greedyCopySetMaxScatterWidth = Math.max(greedyCopySetMaxScatterWidth, scatterWidth); + } + + LOGGER.info( + "replicationFactor: {}, Scatter width for greedy: avg={}, min={}, max={}", + replicationFactor, + (double) greedyScatterWidthSum / TEST_DATA_NODE_NUM, + greedyMinScatterWidth, + greedyMaxScatterWidth); + LOGGER.info( + "replicationFactor: {}, Scatter width for greedyCopySet: avg={}, min={}, max={}", + replicationFactor, + (double) greedyCopySetScatterWidthSum / TEST_DATA_NODE_NUM, + greedyCopySetMinScatterWidth, + greedyCopySetMaxScatterWidth); + Assert.assertTrue(greedyCopySetScatterWidthSum >= greedyScatterWidthSum); + Assert.assertTrue(greedyCopySetMaxScatterWidth >= greedyMaxScatterWidth); + Assert.assertTrue(greedyCopySetMinScatterWidth >= greedyMinScatterWidth); + } +}
