This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4862e7a94d4 Shuffle data partition allocation strategy (#16260)
4862e7a94d4 is described below
commit 4862e7a94d4c9ed0ee1a215c1feabeceb37e39a6
Author: Yongzao <[email protected]>
AuthorDate: Tue Aug 26 22:20:23 2025 +0800
Shuffle data partition allocation strategy (#16260)
---
.../it/env/cluster/config/MppCommonConfig.java | 6 +
.../env/cluster/config/MppSharedCommonConfig.java | 6 +
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
...T.java => IoTDBPartitionInheritStrategyIT.java} | 4 +-
.../partition/IoTDBPartitionShuffleStrategyIT.java | 140 +++++++++++++++
.../iotdb/confignode/conf/ConfigNodeConfig.java | 10 ++
.../confignode/conf/ConfigNodeDescriptor.java | 4 +
.../manager/load/balancer/PartitionBalancer.java | 194 +++++++++++++++------
9 files changed, 319 insertions(+), 52 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 6ac73025a4e..1302e64e248 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -393,6 +393,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+  /**
+   * Sets the {@code data_partition_allocation_strategy} property of this config.
+   *
+   * @param dataPartitionAllocationStrategy the strategy name, e.g. "INHERIT" or "SHUFFLE"
+   * @return this config, for chaining
+   */
+  @Override
+  public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
+    setProperty("data_partition_allocation_strategy", dataPartitionAllocationStrategy);
+    return this;
+  }
+
@Override
public CommonConfig setSeriesPartitionExecutorClass(String
seriesPartitionExecutorClass) {
setProperty("series_partition_executor_class",
seriesPartitionExecutorClass);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 1ad18e854cc..cf09cbfbf8f 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -396,6 +396,12 @@ public class MppSharedCommonConfig implements CommonConfig
{
return this;
}
+  /**
+   * Forwards the DataPartition allocation strategy to the shared ConfigNode config only.
+   *
+   * @param dataPartitionAllocationStrategy the strategy name, e.g. "INHERIT" or "SHUFFLE"
+   * @return this config, for chaining
+   */
+  @Override
+  public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
+    cnConfig.setDataPartitionAllocationStrategy(dataPartitionAllocationStrategy);
+    return this;
+  }
+
@Override
public CommonConfig setSeriesPartitionExecutorClass(String
seriesPartitionExecutorClass) {
cnConfig.setSeriesPartitionExecutorClass(seriesPartitionExecutorClass);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 47c9c03dc74..676b914ab1b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -279,6 +279,11 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
+  /**
+   * Ignores the DataPartition allocation strategy, like {@code setSeriesPartitionExecutorClass}
+   * in this class.
+   *
+   * @param dataPartitionAllocationStrategy ignored
+   * @return this config, for chaining
+   */
+  @Override
+  public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
+    return this;
+  }
+
@Override
public CommonConfig setSeriesPartitionExecutorClass(String
seriesPartitionExecutorClass) {
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index c6e8f997739..f6f5aae17a0 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -126,6 +126,8 @@ public interface CommonConfig {
CommonConfig setSeriesSlotNum(int seriesSlotNum);
+ /**
+  * Sets the strategy the ConfigNode uses to allocate new DataPartitions (property
+  * {@code data_partition_allocation_strategy}), e.g. "INHERIT" or "SHUFFLE".
+  */
+ CommonConfig setDataPartitionAllocationStrategy(String
dataPartitionAllocationStrategy);
+
CommonConfig setSeriesPartitionExecutorClass(String
seriesPartitionExecutorClass);
CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
similarity index 98%
rename from
integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
index 2efd5db7a0b..91d34b2d0c9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
@@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
-public class IoTDBPartitionInheritPolicyIT {
+public class IoTDBPartitionInheritStrategyIT {
private static final String testDataRegionConsensusProtocolClass =
ConsensusFactory.RATIS_CONSENSUS;
@@ -91,7 +91,7 @@ public class IoTDBPartitionInheritPolicyIT {
}
@Test
- public void testDataPartitionInheritPolicy() throws Exception {
+ public void testDataPartitionInheritStrategy() throws Exception {
final long baseStartTime = 1000;
Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new
ConcurrentHashMap<>();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java
new file mode 100644
index 00000000000..70f170caa11
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Verifies that the SHUFFLE DataPartition allocation strategy assigns adjacent
+ * TimePartitionSlots of the same SeriesPartitionSlot to different DataRegionGroups.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBPartitionShuffleStrategyIT {
+
+  private static final String testDataRegionConsensusProtocolClass =
+      ConsensusFactory.RATIS_CONSENSUS;
+  private static final int testReplicationFactor = 1;
+  private static final String testDataPartitionAllocationStrategy = "SHUFFLE";
+  private static final int testSeriesSlotNum = 1000;
+  private static final long testTimePartitionInterval = 604800000;
+  private static final double testDataRegionPerDataNode = 5.0;
+
+  private static final String database = "root.database";
+  private static final int testTimePartitionSlotsNum = 100;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass)
+        .setDataReplicationFactor(testReplicationFactor)
+        .setTimePartitionInterval(testTimePartitionInterval)
+        .setSeriesSlotNum(testSeriesSlotNum)
+        .setDataPartitionAllocationStrategy(testDataPartitionAllocationStrategy)
+        .setDataRegionPerDataNode(testDataRegionPerDataNode);
+
+    // Init 1C1D environment
+    EnvFactory.getEnv().initClusterEnvironment(1, 1);
+
+    // Set Database
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      TSStatus status = client.setDatabase(new TDatabaseSchema(database));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testDataPartitionShuffleStrategy() throws Exception {
+    // Create the DataPartitions one TimePartitionSlot per request, in a random order, so that
+    // the predecessor and/or successor of a new slot may or may not exist yet
+    List<Integer> randomTimeSlotList = new ArrayList<>();
+    for (int i = 0; i < testTimePartitionSlotsNum; i++) {
+      randomTimeSlotList.add(i);
+    }
+    Collections.shuffle(randomTimeSlotList);
+    for (int timeSlotId : randomTimeSlotList) {
+      ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+          database, 0, testSeriesSlotNum, timeSlotId, timeSlotId + 1, testTimePartitionInterval);
+    }
+
+    TDataPartitionTableResp dataPartitionTableResp;
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      dataPartitionTableResp =
+          client.getDataPartitionTable(
+              new TDataPartitionReq(
+                  ConfigNodeTestUtils.constructPartitionSlotsMap(
+                      database,
+                      0,
+                      testSeriesSlotNum,
+                      0,
+                      testTimePartitionSlotsNum,
+                      testTimePartitionInterval)));
+    }
+    // Fail with a meaningful message instead of an NPE when the query itself fails
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionTableResp.getStatus().getCode());
+    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
+        partitionTable = dataPartitionTableResp.getDataPartitionTable();
+    Assert.assertNotNull("DataPartitionTable is missing from the response", partitionTable);
+    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> seriesSlotTable =
+        partitionTable.get(database);
+    Assert.assertNotNull("No DataPartition is returned for " + database, seriesSlotTable);
+
+    for (long currentStartTime = testTimePartitionInterval;
+        currentStartTime < testTimePartitionInterval * testTimePartitionSlotsNum;
+        currentStartTime += testTimePartitionInterval) {
+      long precedingStartTime = currentStartTime - testTimePartitionInterval;
+      TTimePartitionSlot precedingTimeSlot = new TTimePartitionSlot(precedingStartTime);
+      TTimePartitionSlot currentTimeSlot = new TTimePartitionSlot(currentStartTime);
+      for (int seriesSlotId = 0; seriesSlotId < testSeriesSlotNum; seriesSlotId++) {
+        Map<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotTable =
+            seriesSlotTable.get(new TSeriesPartitionSlot(seriesSlotId));
+        Assert.assertNotNull("Missing SeriesSlot " + seriesSlotId, timeSlotTable);
+        List<TConsensusGroupId> precedingRegionGroupIds = timeSlotTable.get(precedingTimeSlot);
+        List<TConsensusGroupId> currentRegionGroupIds = timeSlotTable.get(currentTimeSlot);
+        Assert.assertNotNull(
+            "Missing TimeSlot " + precedingStartTime + " of SeriesSlot " + seriesSlotId,
+            precedingRegionGroupIds);
+        Assert.assertNotNull(
+            "Missing TimeSlot " + currentStartTime + " of SeriesSlot " + seriesSlotId,
+            currentRegionGroupIds);
+        Assert.assertEquals(precedingRegionGroupIds.size(), currentRegionGroupIds.size());
+        for (int i = 0; i < precedingRegionGroupIds.size(); i++) {
+          // Ensure that the RegionGroupId is different in two adjacent TimePartitionSlots
+          Assert.assertNotEquals(
+              "SeriesSlot "
+                  + seriesSlotId
+                  + " uses the same DataRegionGroup in TimeSlots "
+                  + precedingStartTime
+                  + " and "
+                  + currentStartTime,
+              precedingRegionGroupIds.get(i),
+              currentRegionGroupIds.get(i));
+        }
+      }
+    }
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 868949c0b22..57e619a9baa 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -82,6 +82,8 @@ public class ConfigNodeConfig {
private String seriesPartitionExecutorClass =
"org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
+ /**
+  * Strategy for allocating new DataPartitions, loaded from
+  * {@code data_partition_allocation_strategy}. The PartitionBalancer recognizes "INHERIT" (the
+  * default) and "SHUFFLE".
+  */
+ private String dataPartitionAllocationStrategy = "INHERIT";
+
/** The policy of extension SchemaRegionGroup for each Database. */
private RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy =
RegionGroupExtensionPolicy.AUTO;
@@ -423,6 +425,14 @@ public class ConfigNodeConfig {
this.seriesPartitionExecutorClass = seriesPartitionExecutorClass;
}
+  /** Returns the configured DataPartition allocation strategy name. */
+  public String getDataPartitionAllocationStrategy() {
+    return this.dataPartitionAllocationStrategy;
+  }
+
+  /** Sets the DataPartition allocation strategy name, e.g. "INHERIT" or "SHUFFLE". */
+  public void setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
+    this.dataPartitionAllocationStrategy = dataPartitionAllocationStrategy;
+  }
+
public int getCnRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 0c9ccdeb928..f26ec199d8f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -182,6 +182,10 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"series_partition_executor_class",
conf.getSeriesPartitionExecutorClass()));
+ conf.setDataPartitionAllocationStrategy(
+ properties.getProperty(
+ "data_partition_allocation_strategy",
conf.getDataPartitionAllocationStrategy()));
+
conf.setConfigNodeConsensusProtocolClass(
properties.getProperty(
"config_node_consensus_protocol_class",
conf.getConfigNodeConsensusProtocolClass()));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 3db9dcf6a1d..7850cbadc49 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
import org.apache.iotdb.commons.structure.BalanceTreeMap;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
import org.apache.iotdb.confignode.manager.IManager;
@@ -39,10 +40,12 @@ import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -56,12 +59,36 @@ public class PartitionBalancer {
private final IManager configManager;
- // Map<DatabaseName, DataPartitionPolicyTable>
+  private final DataPartitionAllocationStrategy dataPartitionAllocationStrategy;
+  // Map<DatabaseName, DataPartitionPolicyTable>, employed by INHERIT allocation strategy
   private final Map<String, DataPartitionPolicyTable> dataPartitionPolicyTableMap;
+  /** Strategies for choosing the DataRegionGroup of a newly created DataPartition. */
+  private enum DataPartitionAllocationStrategy {
+    // The INHERIT strategy tries to keep adjacent DataPartitions in the same DataRegionGroup
+    // as much as possible, while ensuring load balancing.
+    INHERIT,
+    // The SHUFFLE strategy tries to place adjacent DataPartitions in different DataRegionGroups
+    // as much as possible; note that the result could be unbalanced.
+    SHUFFLE
+  }
+
   public PartitionBalancer(IManager configManager) {
     this.configManager = configManager;
     this.dataPartitionPolicyTableMap = new ConcurrentHashMap<>();
+    this.dataPartitionAllocationStrategy =
+        parseDataPartitionAllocationStrategy(
+            ConfigNodeDescriptor.getInstance().getConf().getDataPartitionAllocationStrategy());
   }
+
+  /**
+   * Parse the configured DataPartition allocation strategy.
+   *
+   * <p>Matching ignores case and surrounding whitespace, so "shuffle" or " SHUFFLE " select
+   * SHUFFLE. A null or unrecognized value falls back to INHERIT with a warning.
+   *
+   * @param configured the raw value of {@code data_partition_allocation_strategy}, may be null
+   * @return the matching strategy, or INHERIT when nothing matches
+   */
+  private DataPartitionAllocationStrategy parseDataPartitionAllocationStrategy(
+      String configured) {
+    if (configured != null) {
+      final String normalized = configured.trim();
+      for (DataPartitionAllocationStrategy strategy : DataPartitionAllocationStrategy.values()) {
+        if (strategy.name().equalsIgnoreCase(normalized)) {
+          return strategy;
+        }
+      }
+    }
+    LOGGER.warn(
+        "Unknown DataPartition allocation strategy {}, using INHERIT strategy by default.",
+        configured);
+    return DataPartitionAllocationStrategy.INHERIT;
+  }
/**
@@ -152,56 +179,25 @@ public class PartitionBalancer {
List<TTimePartitionSlot> timePartitionSlots =
seriesPartitionEntry.getValue().getTimePartitionSlots();
timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
-
- for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
-
- // 1. The historical DataPartition will try to inherit successor
DataPartition first
- TConsensusGroupId successor =
- getPartitionManager()
- .getSuccessorDataPartition(database, seriesPartitionSlot,
timePartitionSlot);
- if (successor != null &&
availableDataRegionGroupCounter.containsKey(successor)) {
- seriesPartitionTable.putDataPartition(timePartitionSlot,
successor);
- availableDataRegionGroupCounter.put(
- successor, availableDataRegionGroupCounter.get(successor) +
1);
- continue;
- }
-
- // 2. Assign DataPartition base on the DataAllotTable
- TConsensusGroupId allotGroupId =
-
allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
- if (availableDataRegionGroupCounter.containsKey(allotGroupId)) {
- seriesPartitionTable.putDataPartition(timePartitionSlot,
allotGroupId);
- availableDataRegionGroupCounter.put(
- allotGroupId,
availableDataRegionGroupCounter.get(allotGroupId) + 1);
- continue;
- }
-
- // 3. The allotDataRegionGroup is unavailable,
- // try to inherit predecessor DataPartition
- TConsensusGroupId predecessor =
- getPartitionManager()
- .getPredecessorDataPartition(database,
seriesPartitionSlot, timePartitionSlot);
- if (predecessor != null &&
availableDataRegionGroupCounter.containsKey(predecessor)) {
- seriesPartitionTable.putDataPartition(timePartitionSlot,
predecessor);
- availableDataRegionGroupCounter.put(
- predecessor,
availableDataRegionGroupCounter.get(predecessor) + 1);
- continue;
- }
-
- // 4. Assign the DataPartition to DataRegionGroup with the least
DataPartitions
- // If the above DataRegionGroups are unavailable
- TConsensusGroupId greedyGroupId =
availableDataRegionGroupCounter.getKeyWithMinValue();
- seriesPartitionTable.putDataPartition(timePartitionSlot,
greedyGroupId);
- availableDataRegionGroupCounter.put(
- greedyGroupId,
availableDataRegionGroupCounter.get(greedyGroupId) + 1);
- LOGGER.warn(
- "[PartitionBalancer] The SeriesSlot: {} in TimeSlot: {} will
be allocated to DataRegionGroup: {}, because the original target: {} is
currently unavailable.",
- seriesPartitionSlot,
- timePartitionSlot,
- greedyGroupId,
- allotGroupId);
+ switch (dataPartitionAllocationStrategy) {
+ case INHERIT:
+ inheritAllocationStrategy(
+ database,
+ allotTable,
+ seriesPartitionSlot,
+ timePartitionSlots,
+ availableDataRegionGroupCounter,
+ seriesPartitionTable);
+ break;
+ case SHUFFLE:
+ shuffleAllocationStrategy(
+ database,
+ seriesPartitionSlot,
+ timePartitionSlots,
+ availableDataRegionGroupCounter,
+ seriesPartitionTable);
+ break;
}
-
dataPartitionTable
.getDataPartitionMap()
.put(seriesPartitionEntry.getKey(), seriesPartitionTable);
@@ -215,6 +211,104 @@ public class PartitionBalancer {
return result;
}
+  /**
+   * Allocate the DataPartitions of one SeriesPartitionSlot with the INHERIT strategy.
+   *
+   * <p>For every TimePartitionSlot the DataRegionGroups below are tried in order. The first one
+   * present in {@code availableDataRegionGroupCounter} is chosen, and its counter is increased by
+   * one:
+   *
+   * <ol>
+   *   <li>the DataRegionGroup of the successor DataPartition;
+   *   <li>the DataRegionGroup returned by {@code
+   *       allotTable.getRegionGroupIdOrActivateIfNecessary};
+   *   <li>the DataRegionGroup of the predecessor DataPartition;
+   *   <li>otherwise the available DataRegionGroup with the minimum counter, logging a warning.
+   * </ol>
+   */
+  private void inheritAllocationStrategy(
+      String database,
+      DataPartitionPolicyTable allotTable,
+      TSeriesPartitionSlot seriesPartitionSlot,
+      List<TTimePartitionSlot> timePartitionSlots,
+      BalanceTreeMap<TConsensusGroupId, Integer> availableDataRegionGroupCounter,
+      SeriesPartitionTable seriesPartitionTable) {
+    for (TTimePartitionSlot timeSlot : timePartitionSlots) {
+      // Step 1: a historical DataPartition prefers to inherit its successor DataPartition
+      final TConsensusGroupId successorGroupId =
+          getPartitionManager().getSuccessorDataPartition(database, seriesPartitionSlot, timeSlot);
+      if (successorGroupId != null
+          && availableDataRegionGroupCounter.containsKey(successorGroupId)) {
+        putDataPartitionAndCount(
+            seriesPartitionTable, availableDataRegionGroupCounter, timeSlot, successorGroupId);
+        continue;
+      }
+
+      // Step 2: assign the DataPartition based on the DataAllotTable
+      final TConsensusGroupId allotGroupId =
+          allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+      if (availableDataRegionGroupCounter.containsKey(allotGroupId)) {
+        putDataPartitionAndCount(
+            seriesPartitionTable, availableDataRegionGroupCounter, timeSlot, allotGroupId);
+        continue;
+      }
+
+      // Step 3: the allotted DataRegionGroup is unavailable, try the predecessor DataPartition
+      final TConsensusGroupId predecessorGroupId =
+          getPartitionManager()
+              .getPredecessorDataPartition(database, seriesPartitionSlot, timeSlot);
+      if (predecessorGroupId != null
+          && availableDataRegionGroupCounter.containsKey(predecessorGroupId)) {
+        putDataPartitionAndCount(
+            seriesPartitionTable, availableDataRegionGroupCounter, timeSlot, predecessorGroupId);
+        continue;
+      }
+
+      // Step 4: none of the above is available, fall back to the least-loaded DataRegionGroup
+      final TConsensusGroupId greedyGroupId = availableDataRegionGroupCounter.getKeyWithMinValue();
+      putDataPartitionAndCount(
+          seriesPartitionTable, availableDataRegionGroupCounter, timeSlot, greedyGroupId);
+      LOGGER.warn(
+          "[PartitionBalancer] The SeriesSlot: {} in TimeSlot: {} will be allocated to DataRegionGroup: {}, because the original target: {} is currently unavailable.",
+          seriesPartitionSlot,
+          timeSlot,
+          greedyGroupId,
+          allotGroupId);
+    }
+  }
+
+  /** Put {@code groupId} as the DataPartition of {@code timeSlot} and bump its counter by one. */
+  private static void putDataPartitionAndCount(
+      SeriesPartitionTable seriesPartitionTable,
+      BalanceTreeMap<TConsensusGroupId, Integer> dataRegionGroupCounter,
+      TTimePartitionSlot timeSlot,
+      TConsensusGroupId groupId) {
+    seriesPartitionTable.putDataPartition(timeSlot, groupId);
+    dataRegionGroupCounter.put(groupId, dataRegionGroupCounter.get(groupId) + 1);
+  }
+
+  /**
+   * Allocate the DataPartitions of one SeriesPartitionSlot with the SHUFFLE strategy.
+   *
+   * <p>Each TimePartitionSlot gets a randomly chosen available DataRegionGroup that differs from
+   * every neighbour it can see:
+   *
+   * <ul>
+   *   <li>the predecessor and successor DataPartitions known to the PartitionManager;
+   *   <li>the DataRegionGroup assigned to the previous TimePartitionSlot of this same batch.
+   * </ul>
+   *
+   * <p>When every available DataRegionGroup is such a neighbour, any available DataRegionGroup
+   * is chosen at random. The counter of each chosen DataRegionGroup is increased by one, as in
+   * the INHERIT strategy. The result is not guaranteed to be balanced.
+   *
+   * @param database the Database that owns the SeriesPartitionSlot
+   * @param seriesPartitionSlot the SeriesPartitionSlot being allocated
+   * @param timePartitionSlots the TimePartitionSlots to allocate; the caller sorts them by start
+   *     time
+   * @param availableDataRegionGroupCounter the available DataRegionGroups and their counters
+   * @param seriesPartitionTable receives the new DataPartitions
+   */
+  private void shuffleAllocationStrategy(
+      String database,
+      TSeriesPartitionSlot seriesPartitionSlot,
+      List<TTimePartitionSlot> timePartitionSlots,
+      BalanceTreeMap<TConsensusGroupId, Integer> availableDataRegionGroupCounter,
+      SeriesPartitionTable seriesPartitionTable) {
+    final Random random = new Random();
+    final List<TConsensusGroupId> availableDataRegionGroups =
+        new ArrayList<>(availableDataRegionGroupCounter.keySet());
+    // DataRegionGroup of the previous TimePartitionSlot in this batch. The DataPartitions created
+    // here are not yet visible through the PartitionManager, so the neighbour must be tracked
+    // locally, otherwise consecutive slots requested together could share a DataRegionGroup.
+    TConsensusGroupId previousGroupIdInBatch = null;
+    for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+      final TConsensusGroupId targetGroupId;
+      if (availableDataRegionGroups.size() == 1) {
+        // Only one available DataRegionGroup
+        targetGroupId = availableDataRegionGroups.get(0);
+      } else {
+        final TConsensusGroupId predecessor =
+            getPartitionManager()
+                .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+        final TConsensusGroupId successor =
+            getPartitionManager()
+                .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+        // Candidates are the available DataRegionGroups that hold none of the neighbours.
+        // Only available DataRegionGroups are ever picked.
+        final List<TConsensusGroupId> candidates = new ArrayList<>();
+        for (TConsensusGroupId groupId : availableDataRegionGroups) {
+          if (!groupId.equals(predecessor)
+              && !groupId.equals(successor)
+              && !groupId.equals(previousGroupIdInBatch)) {
+            candidates.add(groupId);
+          }
+        }
+        // If every available DataRegionGroup is a neighbour, adjacency cannot be avoided
+        final List<TConsensusGroupId> pool =
+            candidates.isEmpty() ? availableDataRegionGroups : candidates;
+        targetGroupId = pool.get(random.nextInt(pool.size()));
+      }
+      seriesPartitionTable.putDataPartition(timePartitionSlot, targetGroupId);
+      availableDataRegionGroupCounter.put(
+          targetGroupId, availableDataRegionGroupCounter.get(targetGroupId) + 1);
+      previousGroupIdInBatch = targetGroupId;
+    }
+  }
+
/**
* Re-balance the DataPartitionPolicyTable.
*