This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch greedy-copy-set
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2e6e97056d3445ba7a86dfb90389f8e5ebf443ca
Author: YongzaoDan <[email protected]>
AuthorDate: Sun Nov 19 12:07:55 2023 +0800

    Finish
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   2 +-
 .../manager/load/balancer/RegionBalancer.java      |   9 +-
 .../region/GreedyCopySetRegionGroupAllocator.java  | 183 ++++++++++++++++++
 .../GreedyCopySetRegionGroupAllocatorTest.java     | 209 +++++++++++++++++++++
 4 files changed, 400 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 277a3513f3c..c755eb010cc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -108,7 +108,7 @@ public class ConfigNodeConfig {
 
   /** RegionGroup allocate policy. */
   private RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy =
-      RegionBalancer.RegionGroupAllocatePolicy.GREEDY;
+      RegionBalancer.RegionGroupAllocatePolicy.GREEDY_COPY_SET;
 
   /** Max concurrent client number. */
   private int rpcMaxConcurrentClientNum = 65535;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index dc2c70fc003..33ecb70c3bb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import 
org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator;
+import 
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator;
 import 
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
 import 
org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
@@ -56,8 +57,11 @@ public class RegionBalancer {
         this.regionGroupAllocator = new CopySetRegionGroupAllocator();
         break;
       case GREEDY:
-      default:
         this.regionGroupAllocator = new GreedyRegionGroupAllocator();
+        break;
+      case GREEDY_COPY_SET:
+      default:
+        this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
     }
   }
 
