This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 81d0aa3c3f4 [To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)
81d0aa3c3f4 is described below
commit 81d0aa3c3f4bf9304944c8e1a72b803f78f7935b
Author: Potato <[email protected]>
AuthorDate: Mon Jul 31 23:01:33 2023 +0800
[To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)
---
.../iotdb/it/env/cluster/MppCommonConfig.java | 14 +-
.../it/env/cluster/MppSharedCommonConfig.java | 15 +-
.../iotdb/it/env/remote/RemoteCommonConfig.java | 12 +-
.../org/apache/iotdb/itbase/env/CommonConfig.java | 4 +-
.../it/partition/IoTDBPartitionCreationIT.java | 176 -------------
.../partition/IoTDBPartitionInheritPolicyIT.java | 276 ++++++++++++++-------
.../confignode/it/utils/ConfigNodeTestUtils.java | 117 +++++++++
.../db/it/last/IoTDBLastQueryLastCacheIT.java | 3 +-
.../apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java | 2 +-
.../session/it/IoTDBSessionSchemaTemplateIT.java | 22 +-
.../iotdb/confignode/conf/ConfigNodeConfig.java | 14 --
.../confignode/conf/ConfigNodeDescriptor.java | 6 -
.../iotdb/confignode/manager/load/LoadManager.java | 16 +-
.../manager/load/balancer/PartitionBalancer.java | 201 ++++++++++++++-
.../partition/DataPartitionPolicyTable.java | 144 +++++++++++
.../partition/GreedyPartitionAllocator.java | 203 ---------------
.../balancer/partition/IPartitionAllocator.java | 55 ----
.../manager/partition/PartitionManager.java | 90 +++++--
.../partition/DatabasePartitionTable.java | 58 +++--
.../persistence/partition/PartitionInfo.java | 72 +++++-
.../impl/schema/DeleteDatabaseProcedure.java | 3 +
.../statemachine/CreateRegionGroupsProcedure.java | 33 ++-
.../request/ConfigPhysicalPlanSerDeTest.java | 6 +-
.../partition/DataPartitionPolicyTableTest.java | 105 ++++++++
.../impl/CreateRegionGroupsProcedureTest.java | 5 +-
.../resources/conf/iotdb-common.properties | 7 -
.../commons/partition/DataPartitionTable.java | 42 +++-
.../commons/partition/SeriesPartitionTable.java | 74 +++---
.../iotdb/commons/structure/BalanceTreeMap.java | 104 ++++++++
.../commons/structure/BalanceTreeMapTest.java | 82 ++++++
30 files changed, 1273 insertions(+), 688 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index 6c01de2e787..4dbc3453fb8 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -199,14 +199,6 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
- @Override
- public CommonConfig setEnableDataPartitionInheritPolicy(
- boolean enableDataPartitionInheritPolicy) {
- setProperty(
- "enable_data_partition_inherit_policy",
String.valueOf(enableDataPartitionInheritPolicy));
- return this;
- }
-
@Override
public CommonConfig setDataReplicationFactor(int dataReplicationFactor) {
setProperty("data_replication_factor",
String.valueOf(dataReplicationFactor));
@@ -359,4 +351,10 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
setProperty("database_limit_threshold",
String.valueOf(databaseLimitThreshold));
return this;
}
+
+ @Override
+ public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+ setProperty("data_region_per_data_node",
String.valueOf(dataRegionPerDataNode));
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index 7af4f01fa63..b362a02d97b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -172,14 +172,6 @@ public class MppSharedCommonConfig implements CommonConfig
{
return this;
}
- @Override
- public CommonConfig setEnableDataPartitionInheritPolicy(
- boolean enableDataPartitionInheritPolicy) {
-
cnConfig.setEnableDataPartitionInheritPolicy(enableDataPartitionInheritPolicy);
-
dnConfig.setEnableDataPartitionInheritPolicy(enableDataPartitionInheritPolicy);
- return this;
- }
-
@Override
public CommonConfig setSchemaRegionGroupExtensionPolicy(String
schemaRegionGroupExtensionPolicy) {
cnConfig.setSchemaRegionGroupExtensionPolicy(schemaRegionGroupExtensionPolicy);
@@ -372,4 +364,11 @@ public class MppSharedCommonConfig implements CommonConfig
{
cnConfig.setDatabaseLimitThreshold(databaseLimitThreshold);
return this;
}
+
+ @Override
+ public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+ dnConfig.setDataRegionPerDataNode(dataRegionPerDataNode);
+ cnConfig.setDataRegionPerDataNode(dataRegionPerDataNode);
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index a578a6f4cf5..e181d5e1f73 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.it.env.remote;
import org.apache.iotdb.itbase.env.CommonConfig;
@@ -122,12 +123,6 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
- @Override
- public CommonConfig setEnableDataPartitionInheritPolicy(
- boolean enableDataPartitionInheritPolicy) {
- return this;
- }
-
@Override
public CommonConfig setSchemaRegionGroupExtensionPolicy(String
schemaRegionGroupExtensionPolicy) {
return this;
@@ -266,4 +261,9 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setDatabaseLimitThreshold(long databaseLimitThreshold) {
return this;
}
+
+ @Override
+ public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 2a08304221d..08161c36bc3 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -62,8 +62,6 @@ public interface CommonConfig {
CommonConfig setDataRegionConsensusProtocolClass(String
dataRegionConsensusProtocolClass);
- CommonConfig setEnableDataPartitionInheritPolicy(boolean
enableDataPartitionInheritPolicy);
-
CommonConfig setSchemaRegionGroupExtensionPolicy(String
schemaRegionGroupExtensionPolicy);
CommonConfig setDefaultSchemaRegionGroupNumPerDatabase(int
schemaRegionGroupPerDatabase);
@@ -119,4 +117,6 @@ public interface CommonConfig {
CommonConfig setSortBufferSize(long sortBufferSize);
CommonConfig setMaxTsBlockSizeInByte(long maxTsBlockSizeInByte);
+
+ CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
index 33844dcf48c..dba7b812814 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
@@ -419,182 +419,6 @@ public class IoTDBPartitionCreationIT {
4 + testTimePartitionBatchSize,
testTimePartitionInterval,
dataPartitionTableResp.getDataPartitionTable());
-
- // RegionGroup statistics:
- // 0: 1 Removing, 1 partition
- // 1: 1 ReadOnly, 1 partition
- // 2: 1 Unknown, 1 partition
- // 3: All Running, 1 partition
- // Least Region Group number per storageGroup = 4, match the current
Region Group number
- // Will allocate the new partition to Running RegionGroup 3, DataNodes:
[1, 2, 6]
- showRegionResp = client.showRegion(new TShowRegionReq());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
showRegionResp.getStatus().getCode());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (regionInfo.getDataNodeId() == 6) {
- Assert.assertEquals(regionInfo.getTimeSlots(), 2);
- }
- }
-
- partitionSlotsMap =
- ConfigNodeTestUtils.constructPartitionSlotsMap(
- sg,
- 5,
- 5 + testSeriesPartitionBatchSize,
- 5,
- 5 + testTimePartitionBatchSize,
- testTimePartitionInterval);
- dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable in Win8 environment
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp =
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
- }
- }
- Assert.assertNotNull(dataPartitionTableResp);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- ConfigNodeTestUtils.checkDataPartitionTable(
- sg,
- 5,
- 5 + testSeriesPartitionBatchSize,
- 5,
- 5 + testTimePartitionBatchSize,
- testTimePartitionInterval,
- dataPartitionTableResp.getDataPartitionTable());
-
- // RegionGroup statistics:
- // 0: 1 Removing, 1 partition
- // 1: 1 ReadOnly, 1 partition
- // 2: 1 Unknown, 1 partition
- // 3: All Running, 2 partition
- // Least Region Group number per storageGroup = 4, match the current
Region Group number
- // Will allocate the new partition to available RegionGroup 2,
DataNodes: [1, 2, 5]
- showRegionResp = client.showRegion(new TShowRegionReq());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
showRegionResp.getStatus().getCode());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (regionInfo.getDataNodeId() == 5) {
- Assert.assertEquals(regionInfo.getTimeSlots(), 2);
- }
- }
-
- partitionSlotsMap =
- ConfigNodeTestUtils.constructPartitionSlotsMap(
- sg,
- 6,
- 6 + testSeriesPartitionBatchSize,
- 6,
- 6 + testTimePartitionBatchSize,
- testTimePartitionInterval);
- dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable in Win8 environment
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp =
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
- }
- }
- Assert.assertNotNull(dataPartitionTableResp);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- ConfigNodeTestUtils.checkDataPartitionTable(
- sg,
- 6,
- 6 + testSeriesPartitionBatchSize,
- 6,
- 6 + testTimePartitionBatchSize,
- testTimePartitionInterval,
- dataPartitionTableResp.getDataPartitionTable());
-
- // RegionGroup statistics:
- // 0: 1 Removing, 1 partition
- // 1: 1 ReadOnly, 1 partition
- // 2: 1 Unknown, 2 partition
- // 3: All Running, 2 partition
- // Least Region Group number per storageGroup = 4, match the current
Region Group number
- // Will allocate the new partition to Discouraged RegionGroup 2,
DataNodes: [1, 2, 4]
- showRegionResp = client.showRegion(new TShowRegionReq());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
showRegionResp.getStatus().getCode());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (regionInfo.getDataNodeId() == 4) {
- Assert.assertEquals(regionInfo.getTimeSlots(), 2);
- }
- }
-
- partitionSlotsMap =
- ConfigNodeTestUtils.constructPartitionSlotsMap(
- sg,
- 7,
- 7 + testSeriesPartitionBatchSize,
- 7,
- 7 + testTimePartitionBatchSize,
- testTimePartitionInterval);
- dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable in Win8 environment
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp =
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
- }
- }
- Assert.assertNotNull(dataPartitionTableResp);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- ConfigNodeTestUtils.checkDataPartitionTable(
- sg,
- 7,
- 7 + testSeriesPartitionBatchSize,
- 7,
- 7 + testTimePartitionBatchSize,
- testTimePartitionInterval,
- dataPartitionTableResp.getDataPartitionTable());
-
- // RegionGroup statistics:
- // 0: 1 Removing, 1 partition
- // 1: 1 ReadOnly, 2 partition
- // 2: 1 Unknown, 2 partition
- // 3: All Running, 2 partition
- // Least Region Group number per storageGroup = 4, match the current
Region Group number
- // Will allocate the new partition to Running RegionGroup 3, DataNodes:
[0, 1, 5]
- // Because RegionGroup 1 is Disabled
- showRegionResp = client.showRegion(new TShowRegionReq());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
showRegionResp.getStatus().getCode());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (regionInfo.getDataNodeId() == 6) {
- Assert.assertEquals(regionInfo.getTimeSlots(), 3);
- }
- }
}
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index 8b49948f5f7..2efd5db7a0b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -16,18 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.it.partition;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
-import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
-import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -40,27 +39,27 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.HashSet;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBPartitionInheritPolicyIT {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBPartitionInheritPolicyIT.class);
- private static final boolean testEnableDataPartitionInheritPolicy = true;
private static final String testDataRegionConsensusProtocolClass =
ConsensusFactory.RATIS_CONSENSUS;
- private static final int testReplicationFactor = 3;
+ private static final int testReplicationFactor = 1;
+ private static final int testSeriesSlotNum = 1000;
private static final long testTimePartitionInterval = 604800000;
+ private static final double testDataRegionPerDataNode = 5.0;
- private static final String sg = "root.sg";
- private static final int storageGroupNum = 2;
- private static final int testSeriesPartitionSlotNum = 100;
- private static final int seriesPartitionBatchSize = 10;
+ private static final String database = "root.database";
+ private static final int seriesPartitionSlotBatchSize = 100;
private static final int testTimePartitionSlotsNum = 100;
private static final int timePartitionBatchSize = 10;
@@ -70,21 +69,19 @@ public class IoTDBPartitionInheritPolicyIT {
.getConfig()
.getCommonConfig()
.setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass)
-
.setEnableDataPartitionInheritPolicy(testEnableDataPartitionInheritPolicy)
.setDataReplicationFactor(testReplicationFactor)
.setTimePartitionInterval(testTimePartitionInterval)
- .setSeriesSlotNum(testSeriesPartitionSlotNum * 10);
+ .setSeriesSlotNum(testSeriesSlotNum)
+ .setDataRegionPerDataNode(testDataRegionPerDataNode);
- // Init 1C3D environment
- EnvFactory.getEnv().initClusterEnvironment(1, 3);
+ // Init 1C1D environment
+ EnvFactory.getEnv().initClusterEnvironment(1, 1);
- // Set StorageGroups
+ // Set Database
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- for (int i = 0; i < storageGroupNum; i++) {
- TSStatus status = client.setDatabase(new TDatabaseSchema(sg + i));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- }
+ TSStatus status = client.setDatabase(new TDatabaseSchema(database));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
}
}
@@ -95,76 +92,177 @@ public class IoTDBPartitionInheritPolicyIT {
@Test
public void testDataPartitionInheritPolicy() throws Exception {
+ final long baseStartTime = 1000;
+ Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new
ConcurrentHashMap<>();
- try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- TDataPartitionReq dataPartitionReq = new TDataPartitionReq();
- TDataPartitionTableResp dataPartitionTableResp;
- Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap;
-
- for (int i = 0; i < storageGroupNum; i++) {
- String storageGroup = sg + i;
- for (int j = 0; j < testSeriesPartitionSlotNum; j +=
seriesPartitionBatchSize) {
- // Test inherit predecessor or successor
- boolean isAscending = (j / 10) % 2 == 0;
- int step = isAscending ? timePartitionBatchSize :
-timePartitionBatchSize;
- int k = isAscending ? 0 : testTimePartitionSlotsNum -
timePartitionBatchSize;
- while (0 <= k && k < testTimePartitionSlotsNum) {
- partitionSlotsMap =
- ConfigNodeTestUtils.constructPartitionSlotsMap(
- storageGroup,
- j,
- j + seriesPartitionBatchSize,
- k,
- k + timePartitionBatchSize,
- testTimePartitionInterval);
- // Let ConfigNode create DataPartition
- dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient)
- EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp =
-
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null
- && dataPartitionTableResp.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- ConfigNodeTestUtils.checkDataPartitionTable(
- storageGroup,
- j,
- j + seriesPartitionBatchSize,
- k,
- k + timePartitionBatchSize,
- testTimePartitionInterval,
- configNodeClient
- .getDataPartitionTable(dataPartitionReq)
- .getDataPartitionTable());
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
+ // Test1: divide and inherit DataPartitions from scratch
+ for (long timePartitionSlot = baseStartTime;
+ timePartitionSlot < baseStartTime + testTimePartitionSlotsNum;
+ timePartitionSlot++) {
+ for (int seriesPartitionSlot = 0;
+ seriesPartitionSlot < testSeriesSlotNum;
+ seriesPartitionSlot += seriesPartitionSlotBatchSize) {
+ ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+ database,
+ seriesPartitionSlot,
+ seriesPartitionSlot + seriesPartitionSlotBatchSize,
+ timePartitionSlot,
+ timePartitionSlot + 1,
+ testTimePartitionInterval);
+ }
+ }
+
+ int mu = (int) (testSeriesSlotNum / testDataRegionPerDataNode);
+ TDataPartitionTableResp dataPartitionTableResp =
+ ConfigNodeTestUtils.getDataPartitionWithRetry(
+ database,
+ 0,
+ testSeriesSlotNum,
+ baseStartTime,
+ baseStartTime + testTimePartitionSlotsNum,
+ testTimePartitionInterval);
+ Assert.assertNotNull(dataPartitionTableResp);
+
+ // All DataRegionGroups divide all SeriesSlots evenly
+ final int expectedPartitionNum1 = mu * testTimePartitionSlotsNum;
+ Map<TConsensusGroupId, Integer> counter =
+ ConfigNodeTestUtils.countDataPartition(
+ dataPartitionTableResp.getDataPartitionTable().get(database));
+ counter.forEach((groupId, num) ->
Assert.assertEquals(expectedPartitionNum1, num.intValue()));
+
+ // Test DataPartition inherit policy
+ dataPartitionTableResp
+ .getDataPartitionTable()
+ .get(database)
+ .forEach(
+ ((seriesPartitionSlot, timePartitionSlotMap) -> {
+ // All Timeslots belonging to the same SeriesSlot are allocated
to the same
+ // DataRegionGroup
+ TConsensusGroupId groupId =
+ timePartitionSlotMap
+ .get(new TTimePartitionSlot(baseStartTime *
testTimePartitionInterval))
+ .get(0);
+ timePartitionSlotMap.forEach(
+ (timePartitionSlot, groupIdList) ->
+ Assert.assertEquals(groupId, groupIdList.get(0)));
+ dataAllotTable1.put(seriesPartitionSlot, groupId);
+ }));
+
+ // Register a new DataNode to extend DataRegionGroups
+ EnvFactory.getEnv().registerNewDataNode(true);
+
+ // Test2: divide and inherit DataPartitions after extension
+ mu = (int) (testSeriesSlotNum / (testDataRegionPerDataNode * 2));
+ dataPartitionTableResp =
+ ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+ database,
+ 0,
+ testSeriesSlotNum,
+ baseStartTime + testTimePartitionSlotsNum,
+ baseStartTime + testTimePartitionSlotsNum + timePartitionBatchSize,
+ testTimePartitionInterval);
+ Assert.assertNotNull(dataPartitionTableResp);
+
+ // All DataRegionGroups divide all SeriesSlots evenly
+ counter =
+ ConfigNodeTestUtils.countDataPartition(
+ dataPartitionTableResp.getDataPartitionTable().get(database));
+ final int expectedPartitionNum2 = mu * timePartitionBatchSize;
+ counter.forEach((groupId, num) ->
Assert.assertEquals(expectedPartitionNum2, num.intValue()));
+
+ // Test DataPartition inherit policy
+ AtomicInteger inheritedSeriesSlotNum = new AtomicInteger(0);
+ Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable2 = new
ConcurrentHashMap<>();
+ dataPartitionTableResp
+ .getDataPartitionTable()
+ .get(database)
+ .forEach(
+ ((seriesPartitionSlot, timePartitionSlotMap) -> {
+ // All Timeslots belonging to the same SeriesSlot are allocated
to the same
+ // DataRegionGroup
+ TConsensusGroupId groupId =
+ timePartitionSlotMap
+ .get(
+ new TTimePartitionSlot(
+ (baseStartTime + testTimePartitionSlotsNum)
+ * testTimePartitionInterval))
+ .get(0);
+ timePartitionSlotMap.forEach(
+ (timePartitionSlot, groupIdList) ->
+ Assert.assertEquals(groupId, groupIdList.get(0)));
+
+ if (dataAllotTable1.containsValue(groupId)) {
+ // The DataRegionGroup has been inherited
+
Assert.assertTrue(dataAllotTable1.containsKey(seriesPartitionSlot));
+ Assert.assertEquals(dataAllotTable1.get(seriesPartitionSlot),
groupId);
+ inheritedSeriesSlotNum.incrementAndGet();
}
- }
- k += step;
- }
- }
+ dataAllotTable2.put(seriesPartitionSlot, groupId);
+ }));
+ // Exactly half of the SeriesSlots are inherited
+ Assert.assertEquals(testSeriesSlotNum / 2, inheritedSeriesSlotNum.get());
+
+ // Test3: historical DataPartitions will inherit successor
+ Random random = new Random();
+ Set<Integer> allocatedSlots = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ int slot = random.nextInt(testSeriesSlotNum);
+ while (allocatedSlots.contains(slot)) {
+ slot = random.nextInt(testSeriesSlotNum);
+ }
+ allocatedSlots.add(slot);
+ dataPartitionTableResp =
+ ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+ database,
+ slot,
+ slot + 1,
+ baseStartTime - 1,
+ baseStartTime,
+ testTimePartitionInterval);
+ Assert.assertNotNull(dataPartitionTableResp);
+
+ TSeriesPartitionSlot seriesPartitionSlot = new
TSeriesPartitionSlot(slot);
+ TTimePartitionSlot timePartitionSlot =
+ new TTimePartitionSlot((baseStartTime - 1) *
testTimePartitionInterval);
+ Assert.assertEquals(
+ dataAllotTable1.get(seriesPartitionSlot),
+ dataPartitionTableResp
+ .getDataPartitionTable()
+ .get(database)
+ .get(seriesPartitionSlot)
+ .get(timePartitionSlot)
+ .get(0));
+ }
+
+ // Test4: future DataPartitions will inherit predecessor
+ allocatedSlots.clear();
+ for (int i = 0; i < 10; i++) {
+ int slot = random.nextInt(testSeriesSlotNum);
+ while (allocatedSlots.contains(slot)) {
+ slot = random.nextInt(testSeriesSlotNum);
}
+ allocatedSlots.add(slot);
+ dataPartitionTableResp =
+ ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+ database,
+ slot,
+ slot + 1,
+ baseStartTime + 999,
+ baseStartTime + 1000,
+ testTimePartitionInterval);
+ Assert.assertNotNull(dataPartitionTableResp);
- // Test DataPartition inherit policy
- TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
- showRegionResp
- .getRegionInfoList()
- .forEach(
- regionInfo -> {
- // All Timeslots belonging to the same SeriesSlot are
allocated to the same
- // DataRegionGroup
- Assert.assertEquals(
- regionInfo.getSeriesSlots() * testTimePartitionSlotsNum,
- regionInfo.getTimeSlots());
- });
+ TSeriesPartitionSlot seriesPartitionSlot = new
TSeriesPartitionSlot(slot);
+ TTimePartitionSlot timePartitionSlot =
+ new TTimePartitionSlot((baseStartTime + 999) *
testTimePartitionInterval);
+ Assert.assertEquals(
+ dataAllotTable2.get(seriesPartitionSlot),
+ dataPartitionTableResp
+ .getDataPartitionTable()
+ .get(database)
+ .get(seriesPartitionSlot)
+ .get(timePartitionSlot)
+ .get(0));
}
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
index 6f20a25ba79..8f5b46288fb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
@@ -34,12 +35,18 @@ import
org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.DataNodeWrapper;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -47,11 +54,17 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertTrue;
public class ConfigNodeTestUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigNodeTestUtils.class);
+
public static void checkNodeConfig(
List<TConfigNodeLocation> configNodeList,
List<TDataNodeLocation> dataNodeList,
@@ -169,6 +182,110 @@ public class ConfigNodeTestUtils {
}
}
+ public static TDataPartitionTableResp getDataPartitionWithRetry(
+ String database,
+ int seriesSlotStart,
+ int seriesSlotEnd,
+ long timeSlotStart,
+ long timeSlotEnd,
+ long timePartitionInterval)
+ throws InterruptedException {
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ database,
+ seriesSlotStart,
+ seriesSlotEnd,
+ timeSlotStart,
+ timeSlotEnd,
+ timePartitionInterval);
+ TDataPartitionTableResp dataPartitionTableResp;
+ TDataPartitionReq dataPartitionReq = new
TDataPartitionReq(partitionSlotsMap);
+
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp =
configNodeClient.getDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null
+ && dataPartitionTableResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return dataPartitionTableResp;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.fail("Failed to create DataPartition");
+ return null;
+ }
+
+ public static TDataPartitionTableResp getOrCreateDataPartitionWithRetry(
+ String database,
+ int seriesSlotStart,
+ int seriesSlotEnd,
+ long timeSlotStart,
+ long timeSlotEnd,
+ long timePartitionInterval)
+ throws InterruptedException {
+
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ database,
+ seriesSlotStart,
+ seriesSlotEnd,
+ timeSlotStart,
+ timeSlotEnd,
+ timePartitionInterval);
+ TDataPartitionTableResp dataPartitionTableResp;
+ TDataPartitionReq dataPartitionReq = new
TDataPartitionReq(partitionSlotsMap);
+
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp =
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null
+ && dataPartitionTableResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ database,
+ seriesSlotStart,
+ seriesSlotEnd,
+ timeSlotStart,
+ timeSlotEnd,
+ timePartitionInterval,
+
configNodeClient.getDataPartitionTable(dataPartitionReq).getDataPartitionTable());
+ return dataPartitionTableResp;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.fail("Failed to create DataPartition");
+ return null;
+ }
+
+ public static Map<TConsensusGroupId, Integer> countDataPartition(
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TConsensusGroupId>>>
+ dataPartitionMap) {
+ Map<TConsensusGroupId, AtomicInteger> counter = new ConcurrentHashMap<>();
+ dataPartitionMap.forEach(
+ ((seriesPartitionSlot, timePartitionSlotMap) ->
+ timePartitionSlotMap.forEach(
+ ((timePartitionSlot, consensusGroupIds) ->
+ consensusGroupIds.forEach(
+ (consensusGroupId ->
+ counter
+ .computeIfAbsent(consensusGroupId, empty ->
new AtomicInteger(0))
+ .incrementAndGet()))))));
+ return counter.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry ->
entry.getValue().get()));
+ }
+
public static TConfigNodeLocation generateTConfigNodeLocation(
int nodeId, ConfigNodeWrapper configNodeWrapper) {
return new TConfigNodeLocation(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
index 33f9ef9e151..6d30db00085 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
@@ -119,7 +119,8 @@ public class IoTDBLastQueryLastCacheIT {
"1679365910000,root.ln_1.tb_6141.waterInterval_DOUBLE,10.0,DOUBLE,",
"1679365910000,root.ln_1.tb_6141.waterTP_DOUBLE,15.0,DOUBLE,",
};
- resultSetEqualTest("select last * from root.ln_1.tb_6141;",
expectedHeader, retArray);
+ resultSetEqualTest(
+ "select last * from root.ln_1.tb_6141 order by timeseries;",
expectedHeader, retArray);
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
index 454879aea90..82db0fdf154 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
@@ -1318,7 +1318,7 @@ public class IoTDBOrderByIT {
{"15", "15", "3147483648", "3147483648"},
{"INT32", "INT32", "INT64", "INT64"}
};
- String sql = "select last bigNum,num from root.** order by value";
+ String sql = "select last bigNum,num from root.** order by value,
timeseries";
testLastQueryOrderBy(sql, ans);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
index b71005bc0d6..66cd2d6905b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
@@ -471,18 +471,22 @@ public class IoTDBSessionSchemaTemplateIT extends
AbstractSchemaIT {
Assert.assertTrue(expectedSeries.isEmpty());
- deviceIds = Arrays.asList("root.db.v4.d1", "root.db.v4.d2");
- timestamps = Arrays.asList(1L, 1L);
measurements = Arrays.asList("a", "b", "c");
- allMeasurements = Arrays.asList(measurements, measurements);
- allTsDataTypes =
- Arrays.asList(
- Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.TEXT),
- Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE,
TSDataType.INT32));
- allValues = Arrays.asList(Arrays.asList(1f, 2f, "3"), Arrays.asList(1d,
2d, 3));
try {
- session.insertRecords(deviceIds, timestamps, allMeasurements,
allTsDataTypes, allValues);
+ session.insertRecord(
+ "root.db.v4.d1",
+ 1L,
+ measurements,
+ Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.TEXT),
+ Arrays.asList(1f, 2f, "3"));
+ session.insertRecord(
+ "root.db.v4.d2",
+ 1L,
+ measurements,
+ Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE,
TSDataType.INT32),
+ Arrays.asList(1d, 2d, 3));
+ Assert.fail();
} catch (StatementExecutionException e) {
Assert.assertTrue(
e.getMessage()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 2907c3b1eff..6b99e0da369 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -110,12 +110,6 @@ public class ConfigNodeConfig {
private RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy =
RegionBalancer.RegionGroupAllocatePolicy.GREEDY;
- /**
- * DataPartition within the same SeriesPartitionSlot will inherit the
allocation result of the
- * predecessor or successor TimePartitionSlot if set true.
- */
- private boolean enableDataPartitionInheritPolicy = true;
-
/** Max concurrent client number. */
private int rpcMaxConcurrentClientNum = 65535;
@@ -559,14 +553,6 @@ public class ConfigNodeConfig {
this.regionGroupAllocatePolicy = regionGroupAllocatePolicy;
}
- public boolean isEnableDataPartitionInheritPolicy() {
- return enableDataPartitionInheritPolicy;
- }
-
- public void setEnableDataPartitionInheritPolicy(boolean
enableDataPartitionInheritPolicy) {
- this.enableDataPartitionInheritPolicy = enableDataPartitionInheritPolicy;
- }
-
public int getThriftServerAwaitTimeForStopService() {
return thriftServerAwaitTimeForStopService;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 59eab23ddc4..5490b700c36 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -272,12 +272,6 @@ public class ConfigNodeDescriptor {
"The configured region allocate strategy does not exist, use the
default: GREEDY!");
}
- conf.setEnableDataPartitionInheritPolicy(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_data_partition_inherit_policy",
- String.valueOf(conf.isEnableDataPartitionInheritPolicy()))));
-
conf.setCnRpcAdvancedCompressionEnable(
Boolean.parseBoolean(
properties
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index d88ebd70fe4..b4c291c97e9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -94,7 +94,7 @@ public class LoadManager {
/**
* Generate an optimal CreateRegionGroupsPlan.
*
- * @param allotmentMap Map<StorageGroupName, Region allotment>
+ * @param allotmentMap Map<DatabaseName, Region allotment>
* @param consensusGroupType TConsensusGroupType of RegionGroup to be
allocated
* @return CreateRegionGroupsPlan
* @throws NotEnoughDataNodeException If there are not enough DataNodes
@@ -110,7 +110,7 @@ public class LoadManager {
* Allocate SchemaPartitions.
*
* @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should
be assigned
- * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
+ * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result
*/
public Map<String, SchemaPartitionTable> allocateSchemaPartition(
Map<String, List<TSeriesPartitionSlot>>
unassignedSchemaPartitionSlotsMap)
@@ -122,7 +122,7 @@ public class LoadManager {
* Allocate DataPartitions.
*
* @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be
assigned
- * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
+ * @return Map<DatabaseName, DataPartitionTable>, the allocating result
*/
public Map<String, DataPartitionTable> allocateDataPartition(
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>>
unassignedDataPartitionSlotsMap)
@@ -130,6 +130,10 @@ public class LoadManager {
return
partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
}
+ /**
+ * Re-balance the DataPartitionPolicyTable of the specified Database by delegating to the
+ * PartitionBalancer.
+ *
+ * @param database Database name
+ */
+ public void reBalanceDataPartitionPolicy(String database) {
+ partitionBalancer.reBalanceDataPartitionPolicy(database);
+ }
+
public void broadcastLatestRegionRouteMap() {
statisticsService.broadcastLatestRegionRouteMap();
}
@@ -138,12 +142,18 @@ public class LoadManager {
loadCache.initHeartbeatCache(configManager);
heartbeatService.startHeartbeatService();
statisticsService.startLoadStatisticsService();
+ partitionBalancer.setupPartitionBalancer();
}
+ /**
+ * Stop the heartbeat and load-statistics services, clear the heartbeat cache and drop every
+ * DataPartitionPolicyTable cached by the PartitionBalancer.
+ */
public void stopLoadServices() {
heartbeatService.stopHeartbeatService();
statisticsService.stopLoadStatisticsService();
loadCache.clearHeartbeatCache();
+ // The policy tables are rebuilt by partitionBalancer.setupPartitionBalancer() when
+ // startLoadServices runs again
+ partitionBalancer.clearPartitionBalancer();
+ }
+
+ /**
+ * Drop the cached DataPartitionPolicyTable of the specified Database by delegating to the
+ * PartitionBalancer.
+ *
+ * @param database Database name
+ */
+ public void clearDataPartitionPolicyTable(String database) {
+ partitionBalancer.clearDataPartitionPolicyTable(database);
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 89d9235911c..6bfa9ae0671 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -16,19 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load.balancer;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.structure.BalanceTreeMap;
+import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
import org.apache.iotdb.confignode.manager.IManager;
-import
org.apache.iotdb.confignode.manager.load.balancer.partition.GreedyPartitionAllocator;
-import
org.apache.iotdb.confignode.manager.load.balancer.partition.IPartitionAllocator;
+import
org.apache.iotdb.confignode.manager.load.balancer.partition.DataPartitionPolicyTable;
+import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* The SeriesPartitionSlotBalancer provides interfaces to generate optimal
Partition allocation and
@@ -36,38 +51,206 @@ import java.util.Map;
*/
public class PartitionBalancer {
+ // Reports Databases that vanish while their DataPartitionPolicyTable is being (re)built
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionBalancer.class);
+
private final IManager configManager;
+ // Map<DatabaseName, DataPartitionPolicyTable>
+ // Concurrent map: entries are (re)built by setupPartitionBalancer and
+ // reBalanceDataPartitionPolicy, and dropped by clearPartitionBalancer and
+ // clearDataPartitionPolicyTable
+ private final Map<String, DataPartitionPolicyTable>
dataPartitionPolicyTableMap;
+
+ /**
+ * @param configManager provides the PartitionManager and ClusterSchemaManager used by this
+ * balancer
+ */
public PartitionBalancer(IManager configManager) {
this.configManager = configManager;
+ this.dataPartitionPolicyTableMap = new ConcurrentHashMap<>();
}
/**
* Allocate SchemaPartitions
+ *
+ * <p>Greedy policy: every unassigned SeriesPartitionSlot goes to the SchemaRegionGroup that
+ * currently holds the fewest SchemaPartitions.
*
* @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should
be assigned
- * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
+ * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result
*/
public Map<String, SchemaPartitionTable> allocateSchemaPartition(
Map<String, List<TSeriesPartitionSlot>>
unassignedSchemaPartitionSlotsMap)
throws NoAvailableRegionGroupException {
- return
genPartitionAllocator().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+    Map<String, SchemaPartitionTable> result = new HashMap<>();
+
+    for (Map.Entry<String, List<TSeriesPartitionSlot>> databaseEntry :
+        unassignedSchemaPartitionSlotsMap.entrySet()) {
+      final String database = databaseEntry.getKey();
+
+      // Seed a balance counter with the number of SchemaPartitions that every candidate
+      // SchemaRegionGroup of this Database already holds
+      BalanceTreeMap<TConsensusGroupId, Integer> slotCounter = new BalanceTreeMap<>();
+      getPartitionManager()
+          .getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.SchemaRegion)
+          .forEach(pair -> slotCounter.put(pair.getRight(), pair.getLeft().intValue()));
+
+      // Each slot goes to the least loaded RegionGroup, whose count is bumped immediately so
+      // the next slot sees the updated load
+      Map<TSeriesPartitionSlot, TConsensusGroupId> assignment = new HashMap<>();
+      for (TSeriesPartitionSlot slot : databaseEntry.getValue()) {
+        TConsensusGroupId leastLoaded = slotCounter.getKeyWithMinValue();
+        assignment.put(slot, leastLoaded);
+        slotCounter.put(leastLoaded, slotCounter.get(leastLoaded) + 1);
+      }
+      result.put(database, new SchemaPartitionTable(assignment));
+    }
+
+    return result;
}
/**
* Allocate DataPartitions
+ *
+ * <p>Within each SeriesPartitionSlot, TimePartitionSlots are visited in ascending start-time
+ * order. Each one goes to the first available candidate among: (1) the successor
+ * DataPartition's RegionGroup, (2) the RegionGroup pinned by the Database's
+ * DataPartitionPolicyTable (skipped when the table is absent), (3) the predecessor
+ * DataPartition's RegionGroup, (4) the RegionGroup holding the fewest DataPartitions.
*
* @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be
assigned
- * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
+ * @return Map<DatabaseName, DataPartitionTable>, the allocating result
+ * @throws NoAvailableRegionGroupException propagated from
+ * PartitionManager#getSortedRegionGroupSlotsCounter
*/
public Map<String, DataPartitionTable> allocateDataPartition(
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>>
unassignedDataPartitionSlotsMap)
throws NoAvailableRegionGroupException {
- return
genPartitionAllocator().allocateDataPartition(unassignedDataPartitionSlotsMap);
+    Map<String, DataPartitionTable> result = new HashMap<>();
+
+    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> slotsMapEntry :
+        unassignedDataPartitionSlotsMap.entrySet()) {
+      final String database = slotsMapEntry.getKey();
+      final Map<TSeriesPartitionSlot, TTimeSlotList> unassignedPartitionSlotsMap =
+          slotsMapEntry.getValue();
+
+      // Filter available DataRegionGroups and
+      // sort them by the number of allocated DataPartitions
+      BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>();
+      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+          getPartitionManager()
+              .getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.DataRegion);
+      for (Pair<Long, TConsensusGroupId> pair : regionSlotsCounter) {
+        counter.put(pair.getRight(), pair.getLeft().intValue());
+      }
+
+      DataPartitionTable dataPartitionTable = new DataPartitionTable();
+      // The policy table can be missing, e.g. after clearDataPartitionPolicyTable or
+      // clearPartitionBalancer removed it; the policy step (2) is then skipped
+      DataPartitionPolicyTable allotTable = dataPartitionPolicyTableMap.get(database);
+      // Acquire the lock BEFORE entering try, so finally never unlocks a lock we do not hold
+      if (allotTable != null) {
+        allotTable.acquireLock();
+      }
+      try {
+        // Enumerate SeriesPartitionSlot
+        for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry :
+            unassignedPartitionSlotsMap.entrySet()) {
+          SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
+
+          // Enumerate TimePartitionSlot in ascending order
+          TSeriesPartitionSlot seriesPartitionSlot = seriesPartitionEntry.getKey();
+          List<TTimePartitionSlot> timePartitionSlots =
+              seriesPartitionEntry.getValue().getTimePartitionSlots();
+          timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
+
+          for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+
+            // 1. The historical DataPartition will try to inherit successor DataPartition first
+            TConsensusGroupId successor =
+                getPartitionManager()
+                    .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+            if (successor != null && counter.containsKey(successor)) {
+              seriesPartitionTable.putDataPartition(timePartitionSlot, successor);
+              counter.put(successor, counter.get(successor) + 1);
+              continue;
+            }
+
+            // 2. Assign DataPartition base on the DataPartitionPolicyTable, when present
+            if (allotTable != null) {
+              TConsensusGroupId allotGroupId =
+                  allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+              if (counter.containsKey(allotGroupId)) {
+                seriesPartitionTable.putDataPartition(timePartitionSlot, allotGroupId);
+                counter.put(allotGroupId, counter.get(allotGroupId) + 1);
+                continue;
+              }
+            }
+
+            // 3. The policy DataRegionGroup is unavailable,
+            // try to inherit predecessor DataPartition
+            TConsensusGroupId predecessor =
+                getPartitionManager()
+                    .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+            if (predecessor != null && counter.containsKey(predecessor)) {
+              seriesPartitionTable.putDataPartition(timePartitionSlot, predecessor);
+              counter.put(predecessor, counter.get(predecessor) + 1);
+              continue;
+            }
+
+            // 4. Assign the DataPartition to DataRegionGroup with the least DataPartitions
+            // If the above DataRegionGroups are unavailable
+            TConsensusGroupId greedyGroupId = counter.getKeyWithMinValue();
+            seriesPartitionTable.putDataPartition(timePartitionSlot, greedyGroupId);
+            counter.put(greedyGroupId, counter.get(greedyGroupId) + 1);
+          }
+
+          dataPartitionTable.getDataPartitionMap().put(seriesPartitionSlot, seriesPartitionTable);
+        }
+      } finally {
+        if (allotTable != null) {
+          allotTable.releaseLock();
+        }
+      }
+      result.put(database, dataPartitionTable);
+    }
+
+    return result;
+  }
+
+  /**
+   * Re-balance the DataPartitionPolicyTable of the specified Database.
+   *
+   * <p>The table is created on demand once the Database is known to exist. While the Database has
+   * no DataRegionGroup, the table is only created, because there is nothing to balance over.
+   *
+   * @param database Database name
+   */
+  public void reBalanceDataPartitionPolicy(String database) {
+    try {
+      // Resolve the RegionGroups first so that no table is created for a missing Database
+      List<TConsensusGroupId> dataRegionGroupIds =
+          getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion);
+      DataPartitionPolicyTable policyTable =
+          dataPartitionPolicyTableMap.computeIfAbsent(
+              database, empty -> new DataPartitionPolicyTable());
+      if (dataRegionGroupIds.isEmpty()) {
+        // An empty list must not reach the per-RegionGroup quota computation of the policy table
+        return;
+      }
+      policyTable.reBalanceDataPartitionPolicy(dataRegionGroupIds);
+    } catch (DatabaseNotExistsException e) {
+      LOGGER.error("Database {} not exists when reBalanceDataPartitionPolicy", database);
+    }
+  }
+
+  /**
+   * Set up the PartitionBalancer when the current ConfigNode becomes leader.
+   *
+   * <p>Rebuilds one DataPartitionPolicyTable per Database from its DataRegionGroups and its last
+   * DataAllotTable. A Database that owns no DataRegionGroup yet gets an empty table, which
+   * reBalanceDataPartitionPolicy(String) can populate later.
+   */
+  public void setupPartitionBalancer() {
+    dataPartitionPolicyTableMap.clear();
+    getClusterSchemaManager()
+        .getDatabaseNames()
+        .forEach(
+            database -> {
+              DataPartitionPolicyTable dataPartitionPolicyTable = new DataPartitionPolicyTable();
+              dataPartitionPolicyTableMap.put(database, dataPartitionPolicyTable);
+              try {
+                List<TConsensusGroupId> dataRegionGroupIds =
+                    getPartitionManager()
+                        .getAllRegionGroupIds(database, TConsensusGroupType.DataRegion);
+                if (dataRegionGroupIds.isEmpty()) {
+                  // Nothing to balance over. Skip it so that an empty RegionGroup list never
+                  // reaches the policy table's per-RegionGroup quota computation (an exception
+                  // there would abort the setup of every remaining Database)
+                  return;
+                }
+                // Put all DataRegionGroups into the DataPartitionPolicyTable
+                dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroupIds);
+                // Load the last DataAllotTable
+                dataPartitionPolicyTable.setDataAllotMap(
+                    getPartitionManager().getLastDataAllotTable(database));
+              } catch (DatabaseNotExistsException e) {
+                LOGGER.error("Database {} not exists when setupPartitionBalancer", database);
+              }
+            });
+  }
+
+ /**
+ * Clear the PartitionBalancer when the current ConfigNode is no longer the leader, dropping
+ * every cached DataPartitionPolicyTable.
+ */
+ public void clearPartitionBalancer() {
+ dataPartitionPolicyTableMap.clear();
+ }
+
+ /**
+ * Drop the DataPartitionPolicyTable of the specified Database; a no-op if it has none.
+ *
+ * @param database Database name
+ */
+ public void clearDataPartitionPolicyTable(String database) {
+ dataPartitionPolicyTableMap.remove(database);
+ }
+
+ /** Shortcut to the ClusterSchemaManager of configManager. */
+ private ClusterSchemaManager getClusterSchemaManager() {
+ return configManager.getClusterSchemaManager();
}
- private IPartitionAllocator genPartitionAllocator() {
- // TODO: The type of PartitionAllocator should be configurable
- return new GreedyPartitionAllocator(configManager);
+ /** Shortcut to the PartitionManager of configManager. */
+ private PartitionManager getPartitionManager() {
+ return configManager.getPartitionManager();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
new file mode 100644
index 00000000000..29bd2a2c1ba
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.structure.BalanceTreeMap;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Per-Database policy that binds SeriesPartitionSlots to preferred DataRegionGroups.
+ *
+ * <p>{@code dataAllotMap} records the preferred DataRegionGroup of every activated
+ * SeriesPartitionSlot. {@code seriesPartitionSlotCounter} tracks how many SeriesPartitionSlots
+ * each DataRegionGroup owns, so that re-balancing can cap every DataRegionGroup at
+ * {@code SERIES_SLOT_NUM / #DataRegionGroups} slots.
+ *
+ * <p>Thread-safety: reBalanceDataPartitionPolicy and setDataAllotMap take the internal lock
+ * themselves. getRegionGroupIdOrActivateIfNecessary does not, so callers should wrap it in
+ * acquireLock() / releaseLock().
+ */
+public class DataPartitionPolicyTable {
+
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+
+  private final ReentrantLock dataAllotTableLock;
+
+  // Map<SeriesPartitionSlot, RegionGroupId>
+  // The optimal allocation of SeriesSlots to RegionGroups
+  private final Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap;
+
+  // Map<RegionGroupId, SeriesPartitionSlot Count>
+  // The number of SeriesSlots allocated to each RegionGroup in dataAllotMap
+  private final BalanceTreeMap<TConsensusGroupId, Integer> seriesPartitionSlotCounter;
+
+  public DataPartitionPolicyTable() {
+    this.dataAllotTableLock = new ReentrantLock();
+    this.dataAllotMap = new HashMap<>();
+    this.seriesPartitionSlotCounter = new BalanceTreeMap<>();
+  }
+
+  /**
+   * Get or activate the specified SeriesPartitionSlot in dataAllotMap.
+   *
+   * <p>Not synchronized internally: callers should hold the lock (acquireLock()).
+   *
+   * @param seriesPartitionSlot The specified SeriesPartitionSlot
+   * @return The RegionGroupId of the specified SeriesPartitionSlot. If it has none yet, it is
+   *     bound to the RegionGroup that currently owns the fewest SeriesPartitionSlots.
+   */
+  public TConsensusGroupId getRegionGroupIdOrActivateIfNecessary(
+      TSeriesPartitionSlot seriesPartitionSlot) {
+    if (dataAllotMap.containsKey(seriesPartitionSlot)) {
+      return dataAllotMap.get(seriesPartitionSlot);
+    }
+
+    // NOTE(review): behavior with no registered RegionGroup depends on
+    // BalanceTreeMap#getKeyWithMinValue on an empty map; confirm
+    TConsensusGroupId regionGroupId = seriesPartitionSlotCounter.getKeyWithMinValue();
+    dataAllotMap.put(seriesPartitionSlot, regionGroupId);
+    seriesPartitionSlotCounter.put(
+        regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1);
+    return regionGroupId;
+  }
+
+  /**
+   * Re-balance the allocation of SeriesSlots to RegionGroups.
+   *
+   * <p>Newly seen DataRegionGroups are registered with a count of zero. Then SeriesSlots are evicted,
+   * in random order, from every DataRegionGroup that owns more than
+   * {@code SERIES_SLOT_NUM / dataRegionGroups.size()} of them. Evicted slots are re-activated
+   * lazily by getRegionGroupIdOrActivateIfNecessary. An empty list is a no-op.
+   *
+   * @param dataRegionGroups All DataRegionGroups currently in the Database
+   */
+  public void reBalanceDataPartitionPolicy(List<TConsensusGroupId> dataRegionGroups) {
+    dataAllotTableLock.lock();
+    try {
+      if (dataRegionGroups.isEmpty()) {
+        // Nothing to balance over; this also avoids dividing by zero when computing mu
+        return;
+      }
+
+      // Register newly created DataRegionGroups with an empty slot count
+      dataRegionGroups.forEach(
+          dataRegionGroup -> {
+            if (!seriesPartitionSlotCounter.containsKey(dataRegionGroup)) {
+              seriesPartitionSlotCounter.put(dataRegionGroup, 0);
+            }
+          });
+
+      // mu: the maximum number of SeriesSlots a DataRegionGroup keeps after re-balancing
+      int mu = SERIES_SLOT_NUM / dataRegionGroups.size();
+
+      // Visit the currently allotted SeriesSlots in random order, so that the slots evicted from
+      // an overloaded DataRegionGroup are chosen randomly
+      List<TSeriesPartitionSlot> allottedSlots = new ArrayList<>(dataAllotMap.keySet());
+      Collections.shuffle(allottedSlots);
+      for (TSeriesPartitionSlot seriesPartitionSlot : allottedSlots) {
+        TConsensusGroupId regionGroupId = dataAllotMap.get(seriesPartitionSlot);
+        int seriesPartitionSlotCount = seriesPartitionSlotCounter.get(regionGroupId);
+        if (seriesPartitionSlotCount > mu) {
+          // Remove from dataAllotMap if the number of SeriesSlots is greater than mu
+          dataAllotMap.remove(seriesPartitionSlot);
+          seriesPartitionSlotCounter.put(regionGroupId, seriesPartitionSlotCount - 1);
+        }
+      }
+    } finally {
+      dataAllotTableLock.unlock();
+    }
+  }
+
+  /**
+   * Restore a previously persisted allocation. Only use this interface when init
+   * PartitionBalancer.
+   *
+   * <p>A SeriesSlot is restored only if its RegionGroup is already registered in this table and
+   * still owns fewer than {@code SERIES_SLOT_NUM / #registered RegionGroups} slots. Other slots
+   * are dropped and re-activated lazily.
+   *
+   * @param dataAllotMap Map<SeriesPartitionSlot, RegionGroupId> to restore
+   */
+  public void setDataAllotMap(Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap) {
+    // Acquire the lock BEFORE entering try, so finally never unlocks a lock we do not hold
+    dataAllotTableLock.lock();
+    try {
+      if (seriesPartitionSlotCounter.size() == 0) {
+        // No registered DataRegionGroup: nothing can be restored (and mu would divide by zero)
+        return;
+      }
+      int mu = SERIES_SLOT_NUM / seriesPartitionSlotCounter.size();
+      dataAllotMap.forEach(
+          (seriesPartitionSlot, regionGroupId) -> {
+            if (!seriesPartitionSlotCounter.containsKey(regionGroupId)) {
+              // Unknown RegionGroup (its count would be null): drop the slot and let it
+              // re-activate on a registered RegionGroup
+              return;
+            }
+            int allottedCount = seriesPartitionSlotCounter.get(regionGroupId);
+            if (allottedCount < mu) {
+              // Put into dataAllotMap only when the number of SeriesSlots
+              // allocated to the RegionGroup is less than mu
+              this.dataAllotMap.put(seriesPartitionSlot, regionGroupId);
+              seriesPartitionSlotCounter.put(regionGroupId, allottedCount + 1);
+            }
+            // Otherwise, clear this SeriesPartitionSlot and wait for re-activating
+          });
+    } finally {
+      dataAllotTableLock.unlock();
+    }
+  }
+
+  /** Acquire the table lock; pair with releaseLock() in a finally block. */
+  public void acquireLock() {
+    dataAllotTableLock.lock();
+  }
+
+  /** Release the table lock acquired by acquireLock(). */
+  public void releaseLock() {
+    dataAllotTableLock.unlock();
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
deleted file mode 100644
index aacf165a3ea..00000000000
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load.balancer.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.partition.DataPartitionTable;
-import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.commons.partition.SeriesPartitionTable;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
-import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/** Allocating new Partitions by greedy algorithm. */
-public class GreedyPartitionAllocator implements IPartitionAllocator {
-
- private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
- private static final boolean ENABLE_DATA_PARTITION_INHERIT_POLICY =
- CONF.isEnableDataPartitionInheritPolicy();
- private static final long TIME_PARTITION_INTERVAL =
- CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
-
- private final IManager configManager;
-
- public GreedyPartitionAllocator(IManager configManager) {
- this.configManager = configManager;
- }
-
- @Override
- public Map<String, SchemaPartitionTable> allocateSchemaPartition(
- Map<String, List<TSeriesPartitionSlot>>
unassignedSchemaPartitionSlotsMap)
- throws NoAvailableRegionGroupException {
- Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>();
-
- for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry :
- unassignedSchemaPartitionSlotsMap.entrySet()) {
- final String storageGroup = slotsMapEntry.getKey();
- final List<TSeriesPartitionSlot> unassignedPartitionSlots =
slotsMapEntry.getValue();
-
- // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
- List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
- getPartitionManager()
- .getSortedRegionGroupSlotsCounter(storageGroup,
TConsensusGroupType.SchemaRegion);
-
- // Enumerate SeriesPartitionSlot
- Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new
ConcurrentHashMap<>();
- for (TSeriesPartitionSlot seriesPartitionSlot :
unassignedPartitionSlots) {
- // Greedy allocation
- schemaPartitionMap.put(seriesPartitionSlot,
regionSlotsCounter.get(0).getRight());
- // Bubble sort
- bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
- }
- result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap));
- }
-
- return result;
- }
-
- @Override
- public Map<String, DataPartitionTable> allocateDataPartition(
- Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>>
unassignedDataPartitionSlotsMap)
- throws NoAvailableRegionGroupException {
- Map<String, DataPartitionTable> result = new ConcurrentHashMap<>();
-
- for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>>
slotsMapEntry :
- unassignedDataPartitionSlotsMap.entrySet()) {
- final String database = slotsMapEntry.getKey();
- final Map<TSeriesPartitionSlot, TTimeSlotList>
unassignedPartitionSlotsMap =
- slotsMapEntry.getValue();
-
- // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
- List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
- getPartitionManager()
- .getSortedRegionGroupSlotsCounter(database,
TConsensusGroupType.DataRegion);
-
- DataPartitionTable dataPartitionTable = new DataPartitionTable();
-
- // Enumerate SeriesPartitionSlot
- for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry
:
- unassignedPartitionSlotsMap.entrySet()) {
- SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
-
- // Enumerate TimePartitionSlot in ascending order
- List<TTimePartitionSlot> timePartitionSlots =
- seriesPartitionEntry.getValue().getTimePartitionSlots();
-
timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
- for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
-
- /* 1. Inherit policy */
- if (ENABLE_DATA_PARTITION_INHERIT_POLICY) {
- // Check if the current Partition's neighbor(predecessor or
successor)
- // is allocated in the same batch of Partition creation
- TConsensusGroupId neighbor =
- seriesPartitionTable.getAdjacentDataPartition(
- timePartitionSlot, TIME_PARTITION_INTERVAL);
- if (neighbor != null) {
- seriesPartitionTable
- .getSeriesPartitionMap()
- .put(timePartitionSlot, Collections.singletonList(neighbor));
- bubbleSort(neighbor, regionSlotsCounter);
- continue;
- }
-
- // Check if the current Partition's neighbor(predecessor or
successor)
- // was allocated in the former Partition creation
- neighbor =
- getPartitionManager()
- .getAdjacentDataPartition(
- database,
- seriesPartitionEntry.getKey(),
- timePartitionSlot,
- TIME_PARTITION_INTERVAL);
- if (neighbor != null) {
- seriesPartitionTable
- .getSeriesPartitionMap()
- .put(timePartitionSlot, Collections.singletonList(neighbor));
- bubbleSort(neighbor, regionSlotsCounter);
- continue;
- }
- }
-
- /* 2. Greedy policy */
- seriesPartitionTable
- .getSeriesPartitionMap()
- .put(
- timePartitionSlot,
-
Collections.singletonList(regionSlotsCounter.get(0).getRight()));
- bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
- }
- dataPartitionTable
- .getDataPartitionMap()
- .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
- }
- result.put(database, dataPartitionTable);
- }
-
- return result;
- }
-
- /**
- * Bubble sort the regionSlotsCounter from the specified consensus group
- *
- * <p>Notice: Here we use bubble sort instead of other sorting algorithm is
because that, there is
- * only one Partition allocated in each loop. Therefore, only consider one
consensus group weight
- * change is enough
- *
- * @param consensusGroupId The consensus group where the new Partition is
allocated
- * @param regionSlotsCounter List<Pair<Allocated Partition num,
TConsensusGroupId>>
- */
- private void bubbleSort(
- TConsensusGroupId consensusGroupId, List<Pair<Long, TConsensusGroupId>>
regionSlotsCounter) {
- // Find the corresponding consensus group
- int index = 0;
- for (int i = 0; i < regionSlotsCounter.size(); i++) {
- if (regionSlotsCounter.get(i).getRight().equals(consensusGroupId)) {
- index = i;
- break;
- }
- }
-
- // Do bubble sort
-
regionSlotsCounter.get(index).setLeft(regionSlotsCounter.get(index).getLeft() +
1);
- while (index < regionSlotsCounter.size() - 1
- && regionSlotsCounter.get(index).getLeft() >
regionSlotsCounter.get(index + 1).getLeft()) {
- Collections.swap(regionSlotsCounter, index, index + 1);
- index += 1;
- }
- }
-
- private PartitionManager getPartitionManager() {
- return configManager.getPartitionManager();
- }
-}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
deleted file mode 100644
index 62ef53d9655..00000000000
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager.load.balancer.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartitionTable;
-import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
-import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The IPartitionAllocator is a functional interface, which means a new
functional class who
- * implements the IPartitionAllocator must be created for each Partition
allocation.
- */
-public interface IPartitionAllocator {
-
- /**
- * Allocate SchemaPartitions
- *
- * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should
be assigned
- * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
- */
- Map<String, SchemaPartitionTable> allocateSchemaPartition(
- Map<String, List<TSeriesPartitionSlot>>
unassignedSchemaPartitionSlotsMap)
- throws NoAvailableRegionGroupException;
-
- /**
- * Allocate DataPartitions
- *
- * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be
assigned
- * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
- */
- Map<String, DataPartitionTable> allocateDataPartition(
- Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>>
unassignedDataPartitionSlotsMap)
- throws NoAvailableRegionGroupException;
-}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index fae18091d36..ac0df2916a9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
@@ -85,6 +86,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.rpc.RpcUtils;
@@ -265,14 +267,13 @@ public class PartitionManager {
// Cache allocating result only if the current ConfigNode still holds
its leadership
CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
- status = getConsensusManager().confirmLeader();
+
+ status = consensusWritePartitionResult(createPlan);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // Here we check the leadership second time
- // since the RegionGroup creating process might take some time
+ // The allocation might fail due to consensus error
resp.setStatus(status);
return resp;
}
- getConsensusManager().write(createPlan);
}
resp = (SchemaPartitionResp) getSchemaPartition(req);
@@ -389,14 +390,13 @@ public class PartitionManager {
// Cache allocating result only if the current ConfigNode still holds
its leadership
CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
createPlan.setAssignedDataPartition(assignedDataPartition);
- status = getConsensusManager().confirmLeader();
+
+ status = consensusWritePartitionResult(createPlan);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // Here we check the leadership second time
- // since the RegionGroup creating process might take some time
+ // The allocation might fail due to consensus error
resp.setStatus(status);
return resp;
}
- getConsensusManager().write(createPlan);
}
resp = (DataPartitionResp) getDataPartition(req);
@@ -431,6 +431,24 @@ public class PartitionManager {
return resp;
}
+ private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
+ TSStatus status = getConsensusManager().confirmLeader();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Here we check the leadership second time
+ // since the RegionGroup creating process might take some time
+ return status;
+ }
+
+ ConsensusWriteResponse writeResp = getConsensusManager().write(plan);
+ if (!writeResp.isSuccessful()) {
+ // The allocation might fail due to consensus error
+ status = writeResp.getStatus();
+ status.setMessage(writeResp.getErrorMessage());
+ LOGGER.error("Write DataPartition allocation result failed because: {}",
status);
+ }
+ return status;
+ }
+
// ======================================================
// Leader scheduling interfaces
// ======================================================
@@ -590,22 +608,37 @@ public class PartitionManager {
}
/**
- * Only leader use this interface. Checks whether the specified
DataPartition has a predecessor or
- * successor and returns if it does
+ * Only leader use this interface. Checks whether the specified
DataPartition has a successor and
+ * returns if it does.
+ *
+ * @param database DatabaseName
+ * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+ * @param timePartitionSlot Corresponding TimePartitionSlot
+ * @return The specific DataPartition's successor if exists, null otherwise
+ */
+ public TConsensusGroupId getSuccessorDataPartition(
+ String database,
+ TSeriesPartitionSlot seriesPartitionSlot,
+ TTimePartitionSlot timePartitionSlot) {
+ return partitionInfo.getSuccessorDataPartition(
+ database, seriesPartitionSlot, timePartitionSlot);
+ }
+
+ /**
+ * Only leader use this interface. Checks whether the specified
DataPartition has a predecessor
+ * and returns if it does.
*
* @param database DatabaseName
* @param seriesPartitionSlot Corresponding SeriesPartitionSlot
* @param timePartitionSlot Corresponding TimePartitionSlot
- * @param timePartitionInterval Time partition interval
* @return The specific DataPartition's predecessor if exists, null otherwise
*/
- public TConsensusGroupId getAdjacentDataPartition(
+ public TConsensusGroupId getPredecessorDataPartition(
String database,
TSeriesPartitionSlot seriesPartitionSlot,
- TTimePartitionSlot timePartitionSlot,
- long timePartitionInterval) {
- return partitionInfo.getAdjacentDataPartition(
- database, seriesPartitionSlot, timePartitionSlot,
timePartitionInterval);
+ TTimePartitionSlot timePartitionSlot) {
+ return partitionInfo.getPredecessorDataPartition(
+ database, seriesPartitionSlot, timePartitionSlot);
}
/**
@@ -711,6 +744,21 @@ public class PartitionManager {
return partitionInfo.getRegionGroupCount(database, type);
}
+ /**
+ * Only leader use this interface.
+ *
+ * <p>Get all the RegionGroups currently owned by the specified Database
+ *
+ * @param database DatabaseName
+ * @param type SchemaRegion or DataRegion
+ * @return List of TConsensusGroupId
+ * @throws DatabaseNotExistsException When the specified Database doesn't
exist
+ */
+ public List<TConsensusGroupId> getAllRegionGroupIds(String database,
TConsensusGroupType type)
+ throws DatabaseNotExistsException {
+ return partitionInfo.getAllRegionGroupIds(database, type);
+ }
+
/**
* Check if the specified Database exists.
*
@@ -1249,6 +1297,16 @@ public class PartitionManager {
partitionInfo.getDataRegionIds(databases, dataRegionIds);
}
+ /**
+ * Get the last DataAllotTable of the specified Database.
+ *
+ * @param database The specified Database
+ * @return The last DataAllotTable
+ */
+ public Map<TSeriesPartitionSlot, TConsensusGroupId>
getLastDataAllotTable(String database) {
+ return partitionInfo.getLastDataAllotTable(database);
+ }
+
public ScheduledExecutorService getRegionMaintainer() {
return regionMaintainer;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index b2ca2faf6b7..a629569470a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -98,19 +98,6 @@ public class DatabasePartitionTable {
replicaSet.getRegionId(), new
RegionGroup(System.currentTimeMillis(), replicaSet)));
}
- /**
- * Delete RegionGroups' cache.
- *
- * @param replicaSets List<TRegionReplicaSet>
- */
- public void deleteRegionGroups(List<TRegionReplicaSet> replicaSets) {
- replicaSets.forEach(replicaSet ->
regionGroupMap.remove(replicaSet.getRegionId()));
- }
-
- public Set<TConsensusGroupId> getAllConsensusGroupId() {
- return regionGroupMap.keySet();
- }
-
/** @return Deep copy of all Regions' RegionReplicaSet within one
StorageGroup */
public List<TRegionReplicaSet> getAllReplicaSets() {
List<TRegionReplicaSet> result = new ArrayList<>();
@@ -220,6 +207,20 @@ public class DatabasePartitionTable {
return result.getAndIncrement();
}
+ /**
+ * Only leader use this interface.
+ *
+ * <p>Get all the RegionGroups currently owned by the specified Database
+ *
+ * @param type SchemaRegion or DataRegion
+ * @return List of TConsensusGroupId
+ */
+ public List<TConsensusGroupId> getAllRegionGroupIds(TConsensusGroupType
type) {
+ return regionGroupMap.keySet().stream()
+ .filter(regionGroupId -> regionGroupId.getType().equals(type))
+ .collect(Collectors.toList());
+ }
+
public int getAssignedSeriesPartitionSlotsCount() {
return Math.max(
schemaPartitionTable.getSchemaPartitionMap().size(),
@@ -250,20 +251,28 @@ public class DatabasePartitionTable {
return dataPartitionTable.getDataPartition(partitionSlots, dataPartition);
}
+ /**
+ * Checks whether the specified DataPartition has a successor and returns if
it does.
+ *
+ * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+ * @param timePartitionSlot Corresponding TimePartitionSlot
+ * @return The specific DataPartition's successor if exists, null otherwise
+ */
+ public TConsensusGroupId getSuccessorDataPartition(
+ TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot
timePartitionSlot) {
+ return dataPartitionTable.getSuccessorDataPartition(seriesPartitionSlot,
timePartitionSlot);
+ }
+
/**
* Checks whether the specified DataPartition has a predecessor and returns
if it does.
*
* @param seriesPartitionSlot Corresponding SeriesPartitionSlot
* @param timePartitionSlot Corresponding TimePartitionSlot
- * @param timePartitionInterval Time partition interval
* @return The specific DataPartition's predecessor if exists, null otherwise
*/
- public TConsensusGroupId getAdjacentDataPartition(
- TSeriesPartitionSlot seriesPartitionSlot,
- TTimePartitionSlot timePartitionSlot,
- long timePartitionInterval) {
- return dataPartitionTable.getAdjacentDataPartition(
- seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
+ public TConsensusGroupId getPredecessorDataPartition(
+ TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot
timePartitionSlot) {
+ return dataPartitionTable.getPredecessorDataPartition(seriesPartitionSlot,
timePartitionSlot);
}
/**
@@ -553,6 +562,15 @@ public class DatabasePartitionTable {
return dataRegionIds;
}
+ /**
+ * Get the last DataAllotTable.
+ *
+ * @return The last DataAllotTable
+ */
+ public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
+ return dataPartitionTable.getLastDataAllotTable();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index d85c63e4e10..d61396a4c7a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -385,24 +385,42 @@ public class PartitionInfo implements SnapshotProcessor {
}
/**
- * Checks whether the specified DataPartition has a predecessor or successor
and returns if it
- * does.
+ * Checks whether the specified DataPartition has a successor and returns if
it does.
+ *
+ * @param database DatabaseName
+ * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+ * @param timePartitionSlot Corresponding TimePartitionSlot
+ * @return The specific DataPartition's successor if exists, null otherwise
+ */
+ public TConsensusGroupId getSuccessorDataPartition(
+ String database,
+ TSeriesPartitionSlot seriesPartitionSlot,
+ TTimePartitionSlot timePartitionSlot) {
+ if (isDatabaseExisted(database)) {
+ return databasePartitionTables
+ .get(database)
+ .getSuccessorDataPartition(seriesPartitionSlot, timePartitionSlot);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Checks whether the specified DataPartition has a predecessor and returns
if it does.
*
* @param database DatabaseName
* @param seriesPartitionSlot Corresponding SeriesPartitionSlot
* @param timePartitionSlot Corresponding TimePartitionSlot
- * @param timePartitionInterval Time partition interval
* @return The specific DataPartition's predecessor if exists, null otherwise
*/
- public TConsensusGroupId getAdjacentDataPartition(
+ public TConsensusGroupId getPredecessorDataPartition(
String database,
TSeriesPartitionSlot seriesPartitionSlot,
- TTimePartitionSlot timePartitionSlot,
- long timePartitionInterval) {
- if (databasePartitionTables.containsKey(database)) {
+ TTimePartitionSlot timePartitionSlot) {
+ if (isDatabaseExisted(database)) {
return databasePartitionTables
.get(database)
- .getAdjacentDataPartition(seriesPartitionSlot, timePartitionSlot,
timePartitionInterval);
+ .getPredecessorDataPartition(seriesPartitionSlot, timePartitionSlot);
} else {
return null;
}
@@ -722,6 +740,25 @@ public class PartitionInfo implements SnapshotProcessor {
return databasePartitionTables.get(database).getRegionGroupCount(type);
}
+ /**
+ * Only leader use this interface.
+ *
+ * <p>Get all the RegionGroups currently owned by the specified Database
+ *
+ * @param database DatabaseName
+ * @param type SchemaRegion or DataRegion
+ * @return List of TConsensusGroupId
+ * @throws DatabaseNotExistsException When the specified Database doesn't
exist
+ */
+ public List<TConsensusGroupId> getAllRegionGroupIds(String database,
TConsensusGroupType type)
+ throws DatabaseNotExistsException {
+ if (!isDatabaseExisted(database)) {
+ throw new DatabaseNotExistsException(database);
+ }
+
+ return databasePartitionTables.get(database).getAllRegionGroupIds(type);
+ }
+
/**
* Only leader use this interface.
*
@@ -761,14 +798,14 @@ public class PartitionInfo implements SnapshotProcessor {
/**
* Only leader use this interface.
*
- * @param storageGroup StorageGroupName
+ * @param database DatabaseName
* @param type SchemaRegion or DataRegion
* @return The StorageGroup's Running or Available Regions that sorted by
the number of allocated
* slots
*/
public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter(
- String storageGroup, TConsensusGroupType type) {
- return
databasePartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type);
+ String database, TConsensusGroupType type) {
+ return
databasePartitionTables.get(database).getRegionGroupSlotsCounter(type);
}
/**
@@ -784,6 +821,19 @@ public class PartitionInfo implements SnapshotProcessor {
return schemaPartitionSet;
}
+ /**
+ * Get the last DataAllotTable of the specified Database.
+ *
+ * @param database The specified Database
+ * @return The last DataAllotTable
+ */
+ public Map<TSeriesPartitionSlot, TConsensusGroupId>
getLastDataAllotTable(String database) {
+ if (isDatabaseExisted(database)) {
+ return databasePartitionTables.get(database).getLastDataAllotTable();
+ }
+ return Collections.emptyMap();
+ }
+
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException,
IOException {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 34211efdeb6..7ea774cc33c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -206,6 +206,9 @@ public class DeleteDatabaseProcedure
LOG.info(
"[DeleteDatabaseProcedure] Database: {} is deleted
successfully",
deleteDatabaseSchema.getName());
+ env.getConfigManager()
+ .getLoadManager()
+ .clearDataPartitionPolicyTable(deleteDatabaseSchema.getName());
return Flow.NO_MORE_STATE;
} else if (getCycles() > RETRY_THRESHOLD) {
setFailure(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 78abfbca2dc..2dab4c451aa 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.procedure.impl.statemachine;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -52,6 +53,7 @@ public class CreateRegionGroupsProcedure
private TConsensusGroupType consensusGroupType;
private CreateRegionGroupsPlan createRegionGroupsPlan = new
CreateRegionGroupsPlan();
+ private CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
/** key: TConsensusGroupId value: Failed RegionReplicas */
private Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets =
new HashMap<>();
@@ -70,9 +72,11 @@ public class CreateRegionGroupsProcedure
public CreateRegionGroupsProcedure(
TConsensusGroupType consensusGroupType,
CreateRegionGroupsPlan createRegionGroupsPlan,
+ CreateRegionGroupsPlan persistPlan,
Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets) {
this.consensusGroupType = consensusGroupType;
this.createRegionGroupsPlan = createRegionGroupsPlan;
+ this.persistPlan = persistPlan;
this.failedRegionReplicaSets = failedRegionReplicaSets;
}
@@ -84,7 +88,7 @@ public class CreateRegionGroupsProcedure
setNextState(CreateRegionGroupsState.SHUNT_REGION_REPLICAS);
break;
case SHUNT_REGION_REPLICAS:
- CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+ persistPlan = new CreateRegionGroupsPlan();
OfferRegionMaintainTasksPlan offerPlan = new
OfferRegionMaintainTasksPlan();
// Filter those RegionGroups that created successfully
createRegionGroupsPlan
@@ -201,6 +205,17 @@ public class CreateRegionGroupsProcedure
setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
break;
case CREATE_REGION_GROUPS_FINISH:
+ if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
+ // Re-balance all corresponding DataPartitionPolicyTable
+ persistPlan
+ .getRegionGroupMap()
+ .keySet()
+ .forEach(
+ database ->
+ env.getConfigManager()
+ .getLoadManager()
+ .reBalanceDataPartitionPolicy(database));
+ }
return Flow.NO_MORE_STATE;
}
@@ -242,6 +257,7 @@ public class CreateRegionGroupsProcedure
ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream);
ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream);
});
+ persistPlan.serializeForProcedure(stream);
}
@Override
@@ -259,6 +275,9 @@ public class CreateRegionGroupsProcedure
ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
failedRegionReplicaSets.put(groupId, replica);
}
+ if (byteBuffer.hasRemaining()) {
+ persistPlan.deserializeForProcedure(byteBuffer);
+ }
} catch (Exception e) {
LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure",
e);
throw new RuntimeException(e);
@@ -267,16 +286,22 @@ public class CreateRegionGroupsProcedure
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
CreateRegionGroupsProcedure that = (CreateRegionGroupsProcedure) o;
return consensusGroupType == that.consensusGroupType
&& createRegionGroupsPlan.equals(that.createRegionGroupsPlan)
+ && persistPlan.equals(that.persistPlan)
&& failedRegionReplicaSets.equals(that.failedRegionReplicaSets);
}
@Override
public int hashCode() {
- return Objects.hash(consensusGroupType, createRegionGroupsPlan,
failedRegionReplicaSets);
+ return Objects.hash(
+ consensusGroupType, createRegionGroupsPlan, persistPlan,
failedRegionReplicaSets);
}
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index e1a212c7f50..f30d46dd117 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.consensus.request;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -818,9 +819,12 @@ public class ConfigPhysicalPlanSerDeTest {
CreateRegionGroupsPlan createRegionGroupsPlan = new
CreateRegionGroupsPlan();
createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
+ CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+ persistPlan.addRegionGroup("root.sg0", dataRegionSet);
+ persistPlan.addRegionGroup("root.sg1", schemaRegionSet);
CreateRegionGroupsProcedure procedure0 =
new CreateRegionGroupsProcedure(
- TConsensusGroupType.DataRegion, createRegionGroupsPlan,
failedRegions);
+ TConsensusGroupType.DataRegion, createRegionGroupsPlan,
persistPlan, failedRegions);
updateProcedurePlan0.setProcedure(procedure0);
updateProcedurePlan1 =
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java
new file mode 100644
index 00000000000..70ce9231c17
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DataPartitionPolicyTableTest {
+
+ private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
+ private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+
+ @Test
+ public void testUpdateDataAllotTable() {
+ DataPartitionPolicyTable dataPartitionPolicyTable = new
DataPartitionPolicyTable();
+ List<TConsensusGroupId> dataRegionGroups = new ArrayList<>();
+
+ // Test 1: construct DataAllotTable from scratch
+ TConsensusGroupId group1 = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 1);
+ dataRegionGroups.add(group1);
+ dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups);
+ for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+ // All SeriesPartitionSlots belong to group1
+ Assert.assertEquals(
+ group1,
+
dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot));
+ }
+
+ // Test2: extend DataRegionGroups
+ Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new
HashMap<>();
+ dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion,
2));
+ dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion,
3));
+ dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups);
+ int mu = SERIES_SLOT_NUM / 3;
+ Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>();
+ for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+ TConsensusGroupId groupId =
+
dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+ lastDataAllotTable.put(seriesPartitionSlot, groupId);
+ counter.computeIfAbsent(groupId, empty -> new
AtomicInteger(0)).incrementAndGet();
+ }
+ // All DataRegionGroups divide SeriesPartitionSlots evenly
+ for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry :
counter.entrySet()) {
+ Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+ }
+
+ // Test 3: extend DataRegionGroups while inherit as much
SeriesPartitionSlots as possible
+ dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion,
4));
+ dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion,
5));
+ dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups);
+ Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>();
+ mu = SERIES_SLOT_NUM / 5;
+ counter.clear();
+ for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+ TConsensusGroupId groupId =
+
dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+ counter.computeIfAbsent(groupId, empty -> new
AtomicInteger(0)).incrementAndGet();
+ if (groupId.getId() < 4) {
+ // Most of SeriesPartitionSlots in the first three DataRegionGroups
should remain unchanged
+ Assert.assertEquals(lastDataAllotTable.get(seriesPartitionSlot),
groupId);
+ unchangedSlots.computeIfAbsent(groupId, empty -> new
AtomicInteger(0)).incrementAndGet();
+ }
+ }
+ // All DataRegionGroups divide SeriesPartitionSlots evenly
+ for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry :
counter.entrySet()) {
+ Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+ }
+ // Most of SeriesPartitionSlots in the first three DataRegionGroups should
remain unchanged
+ for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry :
unchangedSlots.entrySet()) {
+ Assert.assertEquals(mu, counterEntry.getValue().get());
+ }
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
index 0642bf6d6ab..9cd9d41301c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
@@ -93,10 +93,13 @@ public class CreateRegionGroupsProcedureTest {
CreateRegionGroupsPlan createRegionGroupsPlan = new
CreateRegionGroupsPlan();
createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
+ CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+ persistPlan.addRegionGroup("root.sg0", dataRegionSet);
+ persistPlan.addRegionGroup("root.sg1", schemaRegionSet);
CreateRegionGroupsProcedure procedure0 =
new CreateRegionGroupsProcedure(
- TConsensusGroupType.DataRegion, createRegionGroupsPlan,
failedRegions0);
+ TConsensusGroupType.DataRegion, createRegionGroupsPlan,
persistPlan, failedRegions0);
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 2a6ead060ec..08cc77c3c35 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -128,13 +128,6 @@ cluster_name=defaultCluster
# data_region_per_data_node=5.0
-# Whether to enable the DataPartition inherit policy.
-# DataPartition within the same SeriesPartitionSlot will inherit the
allocation result of
-# the predecessor or successor TimePartitionSlot if set true
-# Datatype: Boolean
-# enable_data_partition_inherit_policy=true
-
-
# The policy of cluster RegionGroups' leader distribution.
# E.g. we should balance cluster RegionGroups' leader distribution when some
DataNodes are shutdown or re-connected.
# These policies are currently supported:
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 1a5611536ae..c0eac239d88 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -34,6 +34,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -99,22 +100,34 @@ public class DataPartitionTable {
}
/**
- * Checks whether the specified DataPartition has a predecessor or successor
and returns if it
- * does
+ * Checks whether the specified DataPartition has a successor and returns if
it does.
+ *
+ * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+ * @param timePartitionSlot Corresponding TimePartitionSlot
+ * @return The specific DataPartition's successor if exists, null otherwise
+ */
+ public TConsensusGroupId getSuccessorDataPartition(
+ TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot
timePartitionSlot) {
+ if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
+ return
dataPartitionMap.get(seriesPartitionSlot).getSuccessorDataPartition(timePartitionSlot);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Checks whether the specified DataPartition has a predecessor and returns
if it does.
*
* @param seriesPartitionSlot Corresponding SeriesPartitionSlot
* @param timePartitionSlot Corresponding TimePartitionSlot
- * @param timePartitionInterval Time partition interval
* @return The specific DataPartition's predecessor if exists, null otherwise
*/
- public TConsensusGroupId getAdjacentDataPartition(
- TSeriesPartitionSlot seriesPartitionSlot,
- TTimePartitionSlot timePartitionSlot,
- long timePartitionInterval) {
+ public TConsensusGroupId getPredecessorDataPartition(
+ TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot
timePartitionSlot) {
if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
return dataPartitionMap
.get(seriesPartitionSlot)
- .getAdjacentDataPartition(timePartitionSlot, timePartitionInterval);
+ .getPredecessorDataPartition(timePartitionSlot);
} else {
return null;
}
@@ -236,6 +249,19 @@ public class DataPartitionTable {
.collect(Collectors.toList());
}
+ /**
+ * Get the last DataAllotTable.
+ *
+ * @return The last DataAllotTable
+ */
+ public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
+ Map<TSeriesPartitionSlot, TConsensusGroupId> result = new HashMap<>();
+ dataPartitionMap.forEach(
+ (seriesPartitionSlot, seriesPartitionTable) ->
+ result.put(seriesPartitionSlot,
seriesPartitionTable.getLastConsensusGroupId()));
+ return result;
+ }
+
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 47216cc41ce..1b87e42ad11 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -33,10 +34,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,22 +46,29 @@ import java.util.stream.Collectors;
public class SeriesPartitionTable {
- private final Map<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionMap;
+  // Must be sorted (the successor/predecessor/last lookups below rely on key order) and safe for
+  // concurrent access (several methods are documented as thread-safe), so TreeMap is not enough.
+  private final ConcurrentSkipListMap<TTimePartitionSlot, List<TConsensusGroupId>>
+      seriesPartitionMap;
public SeriesPartitionTable() {
- this.seriesPartitionMap = new ConcurrentHashMap<>();
+    this.seriesPartitionMap = new ConcurrentSkipListMap<>();
}
public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionMap) {
- this.seriesPartitionMap = seriesPartitionMap;
+    this.seriesPartitionMap = new ConcurrentSkipListMap<>(seriesPartitionMap);
}
public Map<TTimePartitionSlot, List<TConsensusGroupId>>
getSeriesPartitionMap() {
return seriesPartitionMap;
}
+ public void putDataPartition(TTimePartitionSlot timePartitionSlot,
TConsensusGroupId groupId) {
+ seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new
ArrayList<>()).add(groupId);
+ }
+
/**
- * Thread-safely get DataPartition within the specific StorageGroup
+ * Thread-safely get DataPartition within the specific Database.
*
* @param partitionSlotList TimePartitionSlotList
* @param seriesPartitionTable Store the matched SeriesPartitions
@@ -119,34 +124,29 @@ public class SeriesPartitionTable {
}
/**
- * Checks whether the specified DataPartition has a predecessor or successor
and returns if it
- * does
+ * Check and return the specified DataPartition's successor.
*
* @param timePartitionSlot Corresponding TimePartitionSlot
- * @param timePartitionInterval Time partition interval
- * @return The specific DataPartition's predecessor if exists, null otherwise
+ * @return The specified DataPartition's successor if exists, null otherwise
*/
- public TConsensusGroupId getAdjacentDataPartition(
- TTimePartitionSlot timePartitionSlot, long timePartitionInterval) {
- if (timePartitionSlot.getStartTime() >= timePartitionInterval) {
- // Check predecessor first
- TTimePartitionSlot predecessorSlot =
- new TTimePartitionSlot(timePartitionSlot.getStartTime() -
timePartitionInterval);
- TConsensusGroupId predecessor =
- seriesPartitionMap.getOrDefault(predecessorSlot,
Collections.singletonList(null)).get(0);
- if (predecessor != null) {
- return predecessor;
- }
- }
+ public TConsensusGroupId getSuccessorDataPartition(TTimePartitionSlot
timePartitionSlot) {
+ TTimePartitionSlot successorSlot =
seriesPartitionMap.higherKey(timePartitionSlot);
+ return successorSlot == null ? null :
seriesPartitionMap.get(successorSlot).get(0);
+ }
- // Check successor
- TTimePartitionSlot successorSlot =
- new TTimePartitionSlot(timePartitionSlot.getStartTime() +
timePartitionInterval);
- return seriesPartitionMap.getOrDefault(successorSlot,
Collections.singletonList(null)).get(0);
+ /**
+ * Check and return the specified DataPartition's predecessor.
+ *
+ * @param timePartitionSlot Corresponding TimePartitionSlot
+ * @return The specified DataPartition's predecessor if exists, null
otherwise
+ */
+ public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot
timePartitionSlot) {
+ TTimePartitionSlot predecessorSlot =
seriesPartitionMap.lowerKey(timePartitionSlot);
+ return predecessorSlot == null ? null :
seriesPartitionMap.get(predecessorSlot).get(0);
}
/**
- * Query a timePartition's corresponding dataRegionIds
+ * Query a timePartition's corresponding dataRegionIds.
*
* @param timeSlotId Time partition's timeSlotId
* @return the timePartition's corresponding dataRegionIds
@@ -179,7 +179,7 @@ public class SeriesPartitionTable {
}
/**
- * Create DataPartition within the specific SeriesPartitionSlot
+ * Create DataPartition within the specific SeriesPartitionSlot.
*
* @param assignedSeriesPartitionTable Assigned result
* @param seriesPartitionSlot Corresponding TSeriesPartitionSlot
@@ -206,7 +206,7 @@ public class SeriesPartitionTable {
/**
* Only Leader use this interface. And this interface is synchronized.
Thread-safely filter no
- * assigned DataPartitionSlots within the specific SeriesPartitionSlot
+ * assigned DataPartitionSlots within the specific SeriesPartitionSlot.
*
* @param partitionSlots TimePartitionSlots
* @return Unassigned PartitionSlots
@@ -225,6 +225,20 @@ public class SeriesPartitionTable {
return result;
}
+ /**
+ * Get the last DataPartition's ConsensusGroupId.
+ *
+ * @return The last DataPartition's ConsensusGroupId, null if there are no
DataPartitions yet
+ */
+ public TConsensusGroupId getLastConsensusGroupId() {
+ Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+ seriesPartitionMap.lastEntry();
+ if (lastEntry == null) {
+ return null;
+ }
+ return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
+ }
+
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
@@ -238,7 +252,7 @@ public class SeriesPartitionTable {
}
}
- /** Only for ConsensusRequest */
+ /** Only for ConsensusRequest. */
public void deserialize(ByteBuffer buffer) {
int timePartitionSlotNum = buffer.getInt();
for (int i = 0; i < timePartitionSlotNum; i++) {
@@ -255,7 +269,7 @@ public class SeriesPartitionTable {
}
}
- /** Only for Snapshot */
+ /** Only for Snapshot. */
public void deserialize(InputStream inputStream, TProtocol protocol)
throws IOException, TException {
int timePartitionSlotNum = ReadWriteIOUtils.readInt(inputStream);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
new file mode 100644
index 00000000000..1fe5ce1bcdf
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.structure;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Stores key-value pairs and can return a key whose value is currently the minimum. It supports
+ * the following operations: 1. Put (insert or update) a key-value pair. 2. Get a key with the
+ * minimum value.
+ *
+ * <p>Keys mapped to the same value share one bucket; which of them {@link #getKeyWithMinValue()}
+ * returns is unspecified. This class is not thread-safe.
+ *
+ * @param <K> The type of Key
+ * @param <V> The type of Value, should be Comparable
+ */
+public class BalanceTreeMap<K, V extends Comparable<V>> {
+
+  /** Each key and its current value. */
+  private final Map<K, V> keyValueMap;
+
+  /** Each distinct value and the keys currently mapped to it, ordered by value. */
+  private final TreeMap<V, Set<K>> valueKeysMap;
+
+  public BalanceTreeMap() {
+    this.keyValueMap = new HashMap<>();
+    this.valueKeysMap = new TreeMap<>();
+  }
+
+  /**
+   * Put or modify a key-value pair.
+   *
+   * @param key Key
+   * @param value Value, must not be null
+   * @throws NullPointerException if value is null, in which case nothing is modified
+   */
+  public void put(K key, V value) {
+    // Validate first: a null value cannot be ordered by valueKeysMap, and failing after
+    // updating keyValueMap would leave the two maps inconsistent.
+    Objects.requireNonNull(value, "value");
+    V oldValue = keyValueMap.put(key, value);
+    if (oldValue != null) {
+      removeFromBucket(key, oldValue);
+    }
+    valueKeysMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key);
+  }
+
+  /**
+   * Get key with minimum value.
+   *
+   * @return Key with minimum value
+   * @throws NoSuchElementException if the map is empty
+   */
+  public K getKeyWithMinValue() {
+    Map.Entry<V, Set<K>> minEntry = valueKeysMap.firstEntry();
+    if (minEntry == null) {
+      throw new NoSuchElementException("BalanceTreeMap is empty");
+    }
+    return minEntry.getValue().iterator().next();
+  }
+
+  /**
+   * Get the value of the specified key.
+   *
+   * @param key Key
+   * @return The value, or null if the key is absent
+   */
+  public V get(K key) {
+    return keyValueMap.get(key);
+  }
+
+  public boolean containsKey(K key) {
+    return keyValueMap.containsKey(key);
+  }
+
+  public int size() {
+    return keyValueMap.size();
+  }
+
+  @TestOnly
+  public void remove(K key) {
+    V value = keyValueMap.remove(key);
+    if (value != null) {
+      removeFromBucket(key, value);
+    }
+  }
+
+  @TestOnly
+  public boolean isEmpty() {
+    return keyValueMap.isEmpty() && valueKeysMap.isEmpty();
+  }
+
+  /** Remove key from the bucket of value, dropping the bucket once it is empty. */
+  private void removeFromBucket(K key, V value) {
+    Set<K> keysSet = valueKeysMap.get(value);
+    keysSet.remove(key);
+    if (keysSet.isEmpty()) {
+      valueKeysMap.remove(value);
+    }
+  }
+}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
new file mode 100644
index 00000000000..d38b8176f26
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.structure;
+
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+public class BalanceTreeMapTest {
+
+  @Test
+  public void testGetKeyWithMinValue() {
+    Random random = new Random();
+    BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new BalanceTreeMap<>();
+    for (int i = 0; i < 100; i++) {
+      balanceTreeMap.put(new TSeriesPartitionSlot(i), random.nextInt(Integer.MAX_VALUE));
+    }
+    TSeriesPartitionSlot minSlot = new TSeriesPartitionSlot(100);
+    balanceTreeMap.put(minSlot, Integer.MIN_VALUE);
+    for (int i = 101; i < 200; i++) {
+      balanceTreeMap.put(new TSeriesPartitionSlot(i), random.nextInt(Integer.MAX_VALUE));
+    }
+    Assert.assertEquals(minSlot, balanceTreeMap.getKeyWithMinValue());
+
+    int currentValue = Integer.MIN_VALUE;
+    for (int i = 0; i < 200; i++) {
+      TSeriesPartitionSlot slot = balanceTreeMap.getKeyWithMinValue();
+      Assert.assertTrue(balanceTreeMap.get(slot) >= currentValue);
+      currentValue = balanceTreeMap.get(slot);
+      balanceTreeMap.remove(slot);
+    }
+    Assert.assertTrue(balanceTreeMap.isEmpty());
+  }
+
+  @Test
+  public void testKeysDuplicate() {
+    BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new BalanceTreeMap<>();
+    Set<TSeriesPartitionSlot> duplicateSet0 = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i);
+      balanceTreeMap.put(slot, 0);
+      duplicateSet0.add(slot);
+    }
+    Set<TSeriesPartitionSlot> duplicateSet1 = new HashSet<>();
+    for (int i = 10; i < 20; i++) {
+      TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i);
+      balanceTreeMap.put(slot, 1);
+      duplicateSet1.add(slot);
+    }
+
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(duplicateSet0.contains(balanceTreeMap.getKeyWithMinValue()));
+      balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue());
+    }
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(duplicateSet1.contains(balanceTreeMap.getKeyWithMinValue()));
+      balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue());
+    }
+    Assert.assertTrue(balanceTreeMap.isEmpty());
+  }
+
+  @Test
+  public void testPutUpdatesExistingKey() {
+    BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new BalanceTreeMap<>();
+    TSeriesPartitionSlot slot0 = new TSeriesPartitionSlot(0);
+    TSeriesPartitionSlot slot1 = new TSeriesPartitionSlot(1);
+    balanceTreeMap.put(slot0, 0);
+    balanceTreeMap.put(slot1, 1);
+    Assert.assertEquals(slot0, balanceTreeMap.getKeyWithMinValue());
+
+    // Re-putting an existing key must move it out of its previous value's bucket
+    balanceTreeMap.put(slot0, 2);
+    Assert.assertEquals(2, balanceTreeMap.size());
+    Assert.assertEquals(Integer.valueOf(2), balanceTreeMap.get(slot0));
+    Assert.assertEquals(slot1, balanceTreeMap.getKeyWithMinValue());
+
+    balanceTreeMap.remove(slot1);
+    Assert.assertEquals(slot0, balanceTreeMap.getKeyWithMinValue());
+    balanceTreeMap.remove(slot0);
+    // isEmpty also checks the value index, so a stale bucket for value 0 would fail here
+    Assert.assertTrue(balanceTreeMap.isEmpty());
+  }
+}