This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4862e7a94d4 Shuffle data partition allocation strategy (#16260)
4862e7a94d4 is described below

commit 4862e7a94d4c9ed0ee1a215c1feabeceb37e39a6
Author: Yongzao <[email protected]>
AuthorDate: Tue Aug 26 22:20:23 2025 +0800

    Shuffle data partition allocation strategy (#16260)
---
 .../it/env/cluster/config/MppCommonConfig.java     |   6 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   6 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 ...T.java => IoTDBPartitionInheritStrategyIT.java} |   4 +-
 .../partition/IoTDBPartitionShuffleStrategyIT.java | 140 +++++++++++++++
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  10 ++
 .../confignode/conf/ConfigNodeDescriptor.java      |   4 +
 .../manager/load/balancer/PartitionBalancer.java   | 194 +++++++++++++++------
 9 files changed, 319 insertions(+), 52 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 6ac73025a4e..1302e64e248 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -393,6 +393,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
     return this;
   }
 
+  /**
+   * Sets the {@code data_partition_allocation_strategy} property, which the ConfigNode reads to
+   * choose how DataPartitions are allocated (e.g. {@code INHERIT} or {@code SHUFFLE}).
+   */
+  @Override
+  public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
+    setProperty("data_partition_allocation_strategy", dataPartitionAllocationStrategy);
+    return this;
+  }
+
   @Override
   public CommonConfig setSeriesPartitionExecutorClass(String 
seriesPartitionExecutorClass) {
     setProperty("series_partition_executor_class", 
seriesPartitionExecutorClass);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 1ad18e854cc..cf09cbfbf8f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -396,6 +396,12 @@ public class MppSharedCommonConfig implements CommonConfig {
     return this;
   }
 
+  /** Forwards the DataPartition allocation strategy to {@code cnConfig} and returns this. */
+  @Override
+  public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
+    cnConfig.setDataPartitionAllocationStrategy(dataPartitionAllocationStrategy);
+    return this;
+  }
+
   @Override
   public CommonConfig setSeriesPartitionExecutorClass(String 
seriesPartitionExecutorClass) {
     cnConfig.setSeriesPartitionExecutorClass(seriesPartitionExecutorClass);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 47c9c03dc74..676b914ab1b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -279,6 +279,11 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
+  /**
+   * No-op in the remote environment: the strategy is ignored and {@code this} is returned only
+   * to keep the fluent call chain working.
+   */
+  @Override
+  public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
+    return this;
+  }
+
   @Override
   public CommonConfig setSeriesPartitionExecutorClass(String 
seriesPartitionExecutorClass) {
     return this;
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index c6e8f997739..f6f5aae17a0 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -126,6 +126,8 @@ public interface CommonConfig {
 
   CommonConfig setSeriesSlotNum(int seriesSlotNum);
 
+  /**
+   * Sets the strategy used to allocate DataPartitions to DataRegionGroups, e.g. {@code "INHERIT"}
+   * or {@code "SHUFFLE"}. Some environments may ignore this setting.
+   */
+  CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy);
+
   CommonConfig setSeriesPartitionExecutorClass(String 
seriesPartitionExecutorClass);
 
   CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
similarity index 98%
rename from integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
index 2efd5db7a0b..91d34b2d0c9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
@@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
-public class IoTDBPartitionInheritPolicyIT {
+public class IoTDBPartitionInheritStrategyIT {
 
   private static final String testDataRegionConsensusProtocolClass =
       ConsensusFactory.RATIS_CONSENSUS;
@@ -91,7 +91,7 @@ public class IoTDBPartitionInheritPolicyIT {
   }
 
   @Test
-  public void testDataPartitionInheritPolicy() throws Exception {
+  public void testDataPartitionInheritStrategy() throws Exception {
     final long baseStartTime = 1000;
     Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new 
ConcurrentHashMap<>();
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java
new file mode 100644
index 00000000000..70f170caa11
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test for the SHUFFLE DataPartition allocation strategy on a 1C1D cluster.
+ *
+ * <p>DataPartitions are created in a random TimePartitionSlot order, and the test then checks that
+ * every pair of adjacent TimePartitionSlots of each SeriesPartitionSlot maps to different
+ * DataRegionGroups.
+ *
+ * <p>NOTE(review): the SHUFFLE strategy can only guarantee distinct neighbours when at least three
+ * DataRegionGroups are available; with exactly two, a slot between two differently-allocated
+ * neighbours must reuse one of them. This test assumes the configured DataRegion count yields
+ * enough groups -- TODO confirm.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBPartitionShuffleStrategyIT {
+
+  private static final String testDataRegionConsensusProtocolClass =
+      ConsensusFactory.RATIS_CONSENSUS;
+  private static final int testReplicationFactor = 1;
+  private static final String testDataPartitionAllocationStrategy = "SHUFFLE";
+  private static final int testSeriesSlotNum = 1000;
+  private static final long testTimePartitionInterval = 604800000;
+  private static final double testDataRegionPerDataNode = 5.0;
+
+  private static final String database = "root.database";
+  private static final int testTimePartitionSlotsNum = 100;
+
+  /** Configures the SHUFFLE strategy, starts a 1C1D cluster and creates {@link #database}. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass)
+        .setDataReplicationFactor(testReplicationFactor)
+        .setTimePartitionInterval(testTimePartitionInterval)
+        .setSeriesSlotNum(testSeriesSlotNum)
+        .setDataPartitionAllocationStrategy(testDataPartitionAllocationStrategy)
+        .setDataRegionPerDataNode(testDataRegionPerDataNode);
+
+    // Init 1C1D environment
+    EnvFactory.getEnv().initClusterEnvironment(1, 1);
+
+    // Set Database
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      TSStatus status = client.setDatabase(new TDatabaseSchema(database));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  /**
+   * Creates DataPartitions for all SeriesPartitionSlots, one TimePartitionSlot at a time in random
+   * order, then asserts that adjacent TimePartitionSlots never share a DataRegionGroup.
+   */
+  @Test
+  public void testDataPartitionShuffleStrategy() throws Exception {
+    List<Integer> randomTimeSlotList = new ArrayList<>();
+    for (int i = 0; i < testTimePartitionSlotsNum; i++) {
+      randomTimeSlotList.add(i);
+    }
+    // Unseeded shuffle: the creation order differs between runs
+    Collections.shuffle(randomTimeSlotList);
+    for (int timeSlotId : randomTimeSlotList) {
+      // To test the shuffle strategy, we merely need to use a random time slot order, so that a
+      // slot may be allocated after its predecessor, its successor, both, or neither
+      ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+          database, 0, testSeriesSlotNum, timeSlotId, timeSlotId + 1, testTimePartitionInterval);
+    }
+    TDataPartitionTableResp dataPartitionTableResp;
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      dataPartitionTableResp =
+          client.getDataPartitionTable(
+              new TDataPartitionReq(
+                  ConfigNodeTestUtils.constructPartitionSlotsMap(
+                      database,
+                      0,
+                      testSeriesSlotNum,
+                      0,
+                      testTimePartitionSlotsNum,
+                      testTimePartitionInterval)));
+    }
+    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
+        partitionTable = dataPartitionTableResp.getDataPartitionTable();
+    // Walk each TimePartitionSlot (by start time) except the first and compare it with its
+    // immediate predecessor
+    for (long currentStartTime = testTimePartitionInterval;
+        currentStartTime < testTimePartitionInterval * testTimePartitionSlotsNum;
+        currentStartTime += testTimePartitionInterval) {
+      TTimePartitionSlot precedingTimeSlot =
+          new TTimePartitionSlot(currentStartTime - testTimePartitionInterval);
+      TTimePartitionSlot currentTimeSlot = new TTimePartitionSlot(currentStartTime);
+      for (int seriesSlotId = 0; seriesSlotId < testSeriesSlotNum; seriesSlotId++) {
+        TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(seriesSlotId);
+        List<TConsensusGroupId> precedingRegionGroupIds =
+            partitionTable.get(database).get(seriesPartitionSlot).get(precedingTimeSlot);
+        List<TConsensusGroupId> currentRegionGroupIds =
+            partitionTable.get(database).get(seriesPartitionSlot).get(currentTimeSlot);
+        Assert.assertEquals(precedingRegionGroupIds.size(), currentRegionGroupIds.size());
+        for (int i = 0; i < precedingRegionGroupIds.size(); i++) {
+          // Ensure that the RegionGroupId is different in two adjacent TimePartitionSlots
+          Assert.assertNotEquals(precedingRegionGroupIds.get(i), currentRegionGroupIds.get(i));
+        }
+      }
+    }
+  }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 868949c0b22..57e619a9baa 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -82,6 +82,8 @@ public class ConfigNodeConfig {
   private String seriesPartitionExecutorClass =
       "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
 
+  /**
+   * Name of the DataPartition allocation strategy ({@code "INHERIT"} or {@code "SHUFFLE"}), loaded
+   * from the {@code data_partition_allocation_strategy} property.
+   */
+  private String dataPartitionAllocationStrategy = "INHERIT";
+
   /** The policy of extension SchemaRegionGroup for each Database. */
   private RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy =
       RegionGroupExtensionPolicy.AUTO;
@@ -423,6 +425,14 @@ public class ConfigNodeConfig {
     this.seriesPartitionExecutorClass = seriesPartitionExecutorClass;
   }
 
+  /** Returns the configured DataPartition allocation strategy name. */
+  public String getDataPartitionAllocationStrategy() {
+    return dataPartitionAllocationStrategy;
+  }
+
+  /** Sets the DataPartition allocation strategy name; the value is stored without validation. */
+  public void setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
+    this.dataPartitionAllocationStrategy = dataPartitionAllocationStrategy;
+  }
+
   public int getCnRpcMaxConcurrentClientNum() {
     return rpcMaxConcurrentClientNum;
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 0c9ccdeb928..f26ec199d8f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -182,6 +182,10 @@ public class ConfigNodeDescriptor {
         properties.getProperty(
             "series_partition_executor_class", 
conf.getSeriesPartitionExecutorClass()));
 
+    conf.setDataPartitionAllocationStrategy(
+        properties.getProperty(
+            "data_partition_allocation_strategy", 
conf.getDataPartitionAllocationStrategy()));
+
     conf.setConfigNodeConsensusProtocolClass(
         properties.getProperty(
             "config_node_consensus_protocol_class", 
conf.getConfigNodeConsensusProtocolClass()));
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 3db9dcf6a1d..7850cbadc49 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.SeriesPartitionTable;
 import org.apache.iotdb.commons.structure.BalanceTreeMap;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
 import org.apache.iotdb.confignode.manager.IManager;
@@ -39,10 +40,12 @@ import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -56,12 +59,36 @@ public class PartitionBalancer {
 
   private final IManager configManager;
 
-  // Map<DatabaseName, DataPartitionPolicyTable>
+  // Strategy selected once in the constructor from the ConfigNode configuration
+  private final DataPartitionAllocationStrategy dataPartitionAllocationStrategy;
+  // Map<DatabaseName, DataPartitionPolicyTable>, employed by INHERIT allocation strategy
   private final Map<String, DataPartitionPolicyTable> dataPartitionPolicyTableMap;
 
+  /** Ways of assigning the TimePartitionSlots of a SeriesPartitionSlot to DataRegionGroups. */
+  private enum DataPartitionAllocationStrategy {
+    // The INHERIT strategy tries to keep adjacent DataPartitions on the same
+    // DataRegionGroup where possible, while keeping the load balanced.
+    INHERIT,
+    // The SHUFFLE strategy tries to put adjacent DataPartitions on different
+    // DataRegionGroups where possible; note the result could be unbalanced.
+    SHUFFLE
+  }
+
   public PartitionBalancer(IManager configManager) {
     this.configManager = configManager;
     this.dataPartitionPolicyTableMap = new ConcurrentHashMap<>();
+    this.dataPartitionAllocationStrategy =
+        parseDataPartitionAllocationStrategy(
+            ConfigNodeDescriptor.getInstance().getConf().getDataPartitionAllocationStrategy());
   }
+
+  /**
+   * Resolves the configured strategy name to a {@link DataPartitionAllocationStrategy}.
+   *
+   * <p>Matching ignores case and surrounding whitespace, so {@code "shuffle"} selects SHUFFLE
+   * instead of silently falling back. A {@code null} or unknown name logs a warning and selects
+   * INHERIT; a switch on the raw string would have thrown a NullPointerException for {@code null}.
+   *
+   * @param configuredStrategy the raw {@code data_partition_allocation_strategy} value
+   * @return the matching strategy, or INHERIT if none matches
+   */
+  private static DataPartitionAllocationStrategy parseDataPartitionAllocationStrategy(
+      String configuredStrategy) {
+    if (configuredStrategy != null) {
+      String normalized = configuredStrategy.trim();
+      for (DataPartitionAllocationStrategy strategy : DataPartitionAllocationStrategy.values()) {
+        if (strategy.name().equalsIgnoreCase(normalized)) {
+          return strategy;
+        }
+      }
+    }
+    LOGGER.warn(
+        "Unknown DataPartition allocation strategy {}, using INHERIT strategy by default.",
+        configuredStrategy);
+    return DataPartitionAllocationStrategy.INHERIT;
+  }
 
   /**
@@ -152,56 +179,25 @@ public class PartitionBalancer {
           List<TTimePartitionSlot> timePartitionSlots =
               seriesPartitionEntry.getValue().getTimePartitionSlots();
           
timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
-
-          for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
-
-            // 1. The historical DataPartition will try to inherit successor 
DataPartition first
-            TConsensusGroupId successor =
-                getPartitionManager()
-                    .getSuccessorDataPartition(database, seriesPartitionSlot, 
timePartitionSlot);
-            if (successor != null && 
availableDataRegionGroupCounter.containsKey(successor)) {
-              seriesPartitionTable.putDataPartition(timePartitionSlot, 
successor);
-              availableDataRegionGroupCounter.put(
-                  successor, availableDataRegionGroupCounter.get(successor) + 
1);
-              continue;
-            }
-
-            // 2. Assign DataPartition base on the DataAllotTable
-            TConsensusGroupId allotGroupId =
-                
allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
-            if (availableDataRegionGroupCounter.containsKey(allotGroupId)) {
-              seriesPartitionTable.putDataPartition(timePartitionSlot, 
allotGroupId);
-              availableDataRegionGroupCounter.put(
-                  allotGroupId, 
availableDataRegionGroupCounter.get(allotGroupId) + 1);
-              continue;
-            }
-
-            // 3. The allotDataRegionGroup is unavailable,
-            // try to inherit predecessor DataPartition
-            TConsensusGroupId predecessor =
-                getPartitionManager()
-                    .getPredecessorDataPartition(database, 
seriesPartitionSlot, timePartitionSlot);
-            if (predecessor != null && 
availableDataRegionGroupCounter.containsKey(predecessor)) {
-              seriesPartitionTable.putDataPartition(timePartitionSlot, 
predecessor);
-              availableDataRegionGroupCounter.put(
-                  predecessor, 
availableDataRegionGroupCounter.get(predecessor) + 1);
-              continue;
-            }
-
-            // 4. Assign the DataPartition to DataRegionGroup with the least 
DataPartitions
-            // If the above DataRegionGroups are unavailable
-            TConsensusGroupId greedyGroupId = 
availableDataRegionGroupCounter.getKeyWithMinValue();
-            seriesPartitionTable.putDataPartition(timePartitionSlot, 
greedyGroupId);
-            availableDataRegionGroupCounter.put(
-                greedyGroupId, 
availableDataRegionGroupCounter.get(greedyGroupId) + 1);
-            LOGGER.warn(
-                "[PartitionBalancer] The SeriesSlot: {} in TimeSlot: {} will 
be allocated to DataRegionGroup: {}, because the original target: {} is 
currently unavailable.",
-                seriesPartitionSlot,
-                timePartitionSlot,
-                greedyGroupId,
-                allotGroupId);
+          switch (dataPartitionAllocationStrategy) {
+            case INHERIT:
+              inheritAllocationStrategy(
+                  database,
+                  allotTable,
+                  seriesPartitionSlot,
+                  timePartitionSlots,
+                  availableDataRegionGroupCounter,
+                  seriesPartitionTable);
+              break;
+            case SHUFFLE:
+              shuffleAllocationStrategy(
+                  database,
+                  seriesPartitionSlot,
+                  timePartitionSlots,
+                  availableDataRegionGroupCounter,
+                  seriesPartitionTable);
+              break;
           }
-
           dataPartitionTable
               .getDataPartitionMap()
               .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
@@ -215,6 +211,104 @@ public class PartitionBalancer {
     return result;
   }
 
+  /**
+   * Allocates DataPartitions for one SeriesPartitionSlot with the INHERIT strategy.
+   *
+   * <p>Each TimePartitionSlot, processed in list order, goes to the first applicable
+   * DataRegionGroup from these rules:
+   *
+   * <ol>
+   *   <li>the successor DataPartition's RegionGroup;
+   *   <li>the RegionGroup given by {@code allotTable};
+   *   <li>the predecessor DataPartition's RegionGroup;
+   *   <li>the RegionGroup with the smallest value in {@code availableDataRegionGroupCounter}.
+   * </ol>
+   *
+   * <p>Rules 1-3 apply only if the candidate is a key of {@code availableDataRegionGroupCounter}.
+   * Rule 4 also logs a warning. Every assignment is written to {@code seriesPartitionTable} and
+   * increments the chosen RegionGroup's counter.
+   *
+   * @param database the Database owning the SeriesPartitionSlot
+   * @param allotTable presumably the DataPartitionPolicyTable of {@code database} -- verify
+   *     against caller
+   * @param seriesPartitionSlot the SeriesPartitionSlot being allocated
+   * @param timePartitionSlots the TimePartitionSlots to allocate (the caller sorts them by start
+   *     time)
+   * @param availableDataRegionGroupCounter available RegionGroups and their allocation counters
+   * @param seriesPartitionTable receives the newly allocated DataPartitions
+   */
+  private void inheritAllocationStrategy(
+      String database,
+      DataPartitionPolicyTable allotTable,
+      TSeriesPartitionSlot seriesPartitionSlot,
+      List<TTimePartitionSlot> timePartitionSlots,
+      BalanceTreeMap<TConsensusGroupId, Integer> availableDataRegionGroupCounter,
+      SeriesPartitionTable seriesPartitionTable) {
+    for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+
+      // 1. The historical DataPartition will try to inherit successor DataPartition first
+      TConsensusGroupId successor =
+          getPartitionManager()
+              .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+      if (successor != null && availableDataRegionGroupCounter.containsKey(successor)) {
+        seriesPartitionTable.putDataPartition(timePartitionSlot, successor);
+        availableDataRegionGroupCounter.put(
+            successor, availableDataRegionGroupCounter.get(successor) + 1);
+        continue;
+      }
+
+      // 2. Assign DataPartition based on the DataAllotTable
+      TConsensusGroupId allotGroupId =
+          allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+      if (availableDataRegionGroupCounter.containsKey(allotGroupId)) {
+        seriesPartitionTable.putDataPartition(timePartitionSlot, allotGroupId);
+        availableDataRegionGroupCounter.put(
+            allotGroupId, availableDataRegionGroupCounter.get(allotGroupId) + 1);
+        continue;
+      }
+
+      // 3. The allotDataRegionGroup is unavailable,
+      // try to inherit predecessor DataPartition
+      TConsensusGroupId predecessor =
+          getPartitionManager()
+              .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+      if (predecessor != null && availableDataRegionGroupCounter.containsKey(predecessor)) {
+        seriesPartitionTable.putDataPartition(timePartitionSlot, predecessor);
+        availableDataRegionGroupCounter.put(
+            predecessor, availableDataRegionGroupCounter.get(predecessor) + 1);
+        continue;
+      }
+
+      // 4. Assign the DataPartition to DataRegionGroup with the least DataPartitions
+      // If the above DataRegionGroups are unavailable
+      TConsensusGroupId greedyGroupId = availableDataRegionGroupCounter.getKeyWithMinValue();
+      seriesPartitionTable.putDataPartition(timePartitionSlot, greedyGroupId);
+      availableDataRegionGroupCounter.put(
+          greedyGroupId, availableDataRegionGroupCounter.get(greedyGroupId) + 1);
+      LOGGER.warn(
+          "[PartitionBalancer] The SeriesSlot: {} in TimeSlot: {} will be allocated to DataRegionGroup: {}, because the original target: {} is currently unavailable.",
+          seriesPartitionSlot,
+          timePartitionSlot,
+          greedyGroupId,
+          allotGroupId);
+    }
+  }
+
+  /**
+   * Allocates DataPartitions for one SeriesPartitionSlot with the SHUFFLE strategy.
+   *
+   * <p>Each TimePartitionSlot is assigned to a uniformly random available DataRegionGroup that
+   * differs from the RegionGroups of both its predecessor and successor DataPartitions. If every
+   * available RegionGroup is used by a neighbour (for example exactly two groups, one per
+   * neighbour), the pick falls back to any available RegionGroup.
+   *
+   * <p>Only neighbours' RegionGroups that are still available can be chosen. The previous
+   * two-group shortcut could return an unavailable neighbour's RegionGroup even when an available,
+   * distinct one existed.
+   *
+   * <p>Unlike INHERIT, this only reads the set of available RegionGroups from {@code
+   * availableDataRegionGroupCounter} and does not update its counts, so the result may be
+   * unbalanced.
+   *
+   * @param database the Database owning the SeriesPartitionSlot
+   * @param seriesPartitionSlot the SeriesPartitionSlot being allocated
+   * @param timePartitionSlots the TimePartitionSlots to allocate
+   * @param availableDataRegionGroupCounter its key set is the set of available RegionGroups
+   * @param seriesPartitionTable receives the newly allocated DataPartitions
+   */
+  private void shuffleAllocationStrategy(
+      String database,
+      TSeriesPartitionSlot seriesPartitionSlot,
+      List<TTimePartitionSlot> timePartitionSlots,
+      BalanceTreeMap<TConsensusGroupId, Integer> availableDataRegionGroupCounter,
+      SeriesPartitionTable seriesPartitionTable) {
+    final Random random = new Random();
+    final List<TConsensusGroupId> availableDataRegionGroups =
+        new ArrayList<>(availableDataRegionGroupCounter.keySet());
+    for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+      if (availableDataRegionGroups.size() == 1) {
+        // Only one available DataRegionGroup, nothing to shuffle
+        seriesPartitionTable.putDataPartition(
+            timePartitionSlot, availableDataRegionGroups.get(0));
+        continue;
+      }
+      TConsensusGroupId predecessor =
+          getPartitionManager()
+              .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+      TConsensusGroupId successor =
+          getPartitionManager()
+              .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+      // Keep only available DataRegionGroups that differ from both neighbours. Filtering instead
+      // of rejection sampling always terminates and never picks an unavailable neighbour.
+      List<TConsensusGroupId> candidates = new ArrayList<>(availableDataRegionGroups.size());
+      for (TConsensusGroupId groupId : availableDataRegionGroups) {
+        if (!groupId.equals(predecessor) && !groupId.equals(successor)) {
+          candidates.add(groupId);
+        }
+      }
+      if (candidates.isEmpty()) {
+        // Every available DataRegionGroup is already used by a neighbour, so adjacency cannot be
+        // avoided; pick any available DataRegionGroup
+        candidates = availableDataRegionGroups;
+      }
+      seriesPartitionTable.putDataPartition(
+          timePartitionSlot, candidates.get(random.nextInt(candidates.size())));
+    }
+  }
+
   /**
    * Re-balance the DataPartitionPolicyTable.
    *

Reply via email to