This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 81d0aa3c3f4 [To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)
81d0aa3c3f4 is described below

commit 81d0aa3c3f4bf9304944c8e1a72b803f78f7935b
Author: Potato <[email protected]>
AuthorDate: Mon Jul 31 23:01:33 2023 +0800

    [To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)
---
 .../iotdb/it/env/cluster/MppCommonConfig.java      |  14 +-
 .../it/env/cluster/MppSharedCommonConfig.java      |  15 +-
 .../iotdb/it/env/remote/RemoteCommonConfig.java    |  12 +-
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   4 +-
 .../it/partition/IoTDBPartitionCreationIT.java     | 176 -------------
 .../partition/IoTDBPartitionInheritPolicyIT.java   | 276 ++++++++++++++-------
 .../confignode/it/utils/ConfigNodeTestUtils.java   | 117 +++++++++
 .../db/it/last/IoTDBLastQueryLastCacheIT.java      |   3 +-
 .../apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java |   2 +-
 .../session/it/IoTDBSessionSchemaTemplateIT.java   |  22 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  14 --
 .../confignode/conf/ConfigNodeDescriptor.java      |   6 -
 .../iotdb/confignode/manager/load/LoadManager.java |  16 +-
 .../manager/load/balancer/PartitionBalancer.java   | 201 ++++++++++++++-
 .../partition/DataPartitionPolicyTable.java        | 144 +++++++++++
 .../partition/GreedyPartitionAllocator.java        | 203 ---------------
 .../balancer/partition/IPartitionAllocator.java    |  55 ----
 .../manager/partition/PartitionManager.java        |  90 +++++--
 .../partition/DatabasePartitionTable.java          |  58 +++--
 .../persistence/partition/PartitionInfo.java       |  72 +++++-
 .../impl/schema/DeleteDatabaseProcedure.java       |   3 +
 .../statemachine/CreateRegionGroupsProcedure.java  |  33 ++-
 .../request/ConfigPhysicalPlanSerDeTest.java       |   6 +-
 .../partition/DataPartitionPolicyTableTest.java    | 105 ++++++++
 .../impl/CreateRegionGroupsProcedureTest.java      |   5 +-
 .../resources/conf/iotdb-common.properties         |   7 -
 .../commons/partition/DataPartitionTable.java      |  42 +++-
 .../commons/partition/SeriesPartitionTable.java    |  74 +++---
 .../iotdb/commons/structure/BalanceTreeMap.java    | 104 ++++++++
 .../commons/structure/BalanceTreeMapTest.java      |  82 ++++++
 30 files changed, 1273 insertions(+), 688 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index 6c01de2e787..4dbc3453fb8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -199,14 +199,6 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
     return this;
   }
 
-  @Override
-  public CommonConfig setEnableDataPartitionInheritPolicy(
-      boolean enableDataPartitionInheritPolicy) {
-    setProperty(
-        "enable_data_partition_inherit_policy", 
String.valueOf(enableDataPartitionInheritPolicy));
-    return this;
-  }
-
   @Override
   public CommonConfig setDataReplicationFactor(int dataReplicationFactor) {
     setProperty("data_replication_factor", 
String.valueOf(dataReplicationFactor));
@@ -359,4 +351,10 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
     setProperty("database_limit_threshold", 
String.valueOf(databaseLimitThreshold));
     return this;
   }
+
+  @Override
+  public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+    setProperty("data_region_per_data_node", 
String.valueOf(dataRegionPerDataNode));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index 7af4f01fa63..b362a02d97b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -172,14 +172,6 @@ public class MppSharedCommonConfig implements CommonConfig {
     return this;
   }
 
-  @Override
-  public CommonConfig setEnableDataPartitionInheritPolicy(
-      boolean enableDataPartitionInheritPolicy) {
-    
cnConfig.setEnableDataPartitionInheritPolicy(enableDataPartitionInheritPolicy);
-    
dnConfig.setEnableDataPartitionInheritPolicy(enableDataPartitionInheritPolicy);
-    return this;
-  }
-
   @Override
   public CommonConfig setSchemaRegionGroupExtensionPolicy(String 
schemaRegionGroupExtensionPolicy) {
     
cnConfig.setSchemaRegionGroupExtensionPolicy(schemaRegionGroupExtensionPolicy);
@@ -372,4 +364,11 @@ public class MppSharedCommonConfig implements CommonConfig {
     cnConfig.setDatabaseLimitThreshold(databaseLimitThreshold);
     return this;
   }
+
+  @Override
+  public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+    dnConfig.setDataRegionPerDataNode(dataRegionPerDataNode);
+    cnConfig.setDataRegionPerDataNode(dataRegionPerDataNode);
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index a578a6f4cf5..e181d5e1f73 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.it.env.remote;
 
 import org.apache.iotdb.itbase.env.CommonConfig;
@@ -122,12 +123,6 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
-  @Override
-  public CommonConfig setEnableDataPartitionInheritPolicy(
-      boolean enableDataPartitionInheritPolicy) {
-    return this;
-  }
-
   @Override
   public CommonConfig setSchemaRegionGroupExtensionPolicy(String 
schemaRegionGroupExtensionPolicy) {
     return this;
@@ -266,4 +261,9 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setDatabaseLimitThreshold(long databaseLimitThreshold) {
     return this;
   }
+
+  @Override
+  public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 2a08304221d..08161c36bc3 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -62,8 +62,6 @@ public interface CommonConfig {
 
   CommonConfig setDataRegionConsensusProtocolClass(String 
dataRegionConsensusProtocolClass);
 
-  CommonConfig setEnableDataPartitionInheritPolicy(boolean 
enableDataPartitionInheritPolicy);
-
   CommonConfig setSchemaRegionGroupExtensionPolicy(String 
schemaRegionGroupExtensionPolicy);
 
   CommonConfig setDefaultSchemaRegionGroupNumPerDatabase(int 
schemaRegionGroupPerDatabase);
@@ -119,4 +117,6 @@ public interface CommonConfig {
   CommonConfig setSortBufferSize(long sortBufferSize);
 
   CommonConfig setMaxTsBlockSizeInByte(long maxTsBlockSizeInByte);
+
+  CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
index 33844dcf48c..dba7b812814 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
@@ -419,182 +419,6 @@ public class IoTDBPartitionCreationIT {
           4 + testTimePartitionBatchSize,
           testTimePartitionInterval,
           dataPartitionTableResp.getDataPartitionTable());
-
-      // RegionGroup statistics:
-      // 0: 1 Removing, 1 partition
-      // 1: 1 ReadOnly, 1 partition
-      // 2: 1 Unknown, 1 partition
-      // 3: All Running, 1 partition
-      // Least Region Group number per storageGroup = 4, match the current 
Region Group number
-      // Will allocate the new partition to Running RegionGroup 3, DataNodes: 
[1, 2, 6]
-      showRegionResp = client.showRegion(new TShowRegionReq());
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
showRegionResp.getStatus().getCode());
-      for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
-        if (regionInfo.getDataNodeId() == 6) {
-          Assert.assertEquals(regionInfo.getTimeSlots(), 2);
-        }
-      }
-
-      partitionSlotsMap =
-          ConfigNodeTestUtils.constructPartitionSlotsMap(
-              sg,
-              5,
-              5 + testSeriesPartitionBatchSize,
-              5,
-              5 + testTimePartitionBatchSize,
-              testTimePartitionInterval);
-      dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
-      for (int retry = 0; retry < 5; retry++) {
-        // Build new Client since it's unstable in Win8 environment
-        try (SyncConfigNodeIServiceClient configNodeClient =
-            (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-          dataPartitionTableResp = 
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
-          if (dataPartitionTableResp != null) {
-            break;
-          }
-        } catch (Exception e) {
-          // Retry sometimes in order to avoid request timeout
-          LOGGER.error(e.getMessage());
-          TimeUnit.SECONDS.sleep(1);
-        }
-      }
-      Assert.assertNotNull(dataPartitionTableResp);
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          dataPartitionTableResp.getStatus().getCode());
-      Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
-      ConfigNodeTestUtils.checkDataPartitionTable(
-          sg,
-          5,
-          5 + testSeriesPartitionBatchSize,
-          5,
-          5 + testTimePartitionBatchSize,
-          testTimePartitionInterval,
-          dataPartitionTableResp.getDataPartitionTable());
-
-      // RegionGroup statistics:
-      // 0: 1 Removing, 1 partition
-      // 1: 1 ReadOnly, 1 partition
-      // 2: 1 Unknown, 1 partition
-      // 3: All Running, 2 partition
-      // Least Region Group number per storageGroup = 4, match the current 
Region Group number
-      // Will allocate the new partition to available RegionGroup 2, 
DataNodes: [1, 2, 5]
-      showRegionResp = client.showRegion(new TShowRegionReq());
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
showRegionResp.getStatus().getCode());
-      for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
-        if (regionInfo.getDataNodeId() == 5) {
-          Assert.assertEquals(regionInfo.getTimeSlots(), 2);
-        }
-      }
-
-      partitionSlotsMap =
-          ConfigNodeTestUtils.constructPartitionSlotsMap(
-              sg,
-              6,
-              6 + testSeriesPartitionBatchSize,
-              6,
-              6 + testTimePartitionBatchSize,
-              testTimePartitionInterval);
-      dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
-      for (int retry = 0; retry < 5; retry++) {
-        // Build new Client since it's unstable in Win8 environment
-        try (SyncConfigNodeIServiceClient configNodeClient =
-            (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-          dataPartitionTableResp = 
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
-          if (dataPartitionTableResp != null) {
-            break;
-          }
-        } catch (Exception e) {
-          // Retry sometimes in order to avoid request timeout
-          LOGGER.error(e.getMessage());
-          TimeUnit.SECONDS.sleep(1);
-        }
-      }
-      Assert.assertNotNull(dataPartitionTableResp);
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          dataPartitionTableResp.getStatus().getCode());
-      Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
-      ConfigNodeTestUtils.checkDataPartitionTable(
-          sg,
-          6,
-          6 + testSeriesPartitionBatchSize,
-          6,
-          6 + testTimePartitionBatchSize,
-          testTimePartitionInterval,
-          dataPartitionTableResp.getDataPartitionTable());
-
-      // RegionGroup statistics:
-      // 0: 1 Removing, 1 partition
-      // 1: 1 ReadOnly, 1 partition
-      // 2: 1 Unknown, 2 partition
-      // 3: All Running, 2 partition
-      // Least Region Group number per storageGroup = 4, match the current 
Region Group number
-      // Will allocate the new partition to Discouraged RegionGroup 2, 
DataNodes: [1, 2, 4]
-      showRegionResp = client.showRegion(new TShowRegionReq());
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
showRegionResp.getStatus().getCode());
-      for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
-        if (regionInfo.getDataNodeId() == 4) {
-          Assert.assertEquals(regionInfo.getTimeSlots(), 2);
-        }
-      }
-
-      partitionSlotsMap =
-          ConfigNodeTestUtils.constructPartitionSlotsMap(
-              sg,
-              7,
-              7 + testSeriesPartitionBatchSize,
-              7,
-              7 + testTimePartitionBatchSize,
-              testTimePartitionInterval);
-      dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
-      for (int retry = 0; retry < 5; retry++) {
-        // Build new Client since it's unstable in Win8 environment
-        try (SyncConfigNodeIServiceClient configNodeClient =
-            (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-          dataPartitionTableResp = 
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
-          if (dataPartitionTableResp != null) {
-            break;
-          }
-        } catch (Exception e) {
-          // Retry sometimes in order to avoid request timeout
-          LOGGER.error(e.getMessage());
-          TimeUnit.SECONDS.sleep(1);
-        }
-      }
-      Assert.assertNotNull(dataPartitionTableResp);
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          dataPartitionTableResp.getStatus().getCode());
-      Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
-      ConfigNodeTestUtils.checkDataPartitionTable(
-          sg,
-          7,
-          7 + testSeriesPartitionBatchSize,
-          7,
-          7 + testTimePartitionBatchSize,
-          testTimePartitionInterval,
-          dataPartitionTableResp.getDataPartitionTable());
-
-      // RegionGroup statistics:
-      // 0: 1 Removing, 1 partition
-      // 1: 1 ReadOnly, 2 partition
-      // 2: 1 Unknown, 2 partition
-      // 3: All Running, 2 partition
-      // Least Region Group number per storageGroup = 4, match the current 
Region Group number
-      // Will allocate the new partition to Running RegionGroup 3, DataNodes: 
[0, 1, 5]
-      // Because RegionGroup 1 is Disabled
-      showRegionResp = client.showRegion(new TShowRegionReq());
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
showRegionResp.getStatus().getCode());
-      for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
-        if (regionInfo.getDataNodeId() == 6) {
-          Assert.assertEquals(regionInfo.getTimeSlots(), 3);
-        }
-      }
     }
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index 8b49948f5f7..2efd5db7a0b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.it.partition;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
-import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
-import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -40,27 +39,27 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
 public class IoTDBPartitionInheritPolicyIT {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBPartitionInheritPolicyIT.class);
