This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 57aea073d78 Cherry pick Partite Graph Replication #12946
57aea073d78 is described below
commit 57aea073d786587997939db83bf4d6b96a48ac30
Author: Yongzao <[email protected]>
AuthorDate: Wed Jul 17 09:40:07 2024 +0800
Cherry pick Partite Graph Replication #12946
---
.../manager/load/balancer/RegionBalancer.java | 7 +-
...artiteGraphReplicationRegionGroupAllocator.java | 256 +++++++++++++++++++++
2 files changed, 262 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 9dc3bba9f6e..6ba5b34c3dc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
import
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator;
import
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
import
org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
+import
org.apache.iotdb.confignode.manager.load.balancer.region.PartiteGraphReplicationRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
@@ -57,6 +58,9 @@ public class RegionBalancer {
case GREEDY:
this.regionGroupAllocator = new GreedyRegionGroupAllocator();
break;
+ case PGR:
+ this.regionGroupAllocator = new
PartiteGraphReplicationRegionGroupAllocator();
+ break;
case GCR:
default:
this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
@@ -155,6 +159,7 @@ public class RegionBalancer {
public enum RegionGroupAllocatePolicy {
GREEDY,
- GCR
+ GCR,
+ PGR
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java
new file mode 100644
index 00000000000..b24acc1bd46
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class PartiteGraphReplicationRegionGroupAllocator implements
IRegionGroupAllocator {
+
+ private static final Random RANDOM = new Random();
+ private static final GreedyRegionGroupAllocator GREEDY_ALLOCATOR =
+ new GreedyRegionGroupAllocator();
+
+ private int subGraphCount;
+ private int replicationFactor;
+ private int regionPerDataNode;
+
+ private int dataNodeNum;
+ // The number of allocated Regions in each DataNode
+ private int[] regionCounter;
+ // The number of edges in current cluster
+ private int[][] combinationCounter;
+ private Map<Integer, Integer> fakeToRealIdMap;
+
+ private int alphaDataNodeNum;
+ // First Key: the sum of overlapped 2-Region combination Regions with
+ // other allocated RegionGroups is minimal
+ private int optimalEdgeSum;
+ // Second Key: the sum of DataRegions in selected DataNodes is minimal
+ private int optimalRegionSum;
+ private int[] optimalAlphaNodes;
+
+ @Override
+ public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+ int replicationFactor,
+ TConsensusGroupId consensusGroupId) {
+
+ this.regionPerDataNode =
+ (int)
+ (consensusGroupId.getType().equals(TConsensusGroupType.DataRegion)
+ ?
ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode()
+ :
ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode());
+ prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups);
+
+ // Select a set of optimal alpha nodes
+ for (int i = 0; i < subGraphCount; i++) {
+ subGraphSearch(i, 0, alphaDataNodeNum, 0, 0, new int[alphaDataNodeNum]);
+ }
+ if (optimalEdgeSum == Integer.MAX_VALUE) {
+ return GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ availableDataNodeMap,
+ freeDiskSpaceMap,
+ allocatedRegionGroups,
+ databaseAllocatedRegionGroups,
+ replicationFactor,
+ consensusGroupId);
+ }
+
+ // Select the set of optimal beta nodes
+ List<Integer> partiteNodes = partiteGraphSearch(optimalAlphaNodes[0] %
subGraphCount);
+ if (partiteNodes.size() < replicationFactor - alphaDataNodeNum) {
+ return GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+ availableDataNodeMap,
+ freeDiskSpaceMap,
+ allocatedRegionGroups,
+ databaseAllocatedRegionGroups,
+ replicationFactor,
+ consensusGroupId);
+ }
+
+ TRegionReplicaSet result = new TRegionReplicaSet();
+ result.setRegionId(consensusGroupId);
+ for (int i = 0; i < alphaDataNodeNum; i++) {
+ result.addToDataNodeLocations(
+
availableDataNodeMap.get(fakeToRealIdMap.get(optimalAlphaNodes[i])).getLocation());
+ }
+ for (int i = 0; i < replicationFactor - alphaDataNodeNum; i++) {
+ result.addToDataNodeLocations(
+
availableDataNodeMap.get(fakeToRealIdMap.get(partiteNodes.get(i))).getLocation());
+ }
+ return result;
+ }
+
+ private void prepare(
+ int replicationFactor,
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ List<TRegionReplicaSet> allocatedRegionGroups) {
+
+ this.subGraphCount = replicationFactor / 2 + (replicationFactor % 2 == 0 ?
0 : 1);
+ this.replicationFactor = replicationFactor;
+
+ this.fakeToRealIdMap = new TreeMap<>();
+ Map<Integer, Integer> realToFakeIdMap = new TreeMap<>();
+ this.dataNodeNum = availableDataNodeMap.size();
+ List<Integer> dataNodeIdList =
+ availableDataNodeMap.values().stream()
+ .map(c -> c.getLocation().getDataNodeId())
+ .collect(Collectors.toList());
+ for (int i = 0; i < dataNodeNum; i++) {
+ fakeToRealIdMap.put(i, dataNodeIdList.get(i));
+ realToFakeIdMap.put(dataNodeIdList.get(i), i);
+ }
+
+ // Compute regionCounter and combinationCounter
+ this.regionCounter = new int[dataNodeNum];
+ Arrays.fill(regionCounter, 0);
+ this.combinationCounter = new int[dataNodeNum][dataNodeNum];
+ for (int i = 0; i < dataNodeNum; i++) {
+ Arrays.fill(combinationCounter[i], 0);
+ }
+ for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) {
+ List<TDataNodeLocation> dataNodeLocations =
regionReplicaSet.getDataNodeLocations();
+ for (int i = 0; i < dataNodeLocations.size(); i++) {
+ int fakeIId =
realToFakeIdMap.get(dataNodeLocations.get(i).getDataNodeId());
+ regionCounter[fakeIId]++;
+ for (int j = i + 1; j < dataNodeLocations.size(); j++) {
+ int fakeJId =
realToFakeIdMap.get(dataNodeLocations.get(j).getDataNodeId());
+ combinationCounter[fakeIId][fakeJId] = 1;
+ combinationCounter[fakeJId][fakeIId] = 1;
+ }
+ }
+ }
+
+ // Reset the optimal result
+ this.alphaDataNodeNum = replicationFactor / 2 + 1;
+ this.optimalEdgeSum = Integer.MAX_VALUE;
+ this.optimalRegionSum = Integer.MAX_VALUE;
+ this.optimalAlphaNodes = new int[alphaDataNodeNum];
+ }
+
+ private void subGraphSearch(
+ int firstIndex,
+ int currentReplica,
+ int replicaNum,
+ int combinationSum,
+ int regionSum,
+ int[] currentReplicaSet) {
+
+ if (currentReplica == replicaNum) {
+ if (combinationSum < optimalEdgeSum
+ || (combinationSum == optimalEdgeSum && regionSum <
optimalRegionSum)) {
+ // Reset the optimal result when a better one is found
+ optimalEdgeSum = combinationSum;
+ optimalRegionSum = regionSum;
+ optimalAlphaNodes = Arrays.copyOf(currentReplicaSet,
replicationFactor);
+ } else if (combinationSum == optimalEdgeSum
+ && regionSum == optimalRegionSum
+ && RANDOM.nextBoolean()) {
+ optimalAlphaNodes = Arrays.copyOf(currentReplicaSet,
replicationFactor);
+ }
+ return;
+ }
+
+ for (int i = firstIndex; i < dataNodeNum; i += subGraphCount) {
+ if (regionCounter[i] >= regionPerDataNode) {
+ // Pruning: skip full DataNodes
+ continue;
+ }
+ int nxtCombinationSum = combinationSum;
+ for (int j = 0; j < currentReplica; j++) {
+ nxtCombinationSum += combinationCounter[i][currentReplicaSet[j]];
+ }
+ if (combinationSum > optimalEdgeSum) {
+ // Pruning: no needs for further searching when the first key
+ // is bigger than the historical optimal result
+ return;
+ }
+ int nxtRegionSum = regionSum + regionCounter[i];
+ if (combinationSum == optimalEdgeSum && regionSum > optimalRegionSum) {
+ // Pruning: no needs for further searching when the second key
+ // is bigger than the historical optimal result
+ return;
+ }
+ currentReplicaSet[currentReplica] = i;
+ subGraphSearch(
+ i + subGraphCount,
+ currentReplica + 1,
+ replicaNum,
+ nxtCombinationSum,
+ nxtRegionSum,
+ currentReplicaSet);
+ }
+ }
+
+ private List<Integer> partiteGraphSearch(int selected) {
+ List<Integer> partiteNodes = new ArrayList<>();
+ for (int partiteIndex = 0; partiteIndex < subGraphCount; partiteIndex++) {
+ if (partiteIndex == selected) {
+ continue;
+ }
+ int selectedDataNode = -1;
+ int bestScatterWidth = 0;
+ int bestRegionSum = Integer.MAX_VALUE;
+ for (int i = partiteIndex; i < dataNodeNum; i += subGraphCount) {
+ if (regionCounter[i] >= regionPerDataNode) {
+ continue;
+ }
+ int scatterWidth = alphaDataNodeNum;
+ for (int k = 0; k < alphaDataNodeNum; k++) {
+ scatterWidth -= combinationCounter[i][optimalAlphaNodes[k]];
+ }
+ if (scatterWidth < bestScatterWidth) {
+ continue;
+ }
+ if (scatterWidth > bestScatterWidth) {
+ bestScatterWidth = scatterWidth;
+ bestRegionSum = regionCounter[i];
+ selectedDataNode = i;
+ } else if (regionCounter[i] < bestRegionSum) {
+ bestRegionSum = regionCounter[i];
+ selectedDataNode = i;
+ } else if (regionCounter[i] == bestRegionSum && RANDOM.nextBoolean()) {
+ selectedDataNode = i;
+ }
+ }
+ if (selectedDataNode == -1) {
+ return new ArrayList<>();
+ }
+ partiteNodes.add(selectedDataNode);
+ }
+ return partiteNodes;
+ }
+}