This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch elastic_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/elastic_storage by this push:
new d0092419ccd finish
d0092419ccd is described below
commit d0092419ccdc5fb5aae44d47626bf13ab7522dbb
Author: YongzaoDan <[email protected]>
AuthorDate: Tue Jul 16 18:51:29 2024 +0800
finish
---
.../manager/load/balancer/RouteBalancer.java | 10 +-
.../region/GreedyCopySetRegionGroupAllocator.java | 11 +
.../manager/load/balancer/region/PGRA.java | 285 +++++++++++++++++++++
.../region/PartiteGraphRegionGroupAllocator.java | 255 ++++++++++++++++++
.../router/leader/AbstractLeaderBalancer.java | 45 ++++
.../load/cache/region/RegionGroupCache.java | 2 +-
.../region/RegionGroupAllocatorSimulation.java | 61 +++--
.../analyze/cache/partition/PartitionCache.java | 9 +
8 files changed, 649 insertions(+), 29 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 2239bfd3211..05803a42968 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -316,6 +316,14 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
CnToDnRequestType.UPDATE_REGION_ROUTE_MAP,
new TRegionRouteReq(broadcastTime, tmpPriorityMap),
dataNodeLocationMap);
+    // Dumping the route of every Region on each broadcast is only useful while diagnosing
+    // routing, and floods INFO logs in large clusters, so keep it at DEBUG level
+    if (LOGGER.isDebugEnabled()) {
+      for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> routeEntry :
+          tmpPriorityMap.entrySet()) {
+        LOGGER.debug(
+            "[RouteMap] {}: {}",
+            routeEntry.getKey(),
+            routeEntry.getValue().getDataNodeLocations().stream()
+                .mapToInt(TDataNodeLocation::getDataNodeId)
+                .toArray());
+      }
+    }
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
}
@@ -437,7 +445,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
+  /**
+   * Re-balances Region leaders and then Region route priorities whenever consensus-group
+   * statistics change.
+   *
+   * <p>Leader balancing must not be disabled by commenting out the call; if it has to be turned
+   * off, gate it behind a configuration switch instead.
+   */
@Override
public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {
balanceRegionLeader();
balanceRegionPriority();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
index c8a3584f583..4446025f2fc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -114,6 +114,17 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
databaseAllocatedRegionGroups);
dfs(-1, 0, new int[replicationFactor], 0, 0, 0);
+    // The copy-set search produced no candidate plan (presumably too few non-full DataNodes);
+    // delegate to the plain greedy allocator rather than calling get(0) on an empty list below
+    if (optimalReplicaSets.isEmpty()) {
+      return new GreedyRegionGroupAllocator()
+          .generateOptimalRegionReplicasDistribution(
+              availableDataNodeMap,
+              freeDiskSpaceMap,
+              allocatedRegionGroups,
+              databaseAllocatedRegionGroups,
+              replicationFactor,
+              consensusGroupId);
+    }
+
// Randomly pick one optimal plan as result
Collections.shuffle(optimalReplicaSets);
int[] optimalReplicaSet = optimalReplicaSets.get(0);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PGRA.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PGRA.java
new file mode 100644
index 00000000000..1339ca20be8
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PGRA.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Partite-graph RegionGroup allocator with a greedy intra-sub-graph search.
+ *
+ * <p>Available DataNodes are split into {@code ceil(replicationFactor / 2)} sub-graphs by their
+ * index modulo the sub-graph count. {@code replicationFactor / 2 + 1} replicas are placed inside a
+ * single sub-graph, and one replica is placed in every other sub-graph. Falls back to {@link
+ * GreedyRegionGroupAllocator} whenever no such plan exists.
+ *
+ * <p>Not thread-safe: each call stores its working state in instance fields.
+ */
+public class PGRA implements IRegionGroupAllocator {
+
+  /** Candidate DataNode of one sub-graph together with its sort keys. */
+  private static class DataNodeEntry {
+
+    private final int fakeId;
+    private final int regionCount;
+    private final int scatterWidth;
+    private final int randomWeight;
+
+    public DataNodeEntry(int fakeId, int regionCount, int scatterWidth, int randomWeight) {
+      this.fakeId = fakeId;
+      this.regionCount = regionCount;
+      this.scatterWidth = scatterWidth;
+      this.randomWeight = randomWeight;
+    }
+
+    /** Orders by fewer Regions first, then smaller scatter width, then the random tie-breaker. */
+    public int compare(PGRA.DataNodeEntry e) {
+      if (regionCount != e.regionCount) {
+        return Integer.compare(regionCount, e.regionCount);
+      }
+      if (scatterWidth != e.scatterWidth) {
+        return Integer.compare(scatterWidth, e.scatterWidth);
+      }
+      return Integer.compare(randomWeight, e.randomWeight);
+    }
+  }
+
+  private static final Random RANDOM = new Random();
+
+  // Number of sub-graphs, i.e. ceil(replicationFactor / 2)
+  private int subGraphCount;
+  private int replicationFactor;
+  // Region quota per DataNode for the requested consensus group type
+  private int regionPerDataNode;
+
+  private int dataNodeNum;
+  // The number of allocated Regions in each DataNode
+  private int[] regionCounter;
+  // The number of distinct DataNodes each DataNode already shares a RegionGroup with
+  private int[] scatterWidthCounter;
+  // combinationCounter[i][j] == 1 iff DataNodes i and j already share at least one RegionGroup
+  private int[][] combinationCounter;
+  // Dense "fake" ids 0..dataNodeNum-1 <-> real DataNode ids
+  private Map<Integer, Integer> fakeToRealIdMap;
+  private Map<Integer, Integer> realToFakeIdMap;
+
+  // Number of replicas placed inside a single sub-graph, i.e. replicationFactor / 2 + 1
+  private int subDataNodeNum;
+  // First Key: the sum of overlapped 2-Region combinations among the selected DataNodes is minimal
+  private int optimalCombinationSum;
+  // Second Key: the sum of Regions in selected DataNodes is minimal
+  private int optimalRegionSum;
+  // Fake ids of the best subDataNodeNum DataNodes found so far
+  private int[] optimalSubDataNodes;
+
+  @Override
+  public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+      int replicationFactor,
+      TConsensusGroupId consensusGroupId) {
+
+    this.regionPerDataNode =
+        (int)
+            (consensusGroupId.getType().equals(TConsensusGroupType.DataRegion)
+                ? ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode()
+                : ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode());
+    prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups);
+    for (int i = 0; i < subGraphCount; i++) {
+      subGraphSearch(i);
+    }
+    if (optimalCombinationSum == Integer.MAX_VALUE) {
+      // No sub-graph contains subDataNodeNum non-full DataNodes
+      return allocateByGreedy(
+          availableDataNodeMap,
+          freeDiskSpaceMap,
+          allocatedRegionGroups,
+          databaseAllocatedRegionGroups,
+          replicationFactor,
+          consensusGroupId);
+    }
+
+    List<Integer> partiteNodes = partiteGraphSearch(optimalSubDataNodes[0] % subGraphCount);
+    if (partiteNodes.size() < replicationFactor - subDataNodeNum) {
+      // Some other sub-graph has no non-full DataNode left
+      return allocateByGreedy(
+          availableDataNodeMap,
+          freeDiskSpaceMap,
+          allocatedRegionGroups,
+          databaseAllocatedRegionGroups,
+          replicationFactor,
+          consensusGroupId);
+    }
+
+    TRegionReplicaSet result = new TRegionReplicaSet();
+    result.setRegionId(consensusGroupId);
+    for (int i = 0; i < subDataNodeNum; i++) {
+      result.addToDataNodeLocations(
+          availableDataNodeMap.get(fakeToRealIdMap.get(optimalSubDataNodes[i])).getLocation());
+    }
+    for (int i = 0; i < replicationFactor - subDataNodeNum; i++) {
+      result.addToDataNodeLocations(
+          availableDataNodeMap.get(fakeToRealIdMap.get(partiteNodes.get(i))).getLocation());
+    }
+    return result;
+  }
+
+  /** Delegates the whole allocation to a fresh {@link GreedyRegionGroupAllocator}. */
+  private static TRegionReplicaSet allocateByGreedy(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+      int replicationFactor,
+      TConsensusGroupId consensusGroupId) {
+    return new GreedyRegionGroupAllocator()
+        .generateOptimalRegionReplicasDistribution(
+            availableDataNodeMap,
+            freeDiskSpaceMap,
+            allocatedRegionGroups,
+            databaseAllocatedRegionGroups,
+            replicationFactor,
+            consensusGroupId);
+  }
+
+  /**
+   * Resets the per-call state. Assigns dense fake ids to the available DataNodes and counts, for
+   * each of them, the hosted Regions and the peers it already shares a RegionGroup with.
+   *
+   * <p>Replicas located on DataNodes outside {@code availableDataNodeMap} are ignored: those
+   * DataNodes have no fake id and can never be selected.
+   */
+  private void prepare(
+      int replicationFactor,
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      List<TRegionReplicaSet> allocatedRegionGroups) {
+
+    this.subGraphCount = replicationFactor / 2 + (replicationFactor % 2 == 0 ? 0 : 1);
+    this.replicationFactor = replicationFactor;
+
+    this.fakeToRealIdMap = new TreeMap<>();
+    this.realToFakeIdMap = new TreeMap<>();
+    this.dataNodeNum = availableDataNodeMap.size();
+    List<Integer> dataNodeIdList =
+        availableDataNodeMap.values().stream()
+            .map(c -> c.getLocation().getDataNodeId())
+            .collect(Collectors.toList());
+    for (int i = 0; i < dataNodeNum; i++) {
+      fakeToRealIdMap.put(i, dataNodeIdList.get(i));
+      realToFakeIdMap.put(dataNodeIdList.get(i), i);
+    }
+
+    // Compute regionCounter, combinationCounter and scatterWidthCounter
+    // (new int arrays are zero-initialized, so no explicit fill is needed)
+    this.regionCounter = new int[dataNodeNum];
+    this.combinationCounter = new int[dataNodeNum][dataNodeNum];
+    for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) {
+      // Resolve fake ids first; unboxing a missing id would throw a NullPointerException
+      List<Integer> fakeIds = new ArrayList<>();
+      for (TDataNodeLocation location : regionReplicaSet.getDataNodeLocations()) {
+        Integer fakeId = realToFakeIdMap.get(location.getDataNodeId());
+        if (fakeId != null) {
+          fakeIds.add(fakeId);
+        }
+      }
+      for (int i = 0; i < fakeIds.size(); i++) {
+        int fakeIId = fakeIds.get(i);
+        regionCounter[fakeIId]++;
+        for (int j = i + 1; j < fakeIds.size(); j++) {
+          int fakeJId = fakeIds.get(j);
+          combinationCounter[fakeIId][fakeJId] = 1;
+          combinationCounter[fakeJId][fakeIId] = 1;
+        }
+      }
+    }
+    this.scatterWidthCounter = new int[dataNodeNum];
+    for (int i = 0; i < dataNodeNum; i++) {
+      for (int j = 0; j < dataNodeNum; j++) {
+        scatterWidthCounter[i] += combinationCounter[i][j];
+      }
+    }
+
+    // Reset the optimal result
+    this.subDataNodeNum = replicationFactor / 2 + 1;
+    this.optimalCombinationSum = Integer.MAX_VALUE;
+    this.optimalRegionSum = Integer.MAX_VALUE;
+    this.optimalSubDataNodes = new int[subDataNodeNum];
+  }
+
+  /**
+   * Greedily picks {@code subDataNodeNum} non-full DataNodes from the sub-graph whose fake ids are
+   * congruent to {@code firstIndex} modulo {@code subGraphCount}. Records them in {@link
+   * #optimalSubDataNodes} if they beat the best plan found so far.
+   *
+   * <p>The first {@code subDataNodeNum - 1} DataNodes are the smallest under {@link
+   * DataNodeEntry#compare}. The last one is the remaining candidate that overlaps the fewest
+   * existing 2-Region combinations with them (ties: fewer Regions). An equal plan replaces the
+   * current best with probability 1/2.
+   *
+   * @param firstIndex index of the sub-graph to search
+   */
+  private void subGraphSearch(int firstIndex) {
+    List<DataNodeEntry> entryList = new ArrayList<>();
+    for (int i = firstIndex; i < dataNodeNum; i += subGraphCount) {
+      if (regionCounter[i] >= regionPerDataNode) {
+        // Skip full DataNodes
+        continue;
+      }
+      entryList.add(
+          new DataNodeEntry(i, regionCounter[i], scatterWidthCounter[i], RANDOM.nextInt()));
+    }
+    if (entryList.size() < subDataNodeNum) {
+      return;
+    }
+    entryList.sort(DataNodeEntry::compare);
+    int[] subDataNodes = new int[subDataNodeNum];
+    // Pick the replicationFactor / 2 DataNodes with the smallest regionCount first
+    for (int i = 0; i < subDataNodeNum - 1; i++) {
+      subDataNodes[i] = entryList.get(i).fakeId;
+    }
+    int curCombinationSum = Integer.MAX_VALUE;
+    int curRegionSum = Integer.MAX_VALUE;
+    // Select the last DataNode among the remaining candidates
+    for (int i = subDataNodeNum - 1; i < entryList.size(); i++) {
+      DataNodeEntry candidate = entryList.get(i);
+      int tmpCombinationSum = 0;
+      for (int j = 0; j < subDataNodeNum - 1; j++) {
+        tmpCombinationSum += combinationCounter[subDataNodes[j]][candidate.fakeId];
+      }
+      if (tmpCombinationSum < curCombinationSum
+          || (tmpCombinationSum == curCombinationSum && candidate.regionCount < curRegionSum)) {
+        curCombinationSum = tmpCombinationSum;
+        curRegionSum = candidate.regionCount;
+        subDataNodes[subDataNodeNum - 1] = candidate.fakeId;
+      }
+    }
+    // Add the Regions and pairwise combinations of the first subDataNodeNum - 1 DataNodes
+    for (int i = 0; i < subDataNodeNum - 1; i++) {
+      curRegionSum += regionCounter[subDataNodes[i]];
+      for (int j = i + 1; j < subDataNodeNum - 1; j++) {
+        curCombinationSum += combinationCounter[subDataNodes[i]][subDataNodes[j]];
+      }
+    }
+    if (curCombinationSum < optimalCombinationSum
+        || (curCombinationSum == optimalCombinationSum && curRegionSum < optimalRegionSum)) {
+      optimalCombinationSum = curCombinationSum;
+      optimalRegionSum = curRegionSum;
+      optimalSubDataNodes = Arrays.copyOf(subDataNodes, subDataNodeNum);
+    } else if (curCombinationSum == optimalCombinationSum
+        && curRegionSum == optimalRegionSum
+        && RANDOM.nextBoolean()) {
+      optimalSubDataNodes = Arrays.copyOf(subDataNodes, subDataNodeNum);
+    }
+  }
+
+  /**
+   * Picks one non-full DataNode from every sub-graph except {@code selected}.
+   *
+   * <p>Within a sub-graph, the chosen DataNode maximizes the number of 2-Region combinations it
+   * does not yet share with {@link #optimalSubDataNodes}. Ties go to fewer Regions, then to a coin
+   * flip.
+   *
+   * @param selected index of the sub-graph that already hosts {@link #optimalSubDataNodes}
+   * @return fake ids of the picked DataNodes, or an empty list if some sub-graph has no non-full
+   *     DataNode
+   */
+  private List<Integer> partiteGraphSearch(int selected) {
+    List<Integer> partiteNodes = new ArrayList<>();
+    for (int partiteIndex = 0; partiteIndex < subGraphCount; partiteIndex++) {
+      if (partiteIndex == selected) {
+        continue;
+      }
+      int selectedDataNode = -1;
+      int bestScatterWidth = 0;
+      int bestRegionSum = Integer.MAX_VALUE;
+      for (int i = partiteIndex; i < dataNodeNum; i += subGraphCount) {
+        if (regionCounter[i] >= regionPerDataNode) {
+          continue;
+        }
+        // Number of new 2-Region combinations DataNode i would form with the selected ones
+        int scatterWidth = subDataNodeNum;
+        for (int k = 0; k < subDataNodeNum; k++) {
+          scatterWidth -= combinationCounter[i][optimalSubDataNodes[k]];
+        }
+        if (scatterWidth < bestScatterWidth) {
+          continue;
+        }
+        if (scatterWidth > bestScatterWidth) {
+          bestScatterWidth = scatterWidth;
+          bestRegionSum = regionCounter[i];
+          selectedDataNode = i;
+        } else if (regionCounter[i] < bestRegionSum) {
+          bestRegionSum = regionCounter[i];
+          selectedDataNode = i;
+        } else if (regionCounter[i] == bestRegionSum && RANDOM.nextBoolean()) {
+          selectedDataNode = i;
+        }
+      }
+      if (selectedDataNode == -1) {
+        return new ArrayList<>();
+      }
+      partiteNodes.add(selectedDataNode);
+    }
+    return partiteNodes;
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphRegionGroupAllocator.java
new file mode 100644
index 00000000000..6d743f37d4e
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphRegionGroupAllocator.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Partite-graph RegionGroup allocator with an exhaustive intra-sub-graph search.
+ *
+ * <p>Available DataNodes are split into {@code ceil(replicationFactor / 2)} sub-graphs by their
+ * index modulo the sub-graph count. A depth-first search picks the {@code replicationFactor / 2 +
+ * 1} DataNodes of a single sub-graph that minimize (overlapped 2-Region combinations, hosted
+ * Regions). One more DataNode is then taken from each remaining sub-graph. Falls back to {@link
+ * GreedyRegionGroupAllocator} whenever no such plan exists.
+ *
+ * <p>Not thread-safe: each call stores its working state in instance fields.
+ */
+public class PartiteGraphRegionGroupAllocator implements IRegionGroupAllocator {
+
+  private static final Random RANDOM = new Random();
+  private static final GreedyRegionGroupAllocator GREEDY_ALLOCATOR =
+      new GreedyRegionGroupAllocator();
+
+  // Number of sub-graphs, i.e. ceil(replicationFactor / 2)
+  private int subGraphCount;
+  private int replicationFactor;
+  // Region quota per DataNode for the requested consensus group type
+  private int regionPerDataNode;
+
+  private int dataNodeNum;
+  // The number of allocated Regions in each DataNode
+  private int[] regionCounter;
+  // combinationCounter[i][j] == 1 iff DataNodes i and j already share at least one RegionGroup
+  private int[][] combinationCounter;
+  // Dense "fake" ids 0..dataNodeNum-1 <-> real DataNode ids
+  private Map<Integer, Integer> fakeToRealIdMap;
+  private Map<Integer, Integer> realToFakeIdMap;
+
+  // Number of replicas placed inside a single sub-graph, i.e. replicationFactor / 2 + 1
+  private int subDataNodeNum;
+  // First Key: the sum of overlapped 2-Region combinations among the selected DataNodes is minimal
+  private int optimalCombinationSum;
+  // Second Key: the sum of Regions in selected DataNodes is minimal
+  private int optimalRegionSum;
+  // Fake ids of the best subDataNodeNum DataNodes found so far
+  private int[] optimalSubDataNodes;
+
+  @Override
+  public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+      int replicationFactor,
+      TConsensusGroupId consensusGroupId) {
+
+    this.regionPerDataNode =
+        (int)
+            (consensusGroupId.getType().equals(TConsensusGroupType.DataRegion)
+                ? ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode()
+                : ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode());
+    prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups);
+
+    for (int i = 0; i < subGraphCount; i++) {
+      subGraphSearch(i, 0, subDataNodeNum, 0, 0, new int[subDataNodeNum]);
+    }
+    if (optimalCombinationSum == Integer.MAX_VALUE) {
+      // No sub-graph contains subDataNodeNum non-full DataNodes
+      return allocateByGreedy(
+          availableDataNodeMap,
+          freeDiskSpaceMap,
+          allocatedRegionGroups,
+          databaseAllocatedRegionGroups,
+          replicationFactor,
+          consensusGroupId);
+    }
+
+    List<Integer> partiteNodes = partiteGraphSearch(optimalSubDataNodes[0] % subGraphCount);
+    if (partiteNodes.size() < replicationFactor - subDataNodeNum) {
+      // Some other sub-graph has no non-full DataNode left
+      return allocateByGreedy(
+          availableDataNodeMap,
+          freeDiskSpaceMap,
+          allocatedRegionGroups,
+          databaseAllocatedRegionGroups,
+          replicationFactor,
+          consensusGroupId);
+    }
+
+    TRegionReplicaSet result = new TRegionReplicaSet();
+    result.setRegionId(consensusGroupId);
+    for (int i = 0; i < subDataNodeNum; i++) {
+      result.addToDataNodeLocations(
+          availableDataNodeMap.get(fakeToRealIdMap.get(optimalSubDataNodes[i])).getLocation());
+    }
+    for (int i = 0; i < replicationFactor - subDataNodeNum; i++) {
+      result.addToDataNodeLocations(
+          availableDataNodeMap.get(fakeToRealIdMap.get(partiteNodes.get(i))).getLocation());
+    }
+    return result;
+  }
+
+  /** Delegates the whole allocation to the shared {@link GreedyRegionGroupAllocator}. */
+  private static TRegionReplicaSet allocateByGreedy(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+      int replicationFactor,
+      TConsensusGroupId consensusGroupId) {
+    return GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+        availableDataNodeMap,
+        freeDiskSpaceMap,
+        allocatedRegionGroups,
+        databaseAllocatedRegionGroups,
+        replicationFactor,
+        consensusGroupId);
+  }
+
+  /**
+   * Resets the per-call state. Assigns dense fake ids to the available DataNodes and counts, for
+   * each of them, the hosted Regions and the peers it already shares a RegionGroup with.
+   *
+   * <p>Replicas located on DataNodes outside {@code availableDataNodeMap} are ignored: those
+   * DataNodes have no fake id and can never be selected.
+   */
+  private void prepare(
+      int replicationFactor,
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      List<TRegionReplicaSet> allocatedRegionGroups) {
+
+    this.subGraphCount = replicationFactor / 2 + (replicationFactor % 2 == 0 ? 0 : 1);
+    this.replicationFactor = replicationFactor;
+
+    this.fakeToRealIdMap = new TreeMap<>();
+    this.realToFakeIdMap = new TreeMap<>();
+    this.dataNodeNum = availableDataNodeMap.size();
+    List<Integer> dataNodeIdList =
+        availableDataNodeMap.values().stream()
+            .map(c -> c.getLocation().getDataNodeId())
+            .collect(Collectors.toList());
+    for (int i = 0; i < dataNodeNum; i++) {
+      fakeToRealIdMap.put(i, dataNodeIdList.get(i));
+      realToFakeIdMap.put(dataNodeIdList.get(i), i);
+    }
+
+    // Compute regionCounter and combinationCounter
+    // (new int arrays are zero-initialized, so no explicit fill is needed)
+    this.regionCounter = new int[dataNodeNum];
+    this.combinationCounter = new int[dataNodeNum][dataNodeNum];
+    for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) {
+      // Resolve fake ids first; unboxing a missing id would throw a NullPointerException
+      List<Integer> fakeIds = new ArrayList<>();
+      for (TDataNodeLocation location : regionReplicaSet.getDataNodeLocations()) {
+        Integer fakeId = realToFakeIdMap.get(location.getDataNodeId());
+        if (fakeId != null) {
+          fakeIds.add(fakeId);
+        }
+      }
+      for (int i = 0; i < fakeIds.size(); i++) {
+        int fakeIId = fakeIds.get(i);
+        regionCounter[fakeIId]++;
+        for (int j = i + 1; j < fakeIds.size(); j++) {
+          int fakeJId = fakeIds.get(j);
+          combinationCounter[fakeIId][fakeJId] = 1;
+          combinationCounter[fakeJId][fakeIId] = 1;
+        }
+      }
+    }
+
+    // Reset the optimal result
+    this.subDataNodeNum = replicationFactor / 2 + 1;
+    this.optimalCombinationSum = Integer.MAX_VALUE;
+    this.optimalRegionSum = Integer.MAX_VALUE;
+    this.optimalSubDataNodes = new int[subDataNodeNum];
+  }
+
+  /**
+   * Depth-first enumeration of every {@code replicaNum}-subset of the non-full DataNodes in one
+   * sub-graph. Keeps the subset with the minimal (combinationSum, regionSum) key in {@link
+   * #optimalSubDataNodes}; an equal subset replaces the current best with probability 1/2.
+   *
+   * <p>Both keys only grow as DataNodes are added, so a partial pick that is already worse than
+   * the best plan can be skipped. Candidates are not sorted, so the scan continues with the next
+   * one instead of returning.
+   *
+   * @param firstIndex smallest fake id that may be picked next; picks step by {@code
+   *     subGraphCount}, so they stay in one sub-graph and are strictly increasing
+   * @param currentReplica number of DataNodes already stored in {@code currentReplicaSet}
+   * @param replicaNum number of DataNodes to pick
+   * @param combinationSum existing 2-Region combinations among the picked DataNodes
+   * @param regionSum total Regions hosted by the picked DataNodes
+   * @param currentReplicaSet scratch buffer holding the picked fake ids
+   */
+  private void subGraphSearch(
+      int firstIndex,
+      int currentReplica,
+      int replicaNum,
+      int combinationSum,
+      int regionSum,
+      int[] currentReplicaSet) {
+
+    if (currentReplica == replicaNum) {
+      if (combinationSum < optimalCombinationSum
+          || (combinationSum == optimalCombinationSum && regionSum < optimalRegionSum)) {
+        // Reset the optimal result when a better one is found
+        optimalCombinationSum = combinationSum;
+        optimalRegionSum = regionSum;
+        optimalSubDataNodes = Arrays.copyOf(currentReplicaSet, subDataNodeNum);
+      } else if (combinationSum == optimalCombinationSum
+          && regionSum == optimalRegionSum
+          && RANDOM.nextBoolean()) {
+        optimalSubDataNodes = Arrays.copyOf(currentReplicaSet, subDataNodeNum);
+      }
+      return;
+    }
+
+    for (int i = firstIndex; i < dataNodeNum; i += subGraphCount) {
+      if (regionCounter[i] >= regionPerDataNode) {
+        // Pruning: skip full DataNodes
+        continue;
+      }
+      int nxtCombinationSum = combinationSum;
+      for (int j = 0; j < currentReplica; j++) {
+        nxtCombinationSum += combinationCounter[i][currentReplicaSet[j]];
+      }
+      if (nxtCombinationSum > optimalCombinationSum) {
+        // Pruning: picking DataNode i already makes the first key worse than the best plan
+        continue;
+      }
+      int nxtRegionSum = regionSum + regionCounter[i];
+      if (nxtCombinationSum == optimalCombinationSum && nxtRegionSum > optimalRegionSum) {
+        // Pruning: equal first key but the second key is already worse than the best plan
+        continue;
+      }
+      currentReplicaSet[currentReplica] = i;
+      subGraphSearch(
+          i + subGraphCount,
+          currentReplica + 1,
+          replicaNum,
+          nxtCombinationSum,
+          nxtRegionSum,
+          currentReplicaSet);
+    }
+  }
+
+  /**
+   * Picks one non-full DataNode from every sub-graph except {@code selected}.
+   *
+   * <p>Within a sub-graph, the chosen DataNode maximizes the number of 2-Region combinations it
+   * does not yet share with {@link #optimalSubDataNodes}. Ties go to fewer Regions, then to a coin
+   * flip.
+   *
+   * @param selected index of the sub-graph that already hosts {@link #optimalSubDataNodes}
+   * @return fake ids of the picked DataNodes, or an empty list if some sub-graph has no non-full
+   *     DataNode
+   */
+  private List<Integer> partiteGraphSearch(int selected) {
+    List<Integer> partiteNodes = new ArrayList<>();
+    for (int partiteIndex = 0; partiteIndex < subGraphCount; partiteIndex++) {
+      if (partiteIndex == selected) {
+        continue;
+      }
+      int selectedDataNode = -1;
+      int bestScatterWidth = 0;
+      int bestRegionSum = Integer.MAX_VALUE;
+      for (int i = partiteIndex; i < dataNodeNum; i += subGraphCount) {
+        if (regionCounter[i] >= regionPerDataNode) {
+          continue;
+        }
+        // Number of new 2-Region combinations DataNode i would form with the selected ones
+        int scatterWidth = subDataNodeNum;
+        for (int k = 0; k < subDataNodeNum; k++) {
+          scatterWidth -= combinationCounter[i][optimalSubDataNodes[k]];
+        }
+        if (scatterWidth < bestScatterWidth) {
+          continue;
+        }
+        if (scatterWidth > bestScatterWidth) {
+          bestScatterWidth = scatterWidth;
+          bestRegionSum = regionCounter[i];
+          selectedDataNode = i;
+        } else if (regionCounter[i] < bestRegionSum) {
+          bestRegionSum = regionCounter[i];
+          selectedDataNode = i;
+        } else if (regionCounter[i] == bestRegionSum && RANDOM.nextBoolean()) {
+          selectedDataNode = i;
+        }
+      }
+      if (selectedDataNode == -1) {
+        return new ArrayList<>();
+      }
+      partiteNodes.add(selectedDataNode);
+    }
+    return partiteNodes;
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
index 7b5914de5fc..2afcb92d153 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
@@ -71,6 +71,39 @@ public abstract class AbstractLeaderBalancer {
Map<Integer, NodeStatistics> dataNodeStatisticsMap,
Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
+ // LOGGER.info(
+ // "[LeaderDebug] databaseRegionGroupMap: {}",
+ // Arrays.stream(
+ // databaseRegionGroupMap.values().stream()
+ // .flatMap(List::stream)
+ // .collect(Collectors.toSet())
+ // .stream()
+ // .mapToInt(TConsensusGroupId::getId)
+ // .toArray())
+ // .sorted()
+ // .toArray());
+ // LOGGER.info(
+ // "[LeaderDebug] regionLocationMap: {}",
+ // Arrays.stream(
+ //
+ //
regionLocationMap.keySet().stream().mapToInt(TConsensusGroupId::getId).toArray())
+ // .sorted()
+ // .toArray());
+ // LOGGER.info(
+ // "[LeaderDebug] regionLeaderMap: {}",
+ // Arrays.stream(
+ //
+ //
regionLeaderMap.keySet().stream().mapToInt(TConsensusGroupId::getId).toArray())
+ // .sorted()
+ // .toArray());
+ // LOGGER.info(
+ // "[LeaderDebug] regionStatisticsMap: {}",
+ // Arrays.stream(
+ //
+ //
regionStatisticsMap.keySet().stream().mapToInt(TConsensusGroupId::getId).toArray())
+ // .sorted()
+ // .toArray());
+
this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
this.regionLocationMap.putAll(regionLocationMap);
this.regionLeaderMap.putAll(regionLeaderMap);
@@ -86,6 +119,18 @@ public abstract class AbstractLeaderBalancer {
regionGroupUnionSet.addAll(regionLocationMap.keySet());
regionGroupUnionSet.addAll(regionLeaderMap.keySet());
regionGroupUnionSet.addAll(regionStatisticsMap.keySet());
+ // LOGGER.info(
+ // "[LeaderDebug] regionGroupIntersection: {}",
+ //
+ //
Arrays.stream(regionGroupIntersection.stream().mapToInt(TConsensusGroupId::getId).toArray())
+ // .sorted()
+ // .toArray());
+ // LOGGER.info(
+ // "[LeaderDebug] regionGroupUnionSet: {}",
+ //
+ //
Arrays.stream(regionGroupUnionSet.stream().mapToInt(TConsensusGroupId::getId).toArray())
+ // .sorted()
+ // .toArray());
Set<TConsensusGroupId> differenceSet =
regionGroupUnionSet.stream()
.filter(e -> !regionGroupIntersection.contains(e))
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index d166d4bceae..4232c38ff10 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -125,7 +125,7 @@ public class RegionGroupCache {
// all Regions are in the Running status
return RegionGroupStatus.Running;
} else if (readonlyCount == 0) {
- return unknownCount <= ((regionCacheMap.size() - 1) / 2)
+ return unknownCount <= (regionCacheMap.size() / 2)
// The RegionGroup is considered as Available when the number of
Unknown Regions is less
// than half
? RegionGroupStatus.Available
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java
index c2f75665d4a..f42516dbf75 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java
@@ -31,6 +31,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
@@ -46,11 +47,11 @@ public class RegionGroupAllocatorSimulation {
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
private static final int TEST_LOOP = 1;
// private static final double EXAM_LOOP = 100000;
- private static final int MIN_DATA_NODE_NUM = 1;
- private static final int MAX_DATA_NODE_NUM = 100;
- private static final int MIN_DATA_REGION_PER_DATA_NODE = 1;
- private static final int MAX_DATA_REGION_PER_DATA_NODE = 10;
- private static final int DATA_REPLICATION_FACTOR = 2;
+ private static final int MIN_DATA_NODE_NUM = 7;
+ private static final int MAX_DATA_NODE_NUM = 7;
+ private static final int MIN_DATA_REGION_PER_DATA_NODE = 3;
+ private static final int MAX_DATA_REGION_PER_DATA_NODE = 3;
+ private static final int DATA_REPLICATION_FACTOR = 3;
private static final Map<Integer, TDataNodeConfiguration>
AVAILABLE_DATA_NODE_MAP =
new TreeMap<>();
@@ -59,7 +60,7 @@ public class RegionGroupAllocatorSimulation {
public static class DataEntry {
public final Integer N;
public final Integer W;
- public final Integer minScatterWidth;
+ public final Double minScatterRatio;
// public final List<Double> disabledPercent;
@@ -69,10 +70,10 @@ public class RegionGroupAllocatorSimulation {
// this.minScatterWidth = minScatterWidth;
// this.disabledPercent = disabledPercent;
// }
- private DataEntry(int N, int W, int minScatterWidth) {
+ private DataEntry(int N, int W, double minScatterRatio) {
this.N = N;
this.W = W;
- this.minScatterWidth = minScatterWidth;
+ this.minScatterRatio = minScatterRatio;
}
}
@@ -81,7 +82,7 @@ public class RegionGroupAllocatorSimulation {
List<DataEntry> testResult = new ArrayList<>();
for (int dataNodeNum = MIN_DATA_NODE_NUM; dataNodeNum <=
MAX_DATA_NODE_NUM; dataNodeNum++) {
for (int dataRegionPerDataNode = MIN_DATA_REGION_PER_DATA_NODE;
- dataRegionPerDataNode <= Math.min(MAX_DATA_REGION_PER_DATA_NODE,
dataNodeNum);
+ dataRegionPerDataNode <= MAX_DATA_REGION_PER_DATA_NODE;
dataRegionPerDataNode++) {
CONF.setDataRegionPerDataNode(dataRegionPerDataNode);
testResult.add(singleTest(dataNodeNum, dataRegionPerDataNode));
@@ -89,21 +90,21 @@ public class RegionGroupAllocatorSimulation {
// LOGGER.info("{}, finish", dataNodeNum);
}
- // FileWriter scatterW =
- // new FileWriter(
- // "/Users/yongzaodan/Desktop/simulation/psr-simulate/scatter/r="
- // + DATA_REPLICATION_FACTOR
- // + ".log");
- // for (DataEntry entry : testResult) {
- // scatterW.write(entry.minScatterWidth + "\n");
- // scatterW.flush();
- // }
- // scatterW.close();
+// FileWriter scatterW =
+// new FileWriter(
+// "/Users/yongzaodan/Desktop/simulation/psr-simulate/scatter/r="
+// + DATA_REPLICATION_FACTOR
+// + ".log");
+// for (DataEntry entry : testResult) {
+// scatterW.write(entry.minScatterRatio + "\n");
+// scatterW.flush();
+// }
+// scatterW.close();
}
private DataEntry singleTest(int N, int W) {
if (N < DATA_REPLICATION_FACTOR) {
- return new DataEntry(N, W, 0);
+ return new DataEntry(N, W, 1.0);
}
// Construct N DataNodes
Random random = new Random();
@@ -119,9 +120,10 @@ public class RegionGroupAllocatorSimulation {
final int dataRegionGroupNum = W * N / DATA_REPLICATION_FACTOR;
List<Integer> regionCountList = new ArrayList<>();
List<Integer> scatterWidthList = new ArrayList<>();
+ double minScatterRatio = 1.0;
for (int loop = 1; loop <= TEST_LOOP; loop++) {
List<TRegionReplicaSet> allocateResult = new ArrayList<>();
- IRegionGroupAllocator ALLOCATOR = new TieredReplicationAllocator();
+ IRegionGroupAllocator ALLOCATOR = new PGRA();
for (int index = 0; index < dataRegionGroupNum; index++) {
allocateResult.add(
ALLOCATOR.generateOptimalRegionReplicasDistribution(
@@ -166,6 +168,10 @@ public class RegionGroupAllocatorSimulation {
for (int i = 1; i <= N; i++) {
int scatterWidth =
scatterWidthMap.containsKey(i) ?
scatterWidthMap.get(i).cardinality() : 0;
+ if (regionCounter.getOrDefault(i, 0) > 0) {
+ int expMaxScatter = Math.min(regionCounter.get(i) *
(DATA_REPLICATION_FACTOR - 1), N - 1);
+ minScatterRatio = Math.min(minScatterRatio, (double) scatterWidth /
expMaxScatter);
+ }
int expScatter = Math.min(Math.max(regionCounter.getOrDefault(i, 0) -
1, 0) * u, N - 1);
if (scatterWidth < expScatter) {
passScatter = false;
@@ -177,11 +183,12 @@ public class RegionGroupAllocatorSimulation {
scatterWidthList.add(scatterWidth);
}
- // for (TRegionReplicaSet regionReplicaSet : allocateResult) {
- // LOGGER.info("{}",
- //
- //
regionReplicaSet.getDataNodeLocations().stream().mapToInt(TDataNodeLocation::getDataNodeId).toArray());
- // }
+ for (TRegionReplicaSet regionReplicaSet :
allocateResult) {
+ LOGGER.info("{}",
+
+
+
regionReplicaSet.getDataNodeLocations().stream().mapToInt(TDataNodeLocation::getDataNodeId).toArray());
+ }
}
int regionRange =
@@ -204,6 +211,6 @@ public class RegionGroupAllocatorSimulation {
//
regionCountList.stream().mapToInt(Integer::intValue).min().orElse(0),
//
regionCountList.stream().mapToInt(Integer::intValue).max().orElse(0),
// minScatter);
- return new DataEntry(N, W, minScatter);
+ return new DataEntry(N, W, minScatterRatio);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 166fca9309a..9c2fbe10a25 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.analyze.cache.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -483,6 +484,14 @@ public class PartitionCache {
if (result) {
groupIdToReplicaSetMap.clear();
groupIdToReplicaSetMap.putAll(map);
+        // Per-Region route dumps are diagnostic only. This runs inside the try block whose finally
+        // follows, so keep the formatting cost (and log volume) out of the normal INFO path
+        if (logger.isDebugEnabled()) {
+          for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> routeEntry : map.entrySet()) {
+            logger.debug(
+                "[RouteMap] {}: {}",
+                routeEntry.getKey(),
+                routeEntry.getValue().getDataNodeLocations().stream()
+                    .mapToInt(TDataNodeLocation::getDataNodeId)
+                    .toArray());
+          }
+        }
}
return result;
} finally {