-  private static final boolean testEnableDataPartitionInheritPolicy = true;
   private static final String testDataRegionConsensusProtocolClass =
       ConsensusFactory.RATIS_CONSENSUS;
-  private static final int testReplicationFactor = 3;
+  private static final int testReplicationFactor = 1;
+  private static final int testSeriesSlotNum = 1000;
   private static final long testTimePartitionInterval = 604800000;
+  private static final double testDataRegionPerDataNode = 5.0;
 
-  private static final String sg = "root.sg";
-  private static final int storageGroupNum = 2;
-  private static final int testSeriesPartitionSlotNum = 100;
-  private static final int seriesPartitionBatchSize = 10;
+  private static final String database = "root.database";
+  private static final int seriesPartitionSlotBatchSize = 100;
   private static final int testTimePartitionSlotsNum = 100;
   private static final int timePartitionBatchSize = 10;
 
@@ -70,21 +69,19 @@ public class IoTDBPartitionInheritPolicyIT {
         .getConfig()
         .getCommonConfig()
         
.setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass)
-        
.setEnableDataPartitionInheritPolicy(testEnableDataPartitionInheritPolicy)
         .setDataReplicationFactor(testReplicationFactor)
         .setTimePartitionInterval(testTimePartitionInterval)
-        .setSeriesSlotNum(testSeriesPartitionSlotNum * 10);
+        .setSeriesSlotNum(testSeriesSlotNum)
+        .setDataRegionPerDataNode(testDataRegionPerDataNode);
 
-    // Init 1C3D environment
-    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+    // Init 1C1D environment
+    EnvFactory.getEnv().initClusterEnvironment(1, 1);
 
-    // Set StorageGroups
+    // Set Database
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-      for (int i = 0; i < storageGroupNum; i++) {
-        TSStatus status = client.setDatabase(new TDatabaseSchema(sg + i));
-        Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
-      }
+      TSStatus status = client.setDatabase(new TDatabaseSchema(database));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
     }
   }
 
@@ -95,76 +92,177 @@ public class IoTDBPartitionInheritPolicyIT {
 
   @Test
   public void testDataPartitionInheritPolicy() throws Exception {
+    final long baseStartTime = 1000;
+    Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new 
ConcurrentHashMap<>();
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-      TDataPartitionReq dataPartitionReq = new TDataPartitionReq();
-      TDataPartitionTableResp dataPartitionTableResp;
-      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap;
-
-      for (int i = 0; i < storageGroupNum; i++) {
-        String storageGroup = sg + i;
-        for (int j = 0; j < testSeriesPartitionSlotNum; j += 
seriesPartitionBatchSize) {
-          // Test inherit predecessor or successor
-          boolean isAscending = (j / 10) % 2 == 0;
-          int step = isAscending ? timePartitionBatchSize : 
-timePartitionBatchSize;
-          int k = isAscending ? 0 : testTimePartitionSlotsNum - 
timePartitionBatchSize;
-          while (0 <= k && k < testTimePartitionSlotsNum) {
-            partitionSlotsMap =
-                ConfigNodeTestUtils.constructPartitionSlotsMap(
-                    storageGroup,
-                    j,
-                    j + seriesPartitionBatchSize,
-                    k,
-                    k + timePartitionBatchSize,
-                    testTimePartitionInterval);
-            // Let ConfigNode create DataPartition
-            dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
-            for (int retry = 0; retry < 5; retry++) {
-              // Build new Client since it's unstable
-              try (SyncConfigNodeIServiceClient configNodeClient =
-                  (SyncConfigNodeIServiceClient)
-                      EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-                dataPartitionTableResp =
-                    
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
-                if (dataPartitionTableResp != null
-                    && dataPartitionTableResp.getStatus().getCode()
-                        == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                  ConfigNodeTestUtils.checkDataPartitionTable(
-                      storageGroup,
-                      j,
-                      j + seriesPartitionBatchSize,
-                      k,
-                      k + timePartitionBatchSize,
-                      testTimePartitionInterval,
-                      configNodeClient
-                          .getDataPartitionTable(dataPartitionReq)
-                          .getDataPartitionTable());
-                  break;
-                }
-              } catch (Exception e) {
-                // Retry sometimes in order to avoid request timeout
-                LOGGER.error(e.getMessage());
-                TimeUnit.SECONDS.sleep(1);
+    // Test1: divide and inherit DataPartitions from scratch
+    for (long timePartitionSlot = baseStartTime;
+        timePartitionSlot < baseStartTime + testTimePartitionSlotsNum;
+        timePartitionSlot++) {
+      for (int seriesPartitionSlot = 0;
+          seriesPartitionSlot < testSeriesSlotNum;
+          seriesPartitionSlot += seriesPartitionSlotBatchSize) {
+        ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+            database,
+            seriesPartitionSlot,
+            seriesPartitionSlot + seriesPartitionSlotBatchSize,
+            timePartitionSlot,
+            timePartitionSlot + 1,
+            testTimePartitionInterval);
+      }
+    }
+
+    int mu = (int) (testSeriesSlotNum / testDataRegionPerDataNode);
+    TDataPartitionTableResp dataPartitionTableResp =
+        ConfigNodeTestUtils.getDataPartitionWithRetry(
+            database,
+            0,
+            testSeriesSlotNum,
+            baseStartTime,
+            baseStartTime + testTimePartitionSlotsNum,
+            testTimePartitionInterval);
+    Assert.assertNotNull(dataPartitionTableResp);
+
+    // All DataRegionGroups divide all SeriesSlots evenly
+    final int expectedPartitionNum1 = mu * testTimePartitionSlotsNum;
+    Map<TConsensusGroupId, Integer> counter =
+        ConfigNodeTestUtils.countDataPartition(
+            dataPartitionTableResp.getDataPartitionTable().get(database));
+    counter.forEach((groupId, num) -> 
Assert.assertEquals(expectedPartitionNum1, num.intValue()));
+
+    // Test DataPartition inherit policy
+    dataPartitionTableResp
+        .getDataPartitionTable()
+        .get(database)
+        .forEach(
+            ((seriesPartitionSlot, timePartitionSlotMap) -> {
+              // All Timeslots belonging to the same SeriesSlot are allocated 
to the same
+              // DataRegionGroup
+              TConsensusGroupId groupId =
+                  timePartitionSlotMap
+                      .get(new TTimePartitionSlot(baseStartTime * 
testTimePartitionInterval))
+                      .get(0);
+              timePartitionSlotMap.forEach(
+                  (timePartitionSlot, groupIdList) ->
+                      Assert.assertEquals(groupId, groupIdList.get(0)));
+              dataAllotTable1.put(seriesPartitionSlot, groupId);
+            }));
+
+    // Register a new DataNode to extend DataRegionGroups
+    EnvFactory.getEnv().registerNewDataNode(true);
+
+    // Test2: divide and inherit DataPartitions after extension
+    mu = (int) (testSeriesSlotNum / (testDataRegionPerDataNode * 2));
+    dataPartitionTableResp =
+        ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+            database,
+            0,
+            testSeriesSlotNum,
+            baseStartTime + testTimePartitionSlotsNum,
+            baseStartTime + testTimePartitionSlotsNum + timePartitionBatchSize,
+            testTimePartitionInterval);
+    Assert.assertNotNull(dataPartitionTableResp);
+
+    // All DataRegionGroups divide all SeriesSlots evenly
+    counter =
+        ConfigNodeTestUtils.countDataPartition(
+            dataPartitionTableResp.getDataPartitionTable().get(database));
+    final int expectedPartitionNum2 = mu * timePartitionBatchSize;
+    counter.forEach((groupId, num) -> 
Assert.assertEquals(expectedPartitionNum2, num.intValue()));
+
+    // Test DataPartition inherit policy
+    AtomicInteger inheritedSeriesSlotNum = new AtomicInteger(0);
+    Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable2 = new 
ConcurrentHashMap<>();
+    dataPartitionTableResp
+        .getDataPartitionTable()
+        .get(database)
+        .forEach(
+            ((seriesPartitionSlot, timePartitionSlotMap) -> {
+              // All Timeslots belonging to the same SeriesSlot are allocated 
to the same
+              // DataRegionGroup
+              TConsensusGroupId groupId =
+                  timePartitionSlotMap
+                      .get(
+                          new TTimePartitionSlot(
+                              (baseStartTime + testTimePartitionSlotsNum)
+                                  * testTimePartitionInterval))
+                      .get(0);
+              timePartitionSlotMap.forEach(
+                  (timePartitionSlot, groupIdList) ->
+                      Assert.assertEquals(groupId, groupIdList.get(0)));
+
+              if (dataAllotTable1.containsValue(groupId)) {
+                // The DataRegionGroup has been inherited
+                
Assert.assertTrue(dataAllotTable1.containsKey(seriesPartitionSlot));
+                Assert.assertEquals(dataAllotTable1.get(seriesPartitionSlot), 
groupId);
+                inheritedSeriesSlotNum.incrementAndGet();
               }
-            }
-            k += step;
-          }
-        }
+              dataAllotTable2.put(seriesPartitionSlot, groupId);
+            }));
+    // Exactly half of the SeriesSlots are inherited
+    Assert.assertEquals(testSeriesSlotNum / 2, inheritedSeriesSlotNum.get());
+
+    // Test3: historical DataPartitions will inherit successor
+    Random random = new Random();
+    Set<Integer> allocatedSlots = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      int slot = random.nextInt(testSeriesSlotNum);
+      while (allocatedSlots.contains(slot)) {
+        slot = random.nextInt(testSeriesSlotNum);
+      }
+      allocatedSlots.add(slot);
+      dataPartitionTableResp =
+          ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+              database,
+              slot,
+              slot + 1,
+              baseStartTime - 1,
+              baseStartTime,
+              testTimePartitionInterval);
+      Assert.assertNotNull(dataPartitionTableResp);
+
+      TSeriesPartitionSlot seriesPartitionSlot = new 
TSeriesPartitionSlot(slot);
+      TTimePartitionSlot timePartitionSlot =
+          new TTimePartitionSlot((baseStartTime - 1) * 
testTimePartitionInterval);
+      Assert.assertEquals(
+          dataAllotTable1.get(seriesPartitionSlot),
+          dataPartitionTableResp
+              .getDataPartitionTable()
+              .get(database)
+              .get(seriesPartitionSlot)
+              .get(timePartitionSlot)
+              .get(0));
+    }
+
+    // Test4: future DataPartitions will inherit predecessor
+    allocatedSlots.clear();
+    for (int i = 0; i < 10; i++) {
+      int slot = random.nextInt(testSeriesSlotNum);
+      while (allocatedSlots.contains(slot)) {
+        slot = random.nextInt(testSeriesSlotNum);
       }
+      allocatedSlots.add(slot);
+      dataPartitionTableResp =
+          ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+              database,
+              slot,
+              slot + 1,
+              baseStartTime + 999,
+              baseStartTime + 1000,
+              testTimePartitionInterval);
+      Assert.assertNotNull(dataPartitionTableResp);
 