@@ -147,6 +151,7 @@ public class RegionBalancer {
 
   public enum RegionGroupAllocatePolicy {
     COPY_SET,
-    GREEDY
+    GREEDY,
+    GREEDY_COPY_SET
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
new file mode 100644
index 00000000000..77e4e2965e0
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Allocate Region through Greedy and CopySet Algorithm */
+public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator {
+
+  int replicationFactor;
+  // RegionGroup allocation BitSet
+  private List<BitSet> allocatedBitSets;
+  // Map<DataNodeId, RegionGroup count>
+  private Map<Integer, AtomicInteger> regionCounter;
+  // Available DataNodeIds
+  private Integer[] dataNodeIds;
+
+  // First Key: the sum of Regions at the DataNodes in the allocation result 
is minimal
+  int optimalRegionSum;
+  // Second Key: the sum of intersected Regions with other allocated 
RegionGroups is minimal
+  int optimalIntersectionSum;
+  List<Integer[]> optimalReplicaSets;
+
+  public GreedyCopySetRegionGroupAllocator() {
+    // Empty constructor
+  }
+
+  @Override
+  public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      int replicationFactor,
+      TConsensusGroupId consensusGroupId) {
+
+    prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups);
+    dfs(-1, 0, new Integer[replicationFactor], 0, 0);
+
+    // Randomly pick one optimal plan as result
+    Collections.shuffle(optimalReplicaSets);
+    Integer[] optimalReplicaSet = optimalReplicaSets.get(0);
+    TRegionReplicaSet result = new TRegionReplicaSet();
+    result.setRegionId(consensusGroupId);
+    for (int i = 0; i < replicationFactor; i++) {
+      
result.addToDataNodeLocations(availableDataNodeMap.get(optimalReplicaSet[i]).getLocation());
+    }
+    return result;
+  }
+
+  /**
+   * Prepare some statistics before dfs
+   *
+   * @param replicationFactor replication factor in the cluster
+   * @param availableDataNodeMap currently available DataNodes, ensure size() 
>= replicationFactor
+   * @param allocatedRegionGroups already allocated RegionGroups in the cluster
+   */
+  private void prepare(
+      int replicationFactor,
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      List<TRegionReplicaSet> allocatedRegionGroups) {
+
+    this.replicationFactor = replicationFactor;
+    int maxDataNodeId = 
availableDataNodeMap.keySet().stream().max(Integer::compareTo).orElse(0);
+    for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) {
+      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+        // Store the maximum DataNodeId in this algorithm loop
+        maxDataNodeId = Math.max(maxDataNodeId, 
dataNodeLocation.getDataNodeId());
+      }
+    }
+
+    // Convert the allocatedRegionGroups into allocatedBitSets,
+    // where a true in BitSet corresponding to a DataNodeId in the RegionGroup
+    allocatedBitSets = new ArrayList<>();
+    for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) {
+      BitSet bitSet = new BitSet(maxDataNodeId + 1);
+      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+        bitSet.set(dataNodeLocation.getDataNodeId());
+      }
+      allocatedBitSets.add(bitSet);
+    }
+
+    // Count the number of Regions in each DataNode
+    regionCounter = new HashMap<>();
+    for (int i = 0; i <= maxDataNodeId; i++) {
+      regionCounter.put(i, new AtomicInteger(0));
+    }
+    allocatedRegionGroups.forEach(
+        regionGroup ->
+            regionGroup
+                .getDataNodeLocations()
+                .forEach(
+                    dataNodeLocation ->
+                        
regionCounter.get(dataNodeLocation.getDataNodeId()).incrementAndGet()));
+
+    // Reset the optimal result
+    dataNodeIds = new Integer[availableDataNodeMap.size()];
+    availableDataNodeMap.keySet().toArray(dataNodeIds);
+    optimalRegionSum = Integer.MAX_VALUE;
+    optimalIntersectionSum = Integer.MAX_VALUE;
+    optimalReplicaSets = new ArrayList<>();
+  }
+
+  /**
+   * Dfs each possible allocation plan, and keep those with the highest 
priority: First Key: the sum
+   * of Regions at the DataNodes in the allocation result is minimal, Second 
Key: the sum of
+   * intersected Regions with other allocated RegionGroups is minimal
+   *
+   * @param lastIndex last decided index in dataNodeIds
+   * @param currentReplica current replica index
+   * @param currentReplicaSet current allocation plan
+   * @param regionSum the sum of Regions at the DataNodes in the current 
allocation plan
+   * @param intersectionSum the sum of intersected Regions with other 
allocated RegionGroups in
+   *     current allocation plan
+   */
+  private void dfs(
+      int lastIndex,
+      int currentReplica,
+      Integer[] currentReplicaSet,
+      int regionSum,
+      int intersectionSum) {
+    if (regionSum > optimalRegionSum || intersectionSum > optimalRegionSum) {
+      // Pruning: no needs for further searching when either the first key or 
the second key
+      // is bigger than the historical optimal result
+      return;
+    }
+
+    if (currentReplica == replicationFactor) {
+      // A complete allocation plan is found
+      if (regionSum < optimalRegionSum || intersectionSum < optimalRegionSum) {
+        // Reset the optimal result when a better one is found
+        optimalRegionSum = regionSum;
+        optimalIntersectionSum = intersectionSum;
+        optimalReplicaSets.clear();
+      }
+      optimalReplicaSets.add(Arrays.copyOf(currentReplicaSet, 
replicationFactor));
+      return;
+    }
+
+    for (int i = lastIndex + 1; i < dataNodeIds.length; i++) {
+      // Decide the next DataNodeId in the allocation plan
+      currentReplicaSet[currentReplica] = dataNodeIds[i];
+      int intersectionDelta = 0;
+      for (BitSet bitSet : allocatedBitSets) {
+        intersectionDelta += bitSet.get(dataNodeIds[i]) ? 1 : 0;
+      }
+      dfs(
+          i,
+          currentReplica + 1,
+          currentReplicaSet,
+          regionSum + regionCounter.get(dataNodeIds[i]).get(),
+          intersectionSum + intersectionDelta);
+    }
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
new file mode 100644
index 00000000000..c161eb5f241
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocatorTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Compares {@link GreedyCopySetRegionGroupAllocator} with {@link GreedyRegionGroupAllocator} on the
+ * same set of DataNodes: both must keep the Region count of every DataNode within the per-DataNode
+ * bound, and the greedy-copy-set result must not have a smaller scatter width than the greedy one.
+ */
+public class GreedyCopySetRegionGroupAllocatorTest {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(GreedyCopySetRegionGroupAllocatorTest.class);
+
+  private static final GreedyRegionGroupAllocator GREEDY_ALLOCATOR =
+      new GreedyRegionGroupAllocator();
+  private static final GreedyCopySetRegionGroupAllocator GREEDY_COPY_SET_ALLOCATOR =
+      new GreedyCopySetRegionGroupAllocator();
+
+  private static final int TEST_DATA_NODE_NUM = 21;
+  private static final int DATA_REGION_PER_DATA_NODE =
+      (int) ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode();
+  private static final Map<Integer, TDataNodeConfiguration> AVAILABLE_DATA_NODE_MAP =
+      new HashMap<>();
+  private static final Map<Integer, Double> FREE_SPACE_MAP = new HashMap<>();
+
+  @BeforeClass
+  public static void setUp() {
+    // Construct 21 DataNodes, with ids 1..21 and a random free space each
+    Random random = new Random();
+    for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+      AVAILABLE_DATA_NODE_MAP.put(
+          i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+      FREE_SPACE_MAP.put(i, random.nextDouble());
+    }
+  }
+
+  @Test
+  public void test2Factor() {
+    testRegionDistributionAndScatterWidth(2);
+  }
+
+  @Test
+  public void test3Factor() {
+    testRegionDistributionAndScatterWidth(3);
+  }
+
+  /**
+   * Allocates the same number of DataRegionGroups with both allocators, then checks the Region
+   * count of every DataNode and compares the scatter width of the two results.
+   *
+   * @param replicationFactor number of replicas in each allocated RegionGroup
+   */
+  private void testRegionDistributionAndScatterWidth(int replicationFactor) {
+    final int dataRegionGroupNum =
+        DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / replicationFactor;
+
+    /* Allocate DataRegionGroups */
+    List<TRegionReplicaSet> greedyResult = new ArrayList<>();
+    List<TRegionReplicaSet> greedyCopySetResult = new ArrayList<>();
+    for (int index = 0; index < dataRegionGroupNum; index++) {
+      greedyResult.add(
+          GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+              AVAILABLE_DATA_NODE_MAP,
+              FREE_SPACE_MAP,
+              greedyResult,
+              replicationFactor,
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, index)));
+      greedyCopySetResult.add(
+          GREEDY_COPY_SET_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+              AVAILABLE_DATA_NODE_MAP,
+              FREE_SPACE_MAP,
+              greedyCopySetResult,
+              replicationFactor,
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, index)));
+    }
+
+    /* Statistics result */
+    // Map<DataNodeId, RegionGroup Count> and Map<DataNodeId, ScatterWidth> for greedy algorithm
+    Map<Integer, AtomicInteger> greedyRegionCounter = countRegions(greedyResult);
+    Map<Integer, BitSet> greedyScatterWidth = computeScatterWidth(greedyResult, replicationFactor);
+    // Ditto for greedy-copy-set algorithm
+    Map<Integer, AtomicInteger> greedyCopySetRegionCounter = countRegions(greedyCopySetResult);
+    Map<Integer, BitSet> greedyCopySetScatterWidth =
+        computeScatterWidth(greedyCopySetResult, replicationFactor);
+
+    /* Check result */
+    // Upper bound of Regions on one DataNode: the total number of Regions divided by the number
+    // of DataNodes, rounded up
+    final int totalDataRegionNum = dataRegionGroupNum * replicationFactor;
+    final int avgDataRegionNum = (totalDataRegionNum + TEST_DATA_NODE_NUM - 1) / TEST_DATA_NODE_NUM;
+    int greedyScatterWidthSum = 0;
+    int greedyMinScatterWidth = Integer.MAX_VALUE;
+    int greedyMaxScatterWidth = Integer.MIN_VALUE;
+    int greedyCopySetScatterWidthSum = 0;
+    int greedyCopySetMinScatterWidth = Integer.MAX_VALUE;
+    int greedyCopySetMaxScatterWidth = Integer.MIN_VALUE;
+    for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+      Assert.assertTrue(greedyRegionCounter.get(i).get() <= avgDataRegionNum);
+      Assert.assertTrue(greedyCopySetRegionCounter.get(i).get() <= avgDataRegionNum);
+
+      int scatterWidth = greedyScatterWidth.get(i).cardinality();
+      greedyScatterWidthSum += scatterWidth;
+      greedyMinScatterWidth = Math.min(greedyMinScatterWidth, scatterWidth);
+      greedyMaxScatterWidth = Math.max(greedyMaxScatterWidth, scatterWidth);
+
+      scatterWidth = greedyCopySetScatterWidth.get(i).cardinality();
+      greedyCopySetScatterWidthSum += scatterWidth;
+      greedyCopySetMinScatterWidth = Math.min(greedyCopySetMinScatterWidth, scatterWidth);
+      greedyCopySetMaxScatterWidth = Math.max(greedyCopySetMaxScatterWidth, scatterWidth);
+    }
+
+    LOGGER.info(
+        "replicationFactor: {}, Scatter width for greedy: avg={}, min={}, max={}",
+        replicationFactor,
+        (double) greedyScatterWidthSum / TEST_DATA_NODE_NUM,
+        greedyMinScatterWidth,
+        greedyMaxScatterWidth);
+    LOGGER.info(
+        "replicationFactor: {}, Scatter width for greedyCopySet: avg={}, min={}, max={}",
+        replicationFactor,
+        (double) greedyCopySetScatterWidthSum / TEST_DATA_NODE_NUM,
+        greedyCopySetMinScatterWidth,
+        greedyCopySetMaxScatterWidth);
+    Assert.assertTrue(greedyCopySetScatterWidthSum >= greedyScatterWidthSum);
+    Assert.assertTrue(greedyCopySetMaxScatterWidth >= greedyMaxScatterWidth);
+    Assert.assertTrue(greedyCopySetMinScatterWidth >= greedyMinScatterWidth);
+  }
+
+  /**
+   * Counts the replicas placed on each DataNode.
+   *
+   * @param regionGroups the allocation result to inspect
+   * @return Map<DataNodeId, Region count>, holding only DataNodes that received a replica
+   */
+  private static Map<Integer, AtomicInteger> countRegions(List<TRegionReplicaSet> regionGroups) {
+    Map<Integer, AtomicInteger> regionCounter = new HashMap<>();
+    for (TRegionReplicaSet replicaSet : regionGroups) {
+      for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
+        regionCounter
+            .computeIfAbsent(dataNodeLocation.getDataNodeId(), empty -> new AtomicInteger(0))
+            .incrementAndGet();
+      }
+    }
+    return regionCounter;
+  }
+
+  /**
+   * Builds the scatter width of each DataNode: a true in the BitSet denotes that the corresponding
+   * DataNode shares at least one RegionGroup with the DataNode in the Map-Key, i.e. it can help
+   * that DataNode to share the RegionGroup-leader and restore data when restarting. The more true
+   * in the BitSet, the safer the DataNode in the Map-Key is.
+   *
+   * @param regionGroups the allocation result to inspect
+   * @param replicationFactor number of replicas read from each RegionGroup
+   * @return Map<DataNodeId, ScatterWidth>
+   */
+  private static Map<Integer, BitSet> computeScatterWidth(
+      List<TRegionReplicaSet> regionGroups, int replicationFactor) {
+    Map<Integer, BitSet> scatterWidth = new HashMap<>();
+    for (TRegionReplicaSet replicaSet : regionGroups) {
+      for (int i = 0; i < replicationFactor; i++) {
+        for (int j = i + 1; j < replicationFactor; j++) {
+          int dataNodeId1 = replicaSet.getDataNodeLocations().get(i).getDataNodeId();
+          int dataNodeId2 = replicaSet.getDataNodeLocations().get(j).getDataNodeId();
+          scatterWidth.computeIfAbsent(dataNodeId1, empty -> new BitSet()).set(dataNodeId2);
+          scatterWidth.computeIfAbsent(dataNodeId2, empty -> new BitSet()).set(dataNodeId1);
+        }
+      }
+    }
+    return scatterWidth;
+  }
+}

Reply via email to