-      // Test DataPartition inherit policy
-      TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
-      showRegionResp
-          .getRegionInfoList()
-          .forEach(
-              regionInfo -> {
-                // All Timeslots belonging to the same SeriesSlot are 
allocated to the same
-                // DataRegionGroup
-                Assert.assertEquals(
-                    regionInfo.getSeriesSlots() * testTimePartitionSlotsNum,
-                    regionInfo.getTimeSlots());
-              });
+      TSeriesPartitionSlot seriesPartitionSlot = new 
TSeriesPartitionSlot(slot);
+      TTimePartitionSlot timePartitionSlot =
+          new TTimePartitionSlot((baseStartTime + 999) * 
testTimePartitionInterval);
+      Assert.assertEquals(
+          dataAllotTable2.get(seriesPartitionSlot),
+          dataPartitionTableResp
+              .getDataPartitionTable()
+              .get(database)
+              .get(seriesPartitionSlot)
+              .get(timePartitionSlot)
+              .get(0));
     }
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
index 6f20a25ba79..8f5b46288fb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TNodeResource;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
@@ -34,12 +35,18 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.ConfigNodeWrapper;
 import org.apache.iotdb.it.env.cluster.DataNodeWrapper;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -47,11 +54,17 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertTrue;
 
 public class ConfigNodeTestUtils {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigNodeTestUtils.class);
+
   public static void checkNodeConfig(
       List<TConfigNodeLocation> configNodeList,
       List<TDataNodeLocation> dataNodeList,
@@ -169,6 +182,110 @@ public class ConfigNodeTestUtils {
     }
   }
 
+  /**
+   * Reads the DataPartitionTable for the requested slots from the leader ConfigNode, retrying up
+   * to five times before failing the test.
+   *
+   * <p>A new client is opened for every attempt. The helper waits one second after every failed
+   * attempt, whether the request threw an exception or returned a non-success status, so that a
+   * cluster which is still settling is not hit five times back-to-back.
+   *
+   * @param database name of the database to query
+   * @param seriesSlotStart series slot lower bound handed to constructPartitionSlotsMap
+   * @param seriesSlotEnd series slot upper bound handed to constructPartitionSlotsMap
+   * @param timeSlotStart time slot lower bound handed to constructPartitionSlotsMap
+   * @param timeSlotEnd time slot upper bound handed to constructPartitionSlotsMap
+   * @param timePartitionInterval time partition interval handed to constructPartitionSlotsMap
+   * @return the first response whose status code is SUCCESS_STATUS
+   * @throws InterruptedException if the thread is interrupted while waiting between attempts
+   */
+  public static TDataPartitionTableResp getDataPartitionWithRetry(
+      String database,
+      int seriesSlotStart,
+      int seriesSlotEnd,
+      long timeSlotStart,
+      long timeSlotEnd,
+      long timePartitionInterval)
+      throws InterruptedException {
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+        ConfigNodeTestUtils.constructPartitionSlotsMap(
+            database,
+            seriesSlotStart,
+            seriesSlotEnd,
+            timeSlotStart,
+            timeSlotEnd,
+            timePartitionInterval);
+    TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+
+    final int maxRetry = 5;
+    for (int retry = 0; retry < maxRetry; retry++) {
+      // Build new Client since it's unstable
+      try (SyncConfigNodeIServiceClient configNodeClient =
+          (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        TDataPartitionTableResp dataPartitionTableResp =
+            configNodeClient.getDataPartitionTable(dataPartitionReq);
+        if (dataPartitionTableResp != null
+            && dataPartitionTableResp.getStatus().getCode()
+                == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          return dataPartitionTableResp;
+        }
+      } catch (Exception e) {
+        // Retry sometimes in order to avoid request timeout.
+        // Pass the exception itself so the stack trace is kept even when getMessage() is null.
+        LOGGER.error("Failed to get DataPartition, attempt {}/{}", retry + 1, maxRetry, e);
+      }
+      // Back off after every unsuccessful attempt, not only after an exception
+      TimeUnit.SECONDS.sleep(1);
+    }
+    Assert.fail("Failed to get DataPartition");
+    return null;
+  }
+
+  /**
+   * Gets or creates the DataPartitions for the requested slots on the leader ConfigNode, retrying
+   * up to five times before failing the test.
+   *
+   * <p>On a successful response the created partitions are read back with getDataPartitionTable
+   * and verified through checkDataPartitionTable before the response is returned. The helper waits
+   * one second after every failed attempt, whether the request threw an exception or returned a
+   * non-success status.
+   *
+   * @param database name of the database to allocate in
+   * @param seriesSlotStart series slot lower bound handed to constructPartitionSlotsMap
+   * @param seriesSlotEnd series slot upper bound handed to constructPartitionSlotsMap
+   * @param timeSlotStart time slot lower bound handed to constructPartitionSlotsMap
+   * @param timeSlotEnd time slot upper bound handed to constructPartitionSlotsMap
+   * @param timePartitionInterval time partition interval handed to constructPartitionSlotsMap
+   * @return the first getOrCreateDataPartitionTable response whose status code is SUCCESS_STATUS
+   * @throws InterruptedException if the thread is interrupted while waiting between attempts
+   */
+  public static TDataPartitionTableResp getOrCreateDataPartitionWithRetry(
+      String database,
+      int seriesSlotStart,
+      int seriesSlotEnd,
+      long timeSlotStart,
+      long timeSlotEnd,
+      long timePartitionInterval)
+      throws InterruptedException {
+
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+        ConfigNodeTestUtils.constructPartitionSlotsMap(
+            database,
+            seriesSlotStart,
+            seriesSlotEnd,
+            timeSlotStart,
+            timeSlotEnd,
+            timePartitionInterval);
+    TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+
+    final int maxRetry = 5;
+    for (int retry = 0; retry < maxRetry; retry++) {
+      // Build new Client since it's unstable
+      try (SyncConfigNodeIServiceClient configNodeClient =
+          (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        TDataPartitionTableResp dataPartitionTableResp =
+            configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+        if (dataPartitionTableResp != null
+            && dataPartitionTableResp.getStatus().getCode()
+                == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          // Read the partitions back and make sure every requested slot was really allocated
+          ConfigNodeTestUtils.checkDataPartitionTable(
+              database,
+              seriesSlotStart,
+              seriesSlotEnd,
+              timeSlotStart,
+              timeSlotEnd,
+              timePartitionInterval,
+              configNodeClient.getDataPartitionTable(dataPartitionReq).getDataPartitionTable());
+          return dataPartitionTableResp;
+        }
+      } catch (Exception e) {
+        // Retry sometimes in order to avoid request timeout.
+        // Pass the exception itself so the stack trace is kept even when getMessage() is null.
+        LOGGER.error("Failed to create DataPartition, attempt {}/{}", retry + 1, maxRetry, e);
+      }
+      // Back off after every unsuccessful attempt, not only after an exception
+      TimeUnit.SECONDS.sleep(1);
+    }
+    Assert.fail("Failed to create DataPartition");
+    return null;
+  }
+
+  /**
+   * Counts how often each consensus group id occurs in the given DataPartition map.
+   *
+   * @param dataPartitionMap Map<SeriesPartitionSlot, Map<TimePartitionSlot, RegionGroupIds>>
+   * @return Map<RegionGroupId, number of occurrences across all series and time slots>
+   */
+  public static Map<TConsensusGroupId, Integer> countDataPartition(
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
+          dataPartitionMap) {
+    Map<TConsensusGroupId, AtomicInteger> tally = new ConcurrentHashMap<>();
+    for (Map<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotMap :
+        dataPartitionMap.values()) {
+      for (List<TConsensusGroupId> groupIds : timeSlotMap.values()) {
+        for (TConsensusGroupId groupId : groupIds) {
+          // One hit per occurrence of the group id
+          tally.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet();
+        }
+      }
+    }
+    // Unwrap the AtomicInteger holders into plain Integer values
+    return tally.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get()));
+  }
+
   public static TConfigNodeLocation generateTConfigNodeLocation(
       int nodeId, ConfigNodeWrapper configNodeWrapper) {
     return new TConfigNodeLocation(
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
index 33f9ef9e151..6d30db00085 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
@@ -119,7 +119,8 @@ public class IoTDBLastQueryLastCacheIT {
           "1679365910000,root.ln_1.tb_6141.waterInterval_DOUBLE,10.0,DOUBLE,",
           "1679365910000,root.ln_1.tb_6141.waterTP_DOUBLE,15.0,DOUBLE,",
         };
-    resultSetEqualTest("select last * from root.ln_1.tb_6141;", 
expectedHeader, retArray);
+    resultSetEqualTest(
+        "select last * from root.ln_1.tb_6141 order by timeseries;", 
expectedHeader, retArray);
   }
 
   @Test
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
index 454879aea90..82db0fdf154 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java
@@ -1318,7 +1318,7 @@ public class IoTDBOrderByIT {
           {"15", "15", "3147483648", "3147483648"},
           {"INT32", "INT32", "INT64", "INT64"}
         };
-    String sql = "select last bigNum,num from root.** order by value";
+    String sql = "select last bigNum,num from root.** order by value, 
timeseries";
     testLastQueryOrderBy(sql, ans);
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
index b71005bc0d6..66cd2d6905b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
@@ -471,18 +471,22 @@ public class IoTDBSessionSchemaTemplateIT extends 
AbstractSchemaIT {
 
     Assert.assertTrue(expectedSeries.isEmpty());
 
-    deviceIds = Arrays.asList("root.db.v4.d1", "root.db.v4.d2");
-    timestamps = Arrays.asList(1L, 1L);
     measurements = Arrays.asList("a", "b", "c");
-    allMeasurements = Arrays.asList(measurements, measurements);
-    allTsDataTypes =
-        Arrays.asList(
-            Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.TEXT),
-            Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE, 
TSDataType.INT32));
-    allValues = Arrays.asList(Arrays.asList(1f, 2f, "3"), Arrays.asList(1d, 
2d, 3));
 
     try {
-      session.insertRecords(deviceIds, timestamps, allMeasurements, 
allTsDataTypes, allValues);
+      session.insertRecord(
+          "root.db.v4.d1",
+          1L,
+          measurements,
+          Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.TEXT),
+          Arrays.asList(1f, 2f, "3"));
+      session.insertRecord(
+          "root.db.v4.d2",
+          1L,
+          measurements,
+          Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE, 
TSDataType.INT32),
+          Arrays.asList(1d, 2d, 3));
+      Assert.fail();
     } catch (StatementExecutionException e) {
       Assert.assertTrue(
           e.getMessage()
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 2907c3b1eff..6b99e0da369 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -110,12 +110,6 @@ public class ConfigNodeConfig {
   private RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy =
       RegionBalancer.RegionGroupAllocatePolicy.GREEDY;
 
-  /**
-   * DataPartition within the same SeriesPartitionSlot will inherit the 
allocation result of the
-   * predecessor or successor TimePartitionSlot if set true.
-   */
-  private boolean enableDataPartitionInheritPolicy = true;
-
   /** Max concurrent client number. */
   private int rpcMaxConcurrentClientNum = 65535;
 
@@ -559,14 +553,6 @@ public class ConfigNodeConfig {
     this.regionGroupAllocatePolicy = regionGroupAllocatePolicy;
   }
 
-  public boolean isEnableDataPartitionInheritPolicy() {
-    return enableDataPartitionInheritPolicy;
-  }
-
-  public void setEnableDataPartitionInheritPolicy(boolean 
enableDataPartitionInheritPolicy) {
-    this.enableDataPartitionInheritPolicy = enableDataPartitionInheritPolicy;
-  }
-
   public int getThriftServerAwaitTimeForStopService() {
     return thriftServerAwaitTimeForStopService;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 59eab23ddc4..5490b700c36 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -272,12 +272,6 @@ public class ConfigNodeDescriptor {
           "The configured region allocate strategy does not exist, use the 
default: GREEDY!");
     }
 
-    conf.setEnableDataPartitionInheritPolicy(
-        Boolean.parseBoolean(
-            properties.getProperty(
-                "enable_data_partition_inherit_policy",
-                String.valueOf(conf.isEnableDataPartitionInheritPolicy()))));
-
     conf.setCnRpcAdvancedCompressionEnable(
         Boolean.parseBoolean(
             properties
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index d88ebd70fe4..b4c291c97e9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -94,7 +94,7 @@ public class LoadManager {
   /**
    * Generate an optimal CreateRegionGroupsPlan.
    *
-   * @param allotmentMap Map<StorageGroupName, Region allotment>
+   * @param allotmentMap Map<DatabaseName, Region allotment>
    * @param consensusGroupType TConsensusGroupType of RegionGroup to be 
allocated
    * @return CreateRegionGroupsPlan
    * @throws NotEnoughDataNodeException If there are not enough DataNodes
@@ -110,7 +110,7 @@ public class LoadManager {
    * Allocate SchemaPartitions.
    *
    * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should 
be assigned
-   * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
+   * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result
    */
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
       Map<String, List<TSeriesPartitionSlot>> 
unassignedSchemaPartitionSlotsMap)
@@ -122,7 +122,7 @@ public class LoadManager {
    * Allocate DataPartitions.
    *
    * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be 
assigned
-   * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
+   * @return Map<DatabaseName, DataPartitionTable>, the allocating result
    */
   public Map<String, DataPartitionTable> allocateDataPartition(
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
unassignedDataPartitionSlotsMap)
@@ -130,6 +130,10 @@ public class LoadManager {
     return 
partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
   }
 
+  /**
+   * Re-balance the DataPartitionPolicyTable of the specified Database by delegating to the
+   * PartitionBalancer.
+   *
+   * @param database Database name
+   */
+  public void reBalanceDataPartitionPolicy(String database) {
+    partitionBalancer.reBalanceDataPartitionPolicy(database);
+  }
+
   public void broadcastLatestRegionRouteMap() {
     statisticsService.broadcastLatestRegionRouteMap();
   }
@@ -138,12 +142,18 @@ public class LoadManager {
     loadCache.initHeartbeatCache(configManager);
     heartbeatService.startHeartbeatService();
     statisticsService.startLoadStatisticsService();
+    partitionBalancer.setupPartitionBalancer();
   }
 
   public void stopLoadServices() {
     heartbeatService.stopHeartbeatService();
     statisticsService.stopLoadStatisticsService();
     loadCache.clearHeartbeatCache();
+    partitionBalancer.clearPartitionBalancer();
+  }
+
+  /**
+   * Clear the DataPartitionPolicyTable of the specified Database by delegating to the
+   * PartitionBalancer.
+   *
+   * @param database Database name
+   */
+  public void clearDataPartitionPolicyTable(String database) {
+    partitionBalancer.clearDataPartitionPolicyTable(database);
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 89d9235911c..6bfa9ae0671 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -16,19 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.manager.load.balancer;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.structure.BalanceTreeMap;
+import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
 import org.apache.iotdb.confignode.manager.IManager;
-import 
org.apache.iotdb.confignode.manager.load.balancer.partition.GreedyPartitionAllocator;
-import 
org.apache.iotdb.confignode.manager.load.balancer.partition.IPartitionAllocator;
+import 
org.apache.iotdb.confignode.manager.load.balancer.partition.DataPartitionPolicyTable;
+import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * The SeriesPartitionSlotBalancer provides interfaces to generate optimal 
Partition allocation and
@@ -36,38 +51,206 @@ import java.util.Map;
  */
 public class PartitionBalancer {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionBalancer.class);
+
   private final IManager configManager;
 
+  // Map<DatabaseName, DataPartitionPolicyTable>
+  private final Map<String, DataPartitionPolicyTable> 
dataPartitionPolicyTableMap;
+
   public PartitionBalancer(IManager configManager) {
     this.configManager = configManager;
+    this.dataPartitionPolicyTableMap = new ConcurrentHashMap<>();
   }
 
   /**
    * Allocate SchemaPartitions
    *
    * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned
-   * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
+   * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result
    */
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
       Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
       throws NoAvailableRegionGroupException {
-    return genPartitionAllocator().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+    Map<String, SchemaPartitionTable> allocation = new HashMap<>();
+
+    for (Map.Entry<String, List<TSeriesPartitionSlot>> entry :
+        unassignedSchemaPartitionSlotsMap.entrySet()) {
+      final String database = entry.getKey();
+
+      // Seed the balance counter with every SchemaRegionGroup reported for this Database
+      // together with the number of slots it already holds
+      BalanceTreeMap<TConsensusGroupId, Integer> loadByGroup = new BalanceTreeMap<>();
+      for (Pair<Long, TConsensusGroupId> slotCountAndGroup :
+          getPartitionManager()
+              .getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.SchemaRegion)) {
+        loadByGroup.put(slotCountAndGroup.getRight(), slotCountAndGroup.getLeft().intValue());
+      }
+
+      // Greedy allocation: every unassigned SeriesPartitionSlot goes to the RegionGroup that
+      // currently holds the fewest SchemaPartitions, whose load is then bumped by one
+      Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new HashMap<>();
+      for (TSeriesPartitionSlot slot : entry.getValue()) {
+        TConsensusGroupId leastLoaded = loadByGroup.getKeyWithMinValue();
+        schemaPartitionMap.put(slot, leastLoaded);
+        loadByGroup.put(leastLoaded, loadByGroup.get(leastLoaded) + 1);
+      }
+      allocation.put(database, new SchemaPartitionTable(schemaPartitionMap));
+    }
+
+    return allocation;
   }
 
   /**
    * Allocate DataPartitions
    *
+   * <p>For every unassigned TimePartitionSlot the target DataRegionGroup is chosen, in order,
+   * from: the successor DataPartition, the Database's DataPartitionPolicyTable, the predecessor
+   * DataPartition, and finally the DataRegionGroup holding the fewest DataPartitions. A candidate
+   * is only used when it is present in the counter built from getSortedRegionGroupSlotsCounter.
+   *
    * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned
-   * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
+   * @return Map<DatabaseName, DataPartitionTable>, the allocating result
+   * @throws IllegalStateException if no DataPartitionPolicyTable is registered for a Database
    */
   public Map<String, DataPartitionTable> allocateDataPartition(
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
       throws NoAvailableRegionGroupException {
-    return genPartitionAllocator().allocateDataPartition(unassignedDataPartitionSlotsMap);
+    Map<String, DataPartitionTable> result = new HashMap<>();
+
+    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> slotsMapEntry :
+        unassignedDataPartitionSlotsMap.entrySet()) {
+      final String database = slotsMapEntry.getKey();
+      final Map<TSeriesPartitionSlot, TTimeSlotList> unassignedPartitionSlotsMap =
+          slotsMapEntry.getValue();
+
+      // Filter available DataRegionGroups and
+      // sort them by the number of allocated DataPartitions
+      BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>();
+      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+          getPartitionManager()
+              .getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.DataRegion);
+      for (Pair<Long, TConsensusGroupId> pair : regionSlotsCounter) {
+        counter.put(pair.getRight(), pair.getLeft().intValue());
+      }
+
+      DataPartitionTable dataPartitionTable = new DataPartitionTable();
+      DataPartitionPolicyTable allotTable = dataPartitionPolicyTableMap.get(database);
+      if (allotTable == null) {
+        // clearPartitionBalancer() and clearDataPartitionPolicyTable() remove entries from the
+        // map, so the lookup can miss. Fail with a message naming the Database instead of a
+        // NullPointerException raised from the lock handling below.
+        throw new IllegalStateException(
+            String.format(
+                "DataPartitionPolicyTable of Database %s is not initialized", database));
+      }
+      // Lock before entering try, so that finally only ever releases a lock that is held
+      allotTable.acquireLock();
+      try {
+        // Enumerate SeriesPartitionSlot
+        for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry :
+            unassignedPartitionSlotsMap.entrySet()) {
+          SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
+
+          // Enumerate TimePartitionSlot in ascending order
+          TSeriesPartitionSlot seriesPartitionSlot = seriesPartitionEntry.getKey();
+          List<TTimePartitionSlot> timePartitionSlots =
+              seriesPartitionEntry.getValue().getTimePartitionSlots();
+          timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
+
+          for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+
+            // 1. The historical DataPartition will try to inherit successor DataPartition first
+            TConsensusGroupId successor =
+                getPartitionManager()
+                    .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+            if (successor != null && counter.containsKey(successor)) {
+              seriesPartitionTable.putDataPartition(timePartitionSlot, successor);
+              counter.put(successor, counter.get(successor) + 1);
+              continue;
+            }
+
+            // 2. Assign DataPartition base on the DataAllotTable
+            TConsensusGroupId allotGroupId =
+                allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+            if (counter.containsKey(allotGroupId)) {
+              seriesPartitionTable.putDataPartition(timePartitionSlot, allotGroupId);
+              counter.put(allotGroupId, counter.get(allotGroupId) + 1);
+              continue;
+            }
+
+            // 3. The allotDataRegionGroup is unavailable,
+            // try to inherit predecessor DataPartition
+            TConsensusGroupId predecessor =
+                getPartitionManager()
+                    .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+            if (predecessor != null && counter.containsKey(predecessor)) {
+              seriesPartitionTable.putDataPartition(timePartitionSlot, predecessor);
+              counter.put(predecessor, counter.get(predecessor) + 1);
+              continue;
+            }
+
+            // 4. Assign the DataPartition to DataRegionGroup with the least DataPartitions
+            // If the above DataRegionGroups are unavailable
+            TConsensusGroupId greedyGroupId = counter.getKeyWithMinValue();
+            seriesPartitionTable.putDataPartition(timePartitionSlot, greedyGroupId);
+            counter.put(greedyGroupId, counter.get(greedyGroupId) + 1);
+          }
+
+          dataPartitionTable
+              .getDataPartitionMap()
+              .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
+        }
+      } finally {
+        allotTable.releaseLock();
+      }
+      result.put(database, dataPartitionTable);
+    }
+
+    return result;
+  }
+
+  /**
+   * Re-balance the DataPartitionPolicyTable.
+   *
+   * @param database Database name
+   */
+  public void reBalanceDataPartitionPolicy(String database) {
+    try {
+      dataPartitionPolicyTableMap
+          .computeIfAbsent(database, empty -> new DataPartitionPolicyTable())
+          .reBalanceDataPartitionPolicy(
+              getPartitionManager().getAllRegionGroupIds(database, 
TConsensusGroupType.DataRegion));
+    } catch (DatabaseNotExistsException e) {
+      LOGGER.error("Database {} not exists when updateDataAllotTable", 
database);
+    }
+  }
+
+  /** Set up the PartitionBalancer when the current ConfigNode becomes leader. 
*/
+  public void setupPartitionBalancer() {
+    dataPartitionPolicyTableMap.clear();
+    getClusterSchemaManager()
+        .getDatabaseNames()
+        .forEach(
+            database -> {
+              DataPartitionPolicyTable dataPartitionPolicyTable = new 
DataPartitionPolicyTable();
+              dataPartitionPolicyTableMap.put(database, 
dataPartitionPolicyTable);
+              try {
+                // Put all DataRegionGroups into the DataPartitionPolicyTable
+                dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
+                    getPartitionManager()
+                        .getAllRegionGroupIds(database, 
TConsensusGroupType.DataRegion));
+                // Load the last DataAllotTable
+                dataPartitionPolicyTable.setDataAllotMap(
+                    getPartitionManager().getLastDataAllotTable(database));
+              } catch (DatabaseNotExistsException e) {
+                LOGGER.error("Database {} not exists when 
setupPartitionBalancer", database);
+              }
+            });
+  }
+
+  /** Clear the PartitionBalancer when the current ConfigNode is no longer the 
leader. */
+  public void clearPartitionBalancer() {
+    dataPartitionPolicyTableMap.clear();
+  }
+
+  /**
+   * Remove the DataPartitionPolicyTable kept for the specified Database.
+   *
+   * @param database Database name
+   */
+  public void clearDataPartitionPolicyTable(String database) {
+    dataPartitionPolicyTableMap.remove(database);
+  }
+
+  private ClusterSchemaManager getClusterSchemaManager() {
+    return configManager.getClusterSchemaManager();
   }
 
-  private IPartitionAllocator genPartitionAllocator() {
-    // TODO: The type of PartitionAllocator should be configurable
-    return new GreedyPartitionAllocator(configManager);
+  private PartitionManager getPartitionManager() {
+    return configManager.getPartitionManager();
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
new file mode 100644
index 00000000000..29bd2a2c1ba
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.structure.BalanceTreeMap;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Keeps, for one Database, the assignment of SeriesPartitionSlots to DataRegionGroups together
+ * with the number of SeriesPartitionSlots held by each DataRegionGroup.
+ *
+ * <p>reBalanceDataPartitionPolicy and setDataAllotMap take the internal lock themselves.
+ * getRegionGroupIdOrActivateIfNecessary does not lock; callers bracket it with acquireLock() and
+ * releaseLock().
+ */
+public class DataPartitionPolicyTable {
+
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+
+  private final ReentrantLock dataAllotTableLock;
+
+  // Map<SeriesPartitionSlot, RegionGroupId>
+  // The optimal allocation of SeriesSlots to RegionGroups
+  private final Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap;
+
+  // Map<RegionGroupId, SeriesPartitionSlot Count>
+  // The number of SeriesSlots allocated to each RegionGroup in dataAllotMap
+  private final BalanceTreeMap<TConsensusGroupId, Integer> seriesPartitionSlotCounter;
+
+  public DataPartitionPolicyTable() {
+    this.dataAllotTableLock = new ReentrantLock();
+    this.dataAllotMap = new HashMap<>();
+    this.seriesPartitionSlotCounter = new BalanceTreeMap<>();
+  }
+
+  /**
+   * Get or activate the specified SeriesPartitionSlot in dataAllotMap.
+   *
+   * <p>An unknown SeriesPartitionSlot is assigned to the RegionGroup that currently holds the
+   * fewest SeriesPartitionSlots, and that RegionGroup's count is increased by one.
+   *
+   * @param seriesPartitionSlot The specified SeriesPartitionSlot
+   * @return The RegionGroupId of the specified SeriesPartitionSlot, activate when its empty yet
+   */
+  public TConsensusGroupId getRegionGroupIdOrActivateIfNecessary(
+      TSeriesPartitionSlot seriesPartitionSlot) {
+    if (dataAllotMap.containsKey(seriesPartitionSlot)) {
+      return dataAllotMap.get(seriesPartitionSlot);
+    }
+
+    TConsensusGroupId regionGroupId = seriesPartitionSlotCounter.getKeyWithMinValue();
+    dataAllotMap.put(seriesPartitionSlot, regionGroupId);
+    seriesPartitionSlotCounter.put(
+        regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1);
+    return regionGroupId;
+  }
+
+  /**
+   * Re-balance the allocation of SeriesSlots to RegionGroups.
+   *
+   * <p>New RegionGroups are registered with a count of zero. SeriesPartitionSlots are then visited
+   * in random order and released from every RegionGroup that holds more than {@code
+   * SERIES_SLOT_NUM / dataRegionGroups.size()} of them; released slots are assigned again by
+   * getRegionGroupIdOrActivateIfNecessary. An empty list is a no-op.
+   *
+   * @param dataRegionGroups All DataRegionGroups currently in the Database
+   */
+  public void reBalanceDataPartitionPolicy(List<TConsensusGroupId> dataRegionGroups) {
+    dataAllotTableLock.lock();
+    try {
+      if (dataRegionGroups.isEmpty()) {
+        // Nothing to register, and the average below would divide by zero
+        return;
+      }
+
+      dataRegionGroups.forEach(
+          dataRegionGroup -> {
+            if (!seriesPartitionSlotCounter.containsKey(dataRegionGroup)) {
+              seriesPartitionSlotCounter.put(dataRegionGroup, 0);
+            }
+          });
+
+      // Enumerate all SeriesSlots randomly
+      List<TSeriesPartitionSlot> seriesPartitionSlots = new ArrayList<>();
+      for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+        seriesPartitionSlots.add(new TSeriesPartitionSlot(i));
+      }
+      Collections.shuffle(seriesPartitionSlots);
+
+      int mu = SERIES_SLOT_NUM / dataRegionGroups.size();
+      for (TSeriesPartitionSlot seriesPartitionSlot : seriesPartitionSlots) {
+        if (!dataAllotMap.containsKey(seriesPartitionSlot)) {
+          continue;
+        }
+
+        TConsensusGroupId regionGroupId = dataAllotMap.get(seriesPartitionSlot);
+        int seriesPartitionSlotCount = seriesPartitionSlotCounter.get(regionGroupId);
+        if (seriesPartitionSlotCount > mu) {
+          // Remove from dataAllotMap if the number of SeriesSlots is greater than mu
+          dataAllotMap.remove(seriesPartitionSlot);
+          seriesPartitionSlotCounter.put(regionGroupId, seriesPartitionSlotCount - 1);
+        }
+      }
+    } finally {
+      dataAllotTableLock.unlock();
+    }
+  }
+
+  /**
+   * Only use this interface when init PartitionBalancer.
+   *
+   * <p>Entries are loaded only for RegionGroups that are already registered in the counter and
+   * hold fewer than {@code SERIES_SLOT_NUM / seriesPartitionSlotCounter.size()} SeriesSlots. When
+   * no RegionGroup is registered the call is a no-op.
+   *
+   * @param dataAllotMap Map<SeriesPartitionSlot, RegionGroupId> to load
+   */
+  public void setDataAllotMap(Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap) {
+    // Lock before entering try, so that finally only ever releases a lock that is held
+    dataAllotTableLock.lock();
+    try {
+      if (seriesPartitionSlotCounter.size() == 0) {
+        // No RegionGroup can take a slot, and the average below would divide by zero
+        return;
+      }
+
+      int mu = SERIES_SLOT_NUM / seriesPartitionSlotCounter.size();
+      dataAllotMap.forEach(
+          (seriesPartitionSlot, regionGroupId) -> {
+            // A RegionGroup that was never registered has no counter entry to compare against
+            if (seriesPartitionSlotCounter.containsKey(regionGroupId)
+                && seriesPartitionSlotCounter.get(regionGroupId) < mu) {
+              // Put into dataAllotMap only when the number of SeriesSlots
+              // allocated to the RegionGroup is less than mu
+              this.dataAllotMap.put(seriesPartitionSlot, regionGroupId);
+              seriesPartitionSlotCounter.put(
+                  regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1);
+            }
+            // Otherwise, clear this SeriesPartitionSlot and wait for re-activating
+          });
+    } finally {
+      dataAllotTableLock.unlock();
+    }
+  }
+
+  /** Acquire the lock guarding this table; pair every call with releaseLock(). */
+  public void acquireLock() {
+    dataAllotTableLock.lock();
+  }
+
+  /** Release the lock taken by acquireLock(). */
+  public void releaseLock() {
+    dataAllotTableLock.unlock();
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
deleted file mode 100644
index aacf165a3ea..00000000000
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load.balancer.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.partition.DataPartitionTable;
-import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.commons.partition.SeriesPartitionTable;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
-import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/** Allocating new Partitions by greedy algorithm. */
-public class GreedyPartitionAllocator implements IPartitionAllocator {
-
-  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
-  private static final boolean ENABLE_DATA_PARTITION_INHERIT_POLICY =
-      CONF.isEnableDataPartitionInheritPolicy();
-  private static final long TIME_PARTITION_INTERVAL =
-      CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
-
-  private final IManager configManager;
-
-  public GreedyPartitionAllocator(IManager configManager) {
-    this.configManager = configManager;
-  }
-
-  @Override
-  public Map<String, SchemaPartitionTable> allocateSchemaPartition(
-      Map<String, List<TSeriesPartitionSlot>> 
unassignedSchemaPartitionSlotsMap)
-      throws NoAvailableRegionGroupException {
-    Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>();
-
-    for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry :
-        unassignedSchemaPartitionSlotsMap.entrySet()) {
-      final String storageGroup = slotsMapEntry.getKey();
-      final List<TSeriesPartitionSlot> unassignedPartitionSlots = 
slotsMapEntry.getValue();
-
-      // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
-      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
-          getPartitionManager()
-              .getSortedRegionGroupSlotsCounter(storageGroup, 
TConsensusGroupType.SchemaRegion);
-
-      // Enumerate SeriesPartitionSlot
-      Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new 
ConcurrentHashMap<>();
-      for (TSeriesPartitionSlot seriesPartitionSlot : 
unassignedPartitionSlots) {
-        // Greedy allocation
-        schemaPartitionMap.put(seriesPartitionSlot, 
regionSlotsCounter.get(0).getRight());
-        // Bubble sort
-        bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
-      }
-      result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap));
-    }
-
-    return result;
-  }
-
-  @Override
-  public Map<String, DataPartitionTable> allocateDataPartition(
-      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
unassignedDataPartitionSlotsMap)
-      throws NoAvailableRegionGroupException {
-    Map<String, DataPartitionTable> result = new ConcurrentHashMap<>();
-
-    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
slotsMapEntry :
-        unassignedDataPartitionSlotsMap.entrySet()) {
-      final String database = slotsMapEntry.getKey();
-      final Map<TSeriesPartitionSlot, TTimeSlotList> 
unassignedPartitionSlotsMap =
-          slotsMapEntry.getValue();
-
-      // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
-      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
-          getPartitionManager()
-              .getSortedRegionGroupSlotsCounter(database, 
TConsensusGroupType.DataRegion);
-
-      DataPartitionTable dataPartitionTable = new DataPartitionTable();
-
-      // Enumerate SeriesPartitionSlot
-      for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry 
:
-          unassignedPartitionSlotsMap.entrySet()) {
-        SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
-
-        // Enumerate TimePartitionSlot in ascending order
-        List<TTimePartitionSlot> timePartitionSlots =
-            seriesPartitionEntry.getValue().getTimePartitionSlots();
-        
timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
-        for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
-
-          /* 1. Inherit policy */
-          if (ENABLE_DATA_PARTITION_INHERIT_POLICY) {
-            // Check if the current Partition's neighbor(predecessor or 
successor)
-            // is allocated in the same batch of Partition creation
-            TConsensusGroupId neighbor =
-                seriesPartitionTable.getAdjacentDataPartition(
-                    timePartitionSlot, TIME_PARTITION_INTERVAL);
-            if (neighbor != null) {
-              seriesPartitionTable
-                  .getSeriesPartitionMap()
-                  .put(timePartitionSlot, Collections.singletonList(neighbor));
-              bubbleSort(neighbor, regionSlotsCounter);
-              continue;
-            }
-
-            // Check if the current Partition's neighbor(predecessor or 
successor)
-            // was allocated in the former Partition creation
-            neighbor =
-                getPartitionManager()
-                    .getAdjacentDataPartition(
-                        database,
-                        seriesPartitionEntry.getKey(),
-                        timePartitionSlot,
-                        TIME_PARTITION_INTERVAL);
-            if (neighbor != null) {
-              seriesPartitionTable
-                  .getSeriesPartitionMap()
-                  .put(timePartitionSlot, Collections.singletonList(neighbor));
-              bubbleSort(neighbor, regionSlotsCounter);
-              continue;
-            }
-          }
-
-          /* 2. Greedy policy */
-          seriesPartitionTable
-              .getSeriesPartitionMap()
-              .put(
-                  timePartitionSlot,
-                  
Collections.singletonList(regionSlotsCounter.get(0).getRight()));
-          bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
-        }
-        dataPartitionTable
-            .getDataPartitionMap()
-            .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
-      }
-      result.put(database, dataPartitionTable);
-    }
-
-    return result;
-  }
-
-  /**
-   * Bubble sort the regionSlotsCounter from the specified consensus group
-   *
-   * <p>Notice: Here we use bubble sort instead of other sorting algorithm is 
because that, there is
-   * only one Partition allocated in each loop. Therefore, only consider one 
consensus group weight
-   * change is enough
-   *
-   * @param consensusGroupId The consensus group where the new Partition is 
allocated
-   * @param regionSlotsCounter List<Pair<Allocated Partition num, 
TConsensusGroupId>>
-   */
-  private void bubbleSort(
-      TConsensusGroupId consensusGroupId, List<Pair<Long, TConsensusGroupId>> 
regionSlotsCounter) {
-    // Find the corresponding consensus group
-    int index = 0;
-    for (int i = 0; i < regionSlotsCounter.size(); i++) {
-      if (regionSlotsCounter.get(i).getRight().equals(consensusGroupId)) {
-        index = i;
-        break;
-      }
-    }
-
-    // Do bubble sort
-    
regionSlotsCounter.get(index).setLeft(regionSlotsCounter.get(index).getLeft() + 
1);
-    while (index < regionSlotsCounter.size() - 1
-        && regionSlotsCounter.get(index).getLeft() > 
regionSlotsCounter.get(index + 1).getLeft()) {
-      Collections.swap(regionSlotsCounter, index, index + 1);
-      index += 1;
-    }
-  }
-
-  private PartitionManager getPartitionManager() {
-    return configManager.getPartitionManager();
-  }
-}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
deleted file mode 100644
index 62ef53d9655..00000000000
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager.load.balancer.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartitionTable;
-import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
-import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The IPartitionAllocator is a functional interface, which means a new 
functional class who
- * implements the IPartitionAllocator must be created for each Partition 
allocation.
- */
-public interface IPartitionAllocator {
-
-  /**
-   * Allocate SchemaPartitions
-   *
-   * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should 
be assigned
-   * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
-   */
-  Map<String, SchemaPartitionTable> allocateSchemaPartition(
-      Map<String, List<TSeriesPartitionSlot>> 
unassignedSchemaPartitionSlotsMap)
-      throws NoAvailableRegionGroupException;
-
-  /**
-   * Allocate DataPartitions
-   *
-   * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be 
assigned
-   * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
-   */
-  Map<String, DataPartitionTable> allocateDataPartition(
-      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
unassignedDataPartitionSlotsMap)
-      throws NoAvailableRegionGroupException;
-}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index fae18091d36..ac0df2916a9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -41,6 +41,7 @@ import 
org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
@@ -85,6 +86,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -265,14 +267,13 @@ public class PartitionManager {
       // Cache allocating result only if the current ConfigNode still holds 
its leadership
       CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
       createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
-      status = getConsensusManager().confirmLeader();
+
+      status = consensusWritePartitionResult(createPlan);
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        // Here we check the leadership second time
-        // since the RegionGroup creating process might take some time
+        // The allocation might fail due to consensus error
         resp.setStatus(status);
         return resp;
       }
-      getConsensusManager().write(createPlan);
     }
 
     resp = (SchemaPartitionResp) getSchemaPartition(req);
@@ -389,14 +390,13 @@ public class PartitionManager {
       // Cache allocating result only if the current ConfigNode still holds 
its leadership
       CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
       createPlan.setAssignedDataPartition(assignedDataPartition);
-      status = getConsensusManager().confirmLeader();
+
+      status = consensusWritePartitionResult(createPlan);
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        // Here we check the leadership second time
-        // since the RegionGroup creating process might take some time
+        // The allocation might fail due to consensus error
         resp.setStatus(status);
         return resp;
       }
-      getConsensusManager().write(createPlan);
     }
 
     resp = (DataPartitionResp) getDataPartition(req);
@@ -431,6 +431,24 @@ public class PartitionManager {
     return resp;
   }
 
+  private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
+    // Check the leadership a second time, since creating RegionGroups might take some time
+    TSStatus status = getConsensusManager().confirmLeader();
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return status;
+    }
+
+    ConsensusWriteResponse writeResp = getConsensusManager().write(plan);
+    if (!writeResp.isSuccessful()) {
+      // NOTE(review): assumes getStatus() is non-null whenever the write fails - confirm
+      status = writeResp.getStatus();
+      status.setMessage(writeResp.getErrorMessage());
+      // Shared by the Schema- and DataPartition paths, so the message names neither
+      LOGGER.error("Write partition allocation result failed because: {}", status);
+    }
+    return status;
+  }
+
   // ======================================================
   // Leader scheduling interfaces
   // ======================================================
@@ -590,22 +608,37 @@ public class PartitionManager {
   }
 
   /**
-   * Only leader use this interface. Checks whether the specified 
DataPartition has a predecessor or
-   * successor and returns if it does
+   * Only leader use this interface. Checks whether the specified 
DataPartition has a successor and
+   * returns if it does.
+   *
+   * @param database DatabaseName
+   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specific DataPartition's successor if exists, null otherwise
+   */
+  public TConsensusGroupId getSuccessorDataPartition(
+      String database,
+      TSeriesPartitionSlot seriesPartitionSlot,
+      TTimePartitionSlot timePartitionSlot) {
+    return partitionInfo.getSuccessorDataPartition(
+        database, seriesPartitionSlot, timePartitionSlot);
+  }
+
+  /**
+   * Only leader use this interface. Checks whether the specified 
DataPartition has a predecessor
+   * and returns if it does.
    *
    * @param database DatabaseName
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
    * @return The specific DataPartition's predecessor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
+  public TConsensusGroupId getPredecessorDataPartition(
       String database,
       TSeriesPartitionSlot seriesPartitionSlot,
-      TTimePartitionSlot timePartitionSlot,
-      long timePartitionInterval) {
-    return partitionInfo.getAdjacentDataPartition(
-        database, seriesPartitionSlot, timePartitionSlot, 
timePartitionInterval);
+      TTimePartitionSlot timePartitionSlot) {
+    return partitionInfo.getPredecessorDataPartition(
+        database, seriesPartitionSlot, timePartitionSlot);
   }
 
   /**
@@ -711,6 +744,21 @@ public class PartitionManager {
     return partitionInfo.getRegionGroupCount(database, type);
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * <p>Get all the RegionGroups currently owned by the specified Database
+   *
+   * @param database DatabaseName
+   * @param type SchemaRegion or DataRegion
+   * @return List of TConsensusGroupId
+   * @throws DatabaseNotExistsException When the specified Database doesn't 
exist
+   */
+  public List<TConsensusGroupId> getAllRegionGroupIds(String database, 
TConsensusGroupType type)
+      throws DatabaseNotExistsException {
+    return partitionInfo.getAllRegionGroupIds(database, type);
+  }
+
   /**
    * Check if the specified Database exists.
    *
@@ -1249,6 +1297,16 @@ public class PartitionManager {
     partitionInfo.getDataRegionIds(databases, dataRegionIds);
   }
 
+  /**
+   * Get the last DataAllotTable of the specified Database.
+   *
+   * @param database The specified Database
+   * @return The last DataAllotTable
+   */
+  public Map<TSeriesPartitionSlot, TConsensusGroupId> 
getLastDataAllotTable(String database) {
+    return partitionInfo.getLastDataAllotTable(database);
+  }
+
   public ScheduledExecutorService getRegionMaintainer() {
     return regionMaintainer;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index b2ca2faf6b7..a629569470a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -98,19 +98,6 @@ public class DatabasePartitionTable {
                 replicaSet.getRegionId(), new 
RegionGroup(System.currentTimeMillis(), replicaSet)));
   }
 
-  /**
-   * Delete RegionGroups' cache.
-   *
-   * @param replicaSets List<TRegionReplicaSet>
-   */
-  public void deleteRegionGroups(List<TRegionReplicaSet> replicaSets) {
-    replicaSets.forEach(replicaSet -> 
regionGroupMap.remove(replicaSet.getRegionId()));
-  }
-
-  public Set<TConsensusGroupId> getAllConsensusGroupId() {
-    return regionGroupMap.keySet();
-  }
-
   /** @return Deep copy of all Regions' RegionReplicaSet within one 
StorageGroup */
   public List<TRegionReplicaSet> getAllReplicaSets() {
     List<TRegionReplicaSet> result = new ArrayList<>();
@@ -220,6 +207,20 @@ public class DatabasePartitionTable {
     return result.getAndIncrement();
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * <p>Get all the RegionGroups currently owned by the specified Database
+   *
+   * @param type SchemaRegion or DataRegion
+   * @return List of TConsensusGroupId
+   */
+  public List<TConsensusGroupId> getAllRegionGroupIds(TConsensusGroupType 
type) {
+    return regionGroupMap.keySet().stream()
+        .filter(regionGroupId -> regionGroupId.getType().equals(type))
+        .collect(Collectors.toList());
+  }
+
   public int getAssignedSeriesPartitionSlotsCount() {
     return Math.max(
         schemaPartitionTable.getSchemaPartitionMap().size(),
@@ -250,20 +251,28 @@ public class DatabasePartitionTable {
     return dataPartitionTable.getDataPartition(partitionSlots, dataPartition);
   }
 
+  /**
+   * Checks whether the specified DataPartition has a successor and returns if 
it does.
+   *
+   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specific DataPartition's successor if exists, null otherwise
+   */
+  public TConsensusGroupId getSuccessorDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
+    return dataPartitionTable.getSuccessorDataPartition(seriesPartitionSlot, 
timePartitionSlot);
+  }
+
   /**
    * Checks whether the specified DataPartition has a predecessor and returns 
if it does.
    *
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
    * @return The specific DataPartition's predecessor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
-      TSeriesPartitionSlot seriesPartitionSlot,
-      TTimePartitionSlot timePartitionSlot,
-      long timePartitionInterval) {
-    return dataPartitionTable.getAdjacentDataPartition(
-        seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
+  public TConsensusGroupId getPredecessorDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
+    return dataPartitionTable.getPredecessorDataPartition(seriesPartitionSlot, 
timePartitionSlot);
   }
 
   /**
@@ -553,6 +562,15 @@ public class DatabasePartitionTable {
     return dataRegionIds;
   }
 
+  /**
+   * Get the last DataAllotTable.
+   *
+   * @return The last DataAllotTable
+   */
+  public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
+    return dataPartitionTable.getLastDataAllotTable();
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index d85c63e4e10..d61396a4c7a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -385,24 +385,42 @@ public class PartitionInfo implements SnapshotProcessor {
   }
 
   /**
-   * Checks whether the specified DataPartition has a predecessor or successor 
and returns if it
-   * does.
+   * Checks whether the specified DataPartition has a successor and returns if 
it does.
+   *
+   * @param database DatabaseName
+   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specific DataPartition's successor if exists, null otherwise
+   */
+  public TConsensusGroupId getSuccessorDataPartition(
+      String database,
+      TSeriesPartitionSlot seriesPartitionSlot,
+      TTimePartitionSlot timePartitionSlot) {
+    if (isDatabaseExisted(database)) {
+      return databasePartitionTables
+          .get(database)
+          .getSuccessorDataPartition(seriesPartitionSlot, timePartitionSlot);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Checks whether the specified DataPartition has a predecessor and returns 
if it does.
    *
    * @param database DatabaseName
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
    * @return The specific DataPartition's predecessor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
+  public TConsensusGroupId getPredecessorDataPartition(
       String database,
       TSeriesPartitionSlot seriesPartitionSlot,
-      TTimePartitionSlot timePartitionSlot,
-      long timePartitionInterval) {
-    if (databasePartitionTables.containsKey(database)) {
+      TTimePartitionSlot timePartitionSlot) {
+    if (isDatabaseExisted(database)) {
       return databasePartitionTables
           .get(database)
-          .getAdjacentDataPartition(seriesPartitionSlot, timePartitionSlot, 
timePartitionInterval);
+          .getPredecessorDataPartition(seriesPartitionSlot, timePartitionSlot);
     } else {
       return null;
     }
@@ -722,6 +740,25 @@ public class PartitionInfo implements SnapshotProcessor {
     return databasePartitionTables.get(database).getRegionGroupCount(type);
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * <p>Get all the RegionGroups currently owned by the specified Database
+   *
+   * @param database DatabaseName
+   * @param type SchemaRegion or DataRegion
+   * @return List of TConsensusGroupId
+   * @throws DatabaseNotExistsException When the specified Database doesn't 
exist
+   */
+  public List<TConsensusGroupId> getAllRegionGroupIds(String database, 
TConsensusGroupType type)
+      throws DatabaseNotExistsException {
+    if (!isDatabaseExisted(database)) {
+      throw new DatabaseNotExistsException(database);
+    }
+
+    return databasePartitionTables.get(database).getAllRegionGroupIds(type);
+  }
+
   /**
    * Only leader use this interface.
    *
@@ -761,14 +798,14 @@ public class PartitionInfo implements SnapshotProcessor {
   /**
    * Only leader use this interface.
    *
-   * @param storageGroup StorageGroupName
+   * @param database DatabaseName
    * @param type SchemaRegion or DataRegion
    * @return The StorageGroup's Running or Available Regions that sorted by 
the number of allocated
    *     slots
    */
   public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter(
-      String storageGroup, TConsensusGroupType type) {
-    return 
databasePartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type);
+      String database, TConsensusGroupType type) {
+    return 
databasePartitionTables.get(database).getRegionGroupSlotsCounter(type);
   }
 
   /**
@@ -784,6 +821,19 @@ public class PartitionInfo implements SnapshotProcessor {
     return schemaPartitionSet;
   }
 
+  /**
+   * Get the last DataAllotTable of the specified Database.
+   *
+   * @param database The specified Database
+   * @return The last DataAllotTable
+   */
+  public Map<TSeriesPartitionSlot, TConsensusGroupId> 
getLastDataAllotTable(String database) {
+    if (isDatabaseExisted(database)) {
+      return databasePartitionTables.get(database).getLastDataAllotTable();
+    }
+    return Collections.emptyMap();
+  }
+
   @Override
   public boolean processTakeSnapshot(File snapshotDir) throws TException, 
IOException {
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 34211efdeb6..7ea774cc33c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -206,6 +206,9 @@ public class DeleteDatabaseProcedure
             LOG.info(
                 "[DeleteDatabaseProcedure] Database: {} is deleted 
successfully",
                 deleteDatabaseSchema.getName());
+            env.getConfigManager()
+                .getLoadManager()
+                .clearDataPartitionPolicyTable(deleteDatabaseSchema.getName());
             return Flow.NO_MORE_STATE;
           } else if (getCycles() > RETRY_THRESHOLD) {
             setFailure(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 78abfbca2dc..2dab4c451aa 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.procedure.impl.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -52,6 +53,7 @@ public class CreateRegionGroupsProcedure
   private TConsensusGroupType consensusGroupType;
 
   private CreateRegionGroupsPlan createRegionGroupsPlan = new 
CreateRegionGroupsPlan();
+  private CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
 
   /** key: TConsensusGroupId value: Failed RegionReplicas */
   private Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets = 
new HashMap<>();
@@ -70,9 +72,11 @@ public class CreateRegionGroupsProcedure
   public CreateRegionGroupsProcedure(
       TConsensusGroupType consensusGroupType,
       CreateRegionGroupsPlan createRegionGroupsPlan,
+      CreateRegionGroupsPlan persistPlan,
       Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets) {
     this.consensusGroupType = consensusGroupType;
     this.createRegionGroupsPlan = createRegionGroupsPlan;
+    this.persistPlan = persistPlan;
     this.failedRegionReplicaSets = failedRegionReplicaSets;
   }
 
@@ -84,7 +88,7 @@ public class CreateRegionGroupsProcedure
         setNextState(CreateRegionGroupsState.SHUNT_REGION_REPLICAS);
         break;
       case SHUNT_REGION_REPLICAS:
-        CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+        persistPlan = new CreateRegionGroupsPlan();
         OfferRegionMaintainTasksPlan offerPlan = new 
OfferRegionMaintainTasksPlan();
         // Filter those RegionGroups that created successfully
         createRegionGroupsPlan
@@ -201,6 +205,17 @@ public class CreateRegionGroupsProcedure
         setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
         break;
       case CREATE_REGION_GROUPS_FINISH:
+        if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
+          // Re-balance all corresponding DataPartitionPolicyTable
+          persistPlan
+              .getRegionGroupMap()
+              .keySet()
+              .forEach(
+                  database ->
+                      env.getConfigManager()
+                          .getLoadManager()
+                          .reBalanceDataPartitionPolicy(database));
+        }
         return Flow.NO_MORE_STATE;
     }
 
@@ -242,6 +257,7 @@ public class CreateRegionGroupsProcedure
           ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream);
           ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream);
         });
+    persistPlan.serializeForProcedure(stream);
   }
 
   @Override
@@ -259,6 +275,9 @@ public class CreateRegionGroupsProcedure
             ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
         failedRegionReplicaSets.put(groupId, replica);
       }
+      if (byteBuffer.hasRemaining()) {
+        persistPlan.deserializeForProcedure(byteBuffer);
+      }
     } catch (Exception e) {
       LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure", 
e);
       throw new RuntimeException(e);
@@ -267,16 +286,22 @@ public class CreateRegionGroupsProcedure
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
     CreateRegionGroupsProcedure that = (CreateRegionGroupsProcedure) o;
     return consensusGroupType == that.consensusGroupType
         && createRegionGroupsPlan.equals(that.createRegionGroupsPlan)
+        && persistPlan.equals(that.persistPlan)
         && failedRegionReplicaSets.equals(that.failedRegionReplicaSets);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(consensusGroupType, createRegionGroupsPlan, 
failedRegionReplicaSets);
+    return Objects.hash(
+        consensusGroupType, createRegionGroupsPlan, persistPlan, 
failedRegionReplicaSets);
   }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index e1a212c7f50..f30d46dd117 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.consensus.request;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -818,9 +819,12 @@ public class ConfigPhysicalPlanSerDeTest {
     CreateRegionGroupsPlan createRegionGroupsPlan = new 
CreateRegionGroupsPlan();
     createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
     createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
+    CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+    persistPlan.addRegionGroup("root.sg0", dataRegionSet);
+    persistPlan.addRegionGroup("root.sg1", schemaRegionSet);
     CreateRegionGroupsProcedure procedure0 =
         new CreateRegionGroupsProcedure(
-            TConsensusGroupType.DataRegion, createRegionGroupsPlan, 
failedRegions);
+            TConsensusGroupType.DataRegion, createRegionGroupsPlan, 
persistPlan, failedRegions);
 
     updateProcedurePlan0.setProcedure(procedure0);
     updateProcedurePlan1 =
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java
new file mode 100644
index 00000000000..70ce9231c17
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Unit test for DataPartitionPolicyTable. It checks how the
+ * SeriesPartitionSlots are spread over the DataRegionGroups after each call
+ * to reBalanceDataPartitionPolicy.
+ */
+public class DataPartitionPolicyTableTest {
+
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+  // Number of SeriesPartitionSlots iterated by every check below
+  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+
+  /**
+   * Re-balances the policy table with one, then three, then five
+   * DataRegionGroups and verifies the slot distribution after each step.
+   */
+  @Test
+  public void testUpdateDataAllotTable() {
+    DataPartitionPolicyTable dataPartitionPolicyTable = new 
DataPartitionPolicyTable();
+    List<TConsensusGroupId> dataRegionGroups = new ArrayList<>();
+
+    // Test 1: construct DataAllotTable from scratch
+    TConsensusGroupId group1 = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 1);
+    dataRegionGroups.add(group1);
+    dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups);
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      // All SeriesPartitionSlots belong to group1
+      Assert.assertEquals(
+          group1,
+          
dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot));
+    }
+
+    // Test 2: extend DataRegionGroups
+    // lastDataAllotTable keeps the allocation seen here for use in Test 3
+    Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new 
HashMap<>();
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
2));
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
3));
+    dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups);
+    // mu: expected number of SeriesPartitionSlots per DataRegionGroup
+    int mu = SERIES_SLOT_NUM / 3;
+    // counter: number of SeriesPartitionSlots observed per DataRegionGroup
+    Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>();
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      TConsensusGroupId groupId =
+          
dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+      lastDataAllotTable.put(seriesPartitionSlot, groupId);
+      counter.computeIfAbsent(groupId, empty -> new 
AtomicInteger(0)).incrementAndGet();
+    }
+    // All DataRegionGroups divide SeriesPartitionSlots evenly (within 1 of mu)
+    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
counter.entrySet()) {
+      Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+    }
+
+    // Test 3: extend DataRegionGroups while inheriting as many 
SeriesPartitionSlots as possible
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
4));
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
5));
+    dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups);
+    // unchangedSlots: per old group, the slots that stayed where Test 2 saw them
+    Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>();
+    mu = SERIES_SLOT_NUM / 5;
+    counter.clear();
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      TConsensusGroupId groupId =
+          
dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+      counter.computeIfAbsent(groupId, empty -> new 
AtomicInteger(0)).incrementAndGet();
+      if (groupId.getId() < 4) {
+        // Any SeriesPartitionSlot still held by one of the first three groups 
should remain unchanged
+        Assert.assertEquals(lastDataAllotTable.get(seriesPartitionSlot), 
groupId);
+        unchangedSlots.computeIfAbsent(groupId, empty -> new 
AtomicInteger(0)).incrementAndGet();
+      }
+    }
+    // All DataRegionGroups divide SeriesPartitionSlots evenly (within 1 of mu)
+    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
counter.entrySet()) {
+      Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+    }
+    // Exactly mu SeriesPartitionSlots of each of the first three groups should 
remain unchanged
+    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
unchangedSlots.entrySet()) {
+      Assert.assertEquals(mu, counterEntry.getValue().get());
+    }
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
index 0642bf6d6ab..9cd9d41301c 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
@@ -93,10 +93,13 @@ public class CreateRegionGroupsProcedureTest {
     CreateRegionGroupsPlan createRegionGroupsPlan = new 
CreateRegionGroupsPlan();
     createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
     createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
+    CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+    persistPlan.addRegionGroup("root.sg0", dataRegionSet);
+    persistPlan.addRegionGroup("root.sg1", schemaRegionSet);
 
     CreateRegionGroupsProcedure procedure0 =
         new CreateRegionGroupsProcedure(
-            TConsensusGroupType.DataRegion, createRegionGroupsPlan, 
failedRegions0);
+            TConsensusGroupType.DataRegion, createRegionGroupsPlan, 
persistPlan, failedRegions0);
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
 
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 2a6ead060ec..08cc77c3c35 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -128,13 +128,6 @@ cluster_name=defaultCluster
 # data_region_per_data_node=5.0
 
 
-# Whether to enable the DataPartition inherit policy.
-# DataPartition within the same SeriesPartitionSlot will inherit the 
allocation result of
-# the predecessor or successor TimePartitionSlot if set true
-# Datatype: Boolean
-# enable_data_partition_inherit_policy=true
-
-
 # The policy of cluster RegionGroups' leader distribution.
 # E.g. we should balance cluster RegionGroups' leader distribution when some 
DataNodes are shutdown or re-connected.
 # These policies are currently supported:
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 1a5611536ae..c0eac239d88 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -34,6 +34,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -99,22 +100,34 @@ public class DataPartitionTable {
   }
 
   /**
-   * Checks whether the specified DataPartition has a predecessor or successor 
and returns if it
-   * does
+   * Checks whether the specified DataPartition has a successor and returns if 
it does.
+   *
+   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specific DataPartition's successor if exists, null otherwise
+   */
+  public TConsensusGroupId getSuccessorDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
+    if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
+      // Delegate the lookup to the table stored for this SeriesPartitionSlot
+      return 
dataPartitionMap.get(seriesPartitionSlot).getSuccessorDataPartition(timePartitionSlot);
+    } else {
+      // dataPartitionMap has no entry for this SeriesPartitionSlot
+      return null;
+    }
+  }
+
+  /**
+   * Checks whether the specified DataPartition has a predecessor and returns 
if it does.
    *
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
    * @return The specific DataPartition's predecessor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
-      TSeriesPartitionSlot seriesPartitionSlot,
-      TTimePartitionSlot timePartitionSlot,
-      long timePartitionInterval) {
+  public TConsensusGroupId getPredecessorDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
     if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
       return dataPartitionMap
           .get(seriesPartitionSlot)
-          .getAdjacentDataPartition(timePartitionSlot, timePartitionInterval);
+          .getPredecessorDataPartition(timePartitionSlot);
     } else {
       return null;
     }
@@ -236,6 +249,19 @@ public class DataPartitionTable {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Get the last DataAllotTable: for every SeriesPartitionSlot stored in
+   * dataPartitionMap, the ConsensusGroupId reported by
+   * getLastConsensusGroupId() of its SeriesPartitionTable.
+   *
+   * @return A newly created map from SeriesPartitionSlot to that
+   *     ConsensusGroupId; a value is null whenever getLastConsensusGroupId()
+   *     returns null
+   */
+  public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
+    Map<TSeriesPartitionSlot, TConsensusGroupId> result = new HashMap<>();
+    dataPartitionMap.forEach(
+        (seriesPartitionSlot, seriesPartitionTable) ->
+            result.put(seriesPartitionSlot, 
seriesPartitionTable.getLastConsensusGroupId()));
+    return result;
+  }
+
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
     ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 47216cc41ce..1b87e42ad11 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.commons.partition;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -33,10 +34,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TreeMap;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,22 +46,26 @@ import java.util.stream.Collectors;
 
 public class SeriesPartitionTable {
 
-  private final Map<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap;
+  private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap;
 
   public SeriesPartitionTable() {
-    this.seriesPartitionMap = new ConcurrentHashMap<>();
+    this.seriesPartitionMap = new TreeMap<>();
   }
 
   public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap) {
-    this.seriesPartitionMap = seriesPartitionMap;
+    this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
   }
 
   public Map<TTimePartitionSlot, List<TConsensusGroupId>> 
getSeriesPartitionMap() {
     return seriesPartitionMap;
   }
 
+  /**
+   * Record one more ConsensusGroupId for the given TimePartitionSlot. The
+   * group list of the slot is created first if the slot has no entry yet,
+   * then groupId is appended to it.
+   *
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @param groupId The ConsensusGroupId to append
+   */
+  public void putDataPartition(TTimePartitionSlot timePartitionSlot, 
TConsensusGroupId groupId) {
+    seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new 
ArrayList<>()).add(groupId);
+  }
+
   /**
-   * Thread-safely get DataPartition within the specific StorageGroup
+   * Thread-safely get DataPartition within the specific Database.
    *
    * @param partitionSlotList TimePartitionSlotList
    * @param seriesPartitionTable Store the matched SeriesPartitions
@@ -119,34 +124,29 @@ public class SeriesPartitionTable {
   }
 
   /**
-   * Checks whether the specified DataPartition has a predecessor or successor 
and returns if it
-   * does
+   * Check and return the specified DataPartition's successor.
    *
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
-   * @return The specific DataPartition's predecessor if exists, null otherwise
+   * @return The specified DataPartition's successor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
-      TTimePartitionSlot timePartitionSlot, long timePartitionInterval) {
-    if (timePartitionSlot.getStartTime() >= timePartitionInterval) {
-      // Check predecessor first
-      TTimePartitionSlot predecessorSlot =
-          new TTimePartitionSlot(timePartitionSlot.getStartTime() - 
timePartitionInterval);
-      TConsensusGroupId predecessor =
-          seriesPartitionMap.getOrDefault(predecessorSlot, 
Collections.singletonList(null)).get(0);
-      if (predecessor != null) {
-        return predecessor;
-      }
-    }
+  public TConsensusGroupId getSuccessorDataPartition(TTimePartitionSlot 
timePartitionSlot) {
+    // higherKey: the least key strictly greater than the given slot, or null
+    TTimePartitionSlot successorSlot = 
seriesPartitionMap.higherKey(timePartitionSlot);
+    // Report the first ConsensusGroupId recorded for the successor slot
+    return successorSlot == null ? null : 
seriesPartitionMap.get(successorSlot).get(0);
+  }
 
-    // Check successor
-    TTimePartitionSlot successorSlot =
-        new TTimePartitionSlot(timePartitionSlot.getStartTime() + 
timePartitionInterval);
-    return seriesPartitionMap.getOrDefault(successorSlot, 
Collections.singletonList(null)).get(0);
+  /**
+   * Check and return the specified DataPartition's predecessor.
+   *
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specified DataPartition's predecessor if exists, null 
otherwise
+   */
+  public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot 
timePartitionSlot) {
+    // lowerKey: the greatest key strictly less than the given slot, or null
+    TTimePartitionSlot predecessorSlot = 
seriesPartitionMap.lowerKey(timePartitionSlot);
+    // Report the first ConsensusGroupId recorded for the predecessor slot
+    return predecessorSlot == null ? null : 
seriesPartitionMap.get(predecessorSlot).get(0);
   }
 
   /**
-   * Query a timePartition's corresponding dataRegionIds
+   * Query a timePartition's corresponding dataRegionIds.
    *
    * @param timeSlotId Time partition's timeSlotId
    * @return the timePartition's corresponding dataRegionIds
@@ -179,7 +179,7 @@ public class SeriesPartitionTable {
   }
 
   /**
-   * Create DataPartition within the specific SeriesPartitionSlot
+   * Create DataPartition within the specific SeriesPartitionSlot.
    *
    * @param assignedSeriesPartitionTable Assigned result
    * @param seriesPartitionSlot Corresponding TSeriesPartitionSlot
@@ -206,7 +206,7 @@ public class SeriesPartitionTable {
 
   /**
    * Only Leader use this interface. And this interface is synchronized. 
Thread-safely filter no
-   * assigned DataPartitionSlots within the specific SeriesPartitionSlot
+   * assigned DataPartitionSlots within the specific SeriesPartitionSlot.
    *
    * @param partitionSlots TimePartitionSlots
    * @return Unassigned PartitionSlots
@@ -225,6 +225,20 @@ public class SeriesPartitionTable {
     return result;
   }
 
+  /**
+   * Get the last DataPartition's ConsensusGroupId, i.e. the last element of
+   * the group list stored under the greatest key of seriesPartitionMap.
+   *
+   * @return The last DataPartition's ConsensusGroupId, null if there are no 
DataPartitions yet
+   */
+  public TConsensusGroupId getLastConsensusGroupId() {
+    // lastEntry() yields null when seriesPartitionMap is empty
+    Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+        seriesPartitionMap.lastEntry();
+    if (lastEntry == null) {
+      return null;
+    }
+    // Take the most recently appended group of that TimePartitionSlot
+    return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
+  }
+
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
     ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
@@ -238,7 +252,7 @@ public class SeriesPartitionTable {
     }
   }
 
-  /** Only for ConsensusRequest */
+  /** Only for ConsensusRequest. */
   public void deserialize(ByteBuffer buffer) {
     int timePartitionSlotNum = buffer.getInt();
     for (int i = 0; i < timePartitionSlotNum; i++) {
@@ -255,7 +269,7 @@ public class SeriesPartitionTable {
     }
   }
 
-  /** Only for Snapshot */
+  /** Only for Snapshot. */
   public void deserialize(InputStream inputStream, TProtocol protocol)
       throws IOException, TException {
     int timePartitionSlotNum = ReadWriteIOUtils.readInt(inputStream);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
new file mode 100644
index 00000000000..1fe5ce1bcdf
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.structure;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * This class is used to store key-value pairs. It supports the following
+ * operations: 1. Put a key-value pair. 2. Get key with minimum value.
+ *
+ * <p>keyValueMap holds the value of every key; valueKeysMap groups the keys
+ * by value, sorted by value. Both maps are updated together. The class does
+ * no synchronization of its own.
+ *
+ * @param <K> The type of Key
+ * @param <V> The type of Value, should be Comparable
+ */
+public class BalanceTreeMap<K, V extends Comparable<V>> {
+
+  // Value currently associated with each key
+  private final HashMap<K, V> keyValueMap;
+  // Keys grouped by their current value, sorted by value
+  private final TreeMap<V, Set<K>> valueKeysMap;
+
+  public BalanceTreeMap() {
+    this.keyValueMap = new HashMap<>();
+    this.valueKeysMap = new TreeMap<>();
+  }
+
+  /**
+   * Put or modify a key-value pair. If the key already had a value, the key
+   * is first removed from the key set of that old value, and the old value
+   * is dropped from valueKeysMap once no key holds it anymore.
+   *
+   * @param key Key
+   * @param value Value
+   */
+  public void put(K key, V value) {
+    // Update keyValueMap
+    V oldValue = keyValueMap.put(key, value);
+
+    // Update valueKeysMap
+    if (oldValue != null) {
+      Set<K> keysSet = valueKeysMap.get(oldValue);
+      keysSet.remove(key);
+      if (keysSet.isEmpty()) {
+        valueKeysMap.remove(oldValue);
+      }
+    }
+    valueKeysMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key);
+  }
+
+  /**
+   * Get key with minimum value. When several keys share the minimum value,
+   * the one returned is whichever the key set's iterator yields first.
+   *
+   * <p>The map must not be empty: firstEntry() returns null for an empty
+   * TreeMap, so this call would throw a NullPointerException.
+   *
+   * @return Key with minimum value
+   */
+  public K getKeyWithMinValue() {
+    return valueKeysMap.firstEntry().getValue().iterator().next();
+  }
+
+  /**
+   * Get the value of the given key.
+   *
+   * @param key Key
+   * @return The value of the key, null if the key is absent
+   */
+  public V get(K key) {
+    return keyValueMap.getOrDefault(key, null);
+  }
+
+  /**
+   * Check whether a value is stored for the given key.
+   *
+   * @param key Key
+   * @return true if keyValueMap contains the key
+   */
+  public boolean containsKey(K key) {
+    return keyValueMap.containsKey(key);
+  }
+
+  /**
+   * Get the number of stored keys.
+   *
+   * @return The size of keyValueMap
+   */
+  public int size() {
+    return keyValueMap.size();
+  }
+
+  /**
+   * Remove a key and its value from both internal maps.
+   *
+   * @param key Key to remove, ignored if no value is stored for it
+   */
+  @TestOnly
+  public void remove(K key) {
+    V value = keyValueMap.remove(key);
+    if (value != null) {
+      // Drop the key from its value group, and the group once it is empty
+      Set<K> keysSet = valueKeysMap.get(value);
+      keysSet.remove(key);
+      if (keysSet.isEmpty()) {
+        valueKeysMap.remove(value);
+      }
+    }
+  }
+
+  /**
+   * Check that nothing is stored anymore.
+   *
+   * @return true only if both keyValueMap and valueKeysMap are empty
+   */
+  @TestOnly
+  public boolean isEmpty() {
+    return keyValueMap.isEmpty() && valueKeysMap.isEmpty();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
new file mode 100644
index 00000000000..d38b8176f26
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.structure;
+
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+/** Unit test for BalanceTreeMap. */
+public class BalanceTreeMapTest {
+
+  /**
+   * Fills the map with 199 random values plus one key holding
+   * Integer.MIN_VALUE, then checks that this key is reported as the minimum
+   * and that draining the map yields values in non-decreasing order.
+   */
+  @Test
+  public void testGetKeyWithMinValue() {
+    Random random = new Random();
+    BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new 
BalanceTreeMap<>();
+    for (int i = 0; i < 100; i++) {
+      balanceTreeMap.put(new TSeriesPartitionSlot(i), 
random.nextInt(Integer.MAX_VALUE));
+    }
+    // nextInt(bound) is never negative, so this key is the unique minimum
+    TSeriesPartitionSlot minSlot = new TSeriesPartitionSlot(100);
+    balanceTreeMap.put(minSlot, Integer.MIN_VALUE);
+    for (int i = 101; i < 200; i++) {
+      balanceTreeMap.put(new TSeriesPartitionSlot(i), 
random.nextInt(Integer.MAX_VALUE));
+    }
+    Assert.assertEquals(minSlot, balanceTreeMap.getKeyWithMinValue());
+
+    // Remove the minimum 200 times; each removed value must be >= the last
+    int currentValue = Integer.MIN_VALUE;
+    for (int i = 0; i < 200; i++) {
+      TSeriesPartitionSlot slot = balanceTreeMap.getKeyWithMinValue();
+      Assert.assertTrue(balanceTreeMap.get(slot) >= currentValue);
+      currentValue = balanceTreeMap.get(slot);
+      balanceTreeMap.remove(slot);
+    }
+  }
+
+  /**
+   * Stores ten keys with value 0 and ten keys with value 1, then checks that
+   * the first ten minima come from the value-0 keys, the next ten from the
+   * value-1 keys, and that the map is empty afterwards.
+   */
+  @Test
+  public void testKeysDuplicate() {
+    BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new 
BalanceTreeMap<>();
+    // Keys that all share value 0
+    Set<TSeriesPartitionSlot> duplicateSet0 = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i);
+      balanceTreeMap.put(slot, 0);
+      duplicateSet0.add(slot);
+    }
+    // Keys that all share value 1
+    Set<TSeriesPartitionSlot> duplicateSet1 = new HashSet<>();
+    for (int i = 10; i < 20; i++) {
+      TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i);
+      balanceTreeMap.put(slot, 1);
+      duplicateSet1.add(slot);
+    }
+
+    // While value-0 keys remain, the minimum must be one of them
+    for (int i = 0; i < 10; i++) {
+      
Assert.assertTrue(duplicateSet0.contains(balanceTreeMap.getKeyWithMinValue()));
+      balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue());
+    }
+    // Only value-1 keys are left now
+    for (int i = 0; i < 10; i++) {
+      
Assert.assertTrue(duplicateSet1.contains(balanceTreeMap.getKeyWithMinValue()));
+      balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue());
+    }
+    Assert.assertTrue(balanceTreeMap.isEmpty());
+  }
+}

Reply via email to