This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch Computing-resource-balancing_cp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit deecdbd9292d199c0917c94928923dbda3c4e36a
Author: YongzaoDan <[email protected]>
AuthorDate: Sat Jul 22 19:04:06 2023 +0800

    Pass IT
---
 .../iotdb/it/env/cluster/MppCommonConfig.java      |  14 +-
 .../it/env/cluster/MppSharedCommonConfig.java      |  15 +-
 .../iotdb/it/env/remote/RemoteCommonConfig.java    |  12 +-
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   4 +-
 .../partition/IoTDBPartitionInheritPolicyIT.java   | 283 ++++++++++++++-------
 .../confignode/it/utils/ConfigNodeTestUtils.java   | 117 +++++++++
 .../db/it/last/IoTDBLastQueryLastCacheIT.java      |   3 +-
 .../manager/load/balancer/PartitionBalancer.java   |  22 +-
 .../load/balancer/partition/DataAllotTable.java    |  68 ++---
 .../manager/partition/PartitionManager.java        |   7 +-
 .../partition/DatabasePartitionTable.java          |   8 +-
 .../persistence/partition/PartitionInfo.java       |   7 +-
 .../balancer/partition/DataAllotTableTest.java     |  43 +++-
 .../commons/partition/DataPartitionEntry.java      |  87 +++++++
 .../commons/partition/DataPartitionTable.java      |  10 +-
 .../commons/partition/SeriesPartitionTable.java    |  12 +-
 .../commons/partition/DataPartitionEntryTest.java  |  58 +++++
 17 files changed, 579 insertions(+), 191 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index 6c01de2e787..4dbc3453fb8 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -199,14 +199,6 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
-  @Override
-  public CommonConfig setEnableDataPartitionInheritPolicy(
-      boolean enableDataPartitionInheritPolicy) {
-    setProperty(
-        "enable_data_partition_inherit_policy", 
String.valueOf(enableDataPartitionInheritPolicy));
-    return this;
-  }
-
   @Override
   public CommonConfig setDataReplicationFactor(int dataReplicationFactor) {
     setProperty("data_replication_factor", 
String.valueOf(dataReplicationFactor));
@@ -359,4 +351,10 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     setProperty("database_limit_threshold", 
String.valueOf(databaseLimitThreshold));
     return this;
   }
+
+  @Override
+  public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+    setProperty("data_region_per_data_node", 
String.valueOf(dataRegionPerDataNode));
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index 7af4f01fa63..b362a02d97b 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -172,14 +172,6 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
-  @Override
-  public CommonConfig setEnableDataPartitionInheritPolicy(
-      boolean enableDataPartitionInheritPolicy) {
-    
cnConfig.setEnableDataPartitionInheritPolicy(enableDataPartitionInheritPolicy);
-    
dnConfig.setEnableDataPartitionInheritPolicy(enableDataPartitionInheritPolicy);
-    return this;
-  }
-
   @Override
   public CommonConfig setSchemaRegionGroupExtensionPolicy(String 
schemaRegionGroupExtensionPolicy) {
     
cnConfig.setSchemaRegionGroupExtensionPolicy(schemaRegionGroupExtensionPolicy);
@@ -372,4 +364,11 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     cnConfig.setDatabaseLimitThreshold(databaseLimitThreshold);
     return this;
   }
+
+  @Override
+  public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+    dnConfig.setDataRegionPerDataNode(dataRegionPerDataNode);
+    cnConfig.setDataRegionPerDataNode(dataRegionPerDataNode);
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index a578a6f4cf5..e181d5e1f73 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.it.env.remote;
 
 import org.apache.iotdb.itbase.env.CommonConfig;
@@ -122,12 +123,6 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
-  @Override
-  public CommonConfig setEnableDataPartitionInheritPolicy(
-      boolean enableDataPartitionInheritPolicy) {
-    return this;
-  }
-
   @Override
   public CommonConfig setSchemaRegionGroupExtensionPolicy(String 
schemaRegionGroupExtensionPolicy) {
     return this;
@@ -266,4 +261,9 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setDatabaseLimitThreshold(long databaseLimitThreshold) {
     return this;
   }
+
+  @Override
+  public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 2a08304221d..08161c36bc3 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -62,8 +62,6 @@ public interface CommonConfig {
 
   CommonConfig setDataRegionConsensusProtocolClass(String 
dataRegionConsensusProtocolClass);
 
-  CommonConfig setEnableDataPartitionInheritPolicy(boolean 
enableDataPartitionInheritPolicy);
-
   CommonConfig setSchemaRegionGroupExtensionPolicy(String 
schemaRegionGroupExtensionPolicy);
 
   CommonConfig setDefaultSchemaRegionGroupNumPerDatabase(int 
schemaRegionGroupPerDatabase);
@@ -119,4 +117,6 @@ public interface CommonConfig {
   CommonConfig setSortBufferSize(long sortBufferSize);
 
   CommonConfig setMaxTsBlockSizeInByte(long maxTsBlockSizeInByte);
+
+  CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index 8b49948f5f7..aecd22375f5 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.it.partition;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
-import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
-import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -40,27 +39,26 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
 public class IoTDBPartitionInheritPolicyIT {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBPartitionInheritPolicyIT.class);
-  private static final boolean testEnableDataPartitionInheritPolicy = true;
   private static final String testDataRegionConsensusProtocolClass =
       ConsensusFactory.RATIS_CONSENSUS;
-  private static final int testReplicationFactor = 3;
+  private static final int testReplicationFactor = 1;
+  private static final int testSeriesSlotNum = 1000;
   private static final long testTimePartitionInterval = 604800000;
+  private static final double testDataRegionPerDataNode = 5.0;
 
-  private static final String sg = "root.sg";
-  private static final int storageGroupNum = 2;
-  private static final int testSeriesPartitionSlotNum = 100;
-  private static final int seriesPartitionBatchSize = 10;
+  private static final String database = "root.database";
+  private static final int seriesPartitionSlotBatchSize = 100;
   private static final int testTimePartitionSlotsNum = 100;
   private static final int timePartitionBatchSize = 10;
 
@@ -70,21 +68,19 @@ public class IoTDBPartitionInheritPolicyIT {
         .getConfig()
         .getCommonConfig()
         
.setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass)
-        
.setEnableDataPartitionInheritPolicy(testEnableDataPartitionInheritPolicy)
         .setDataReplicationFactor(testReplicationFactor)
         .setTimePartitionInterval(testTimePartitionInterval)
-        .setSeriesSlotNum(testSeriesPartitionSlotNum * 10);
+        .setSeriesSlotNum(testSeriesSlotNum)
+        .setDataRegionPerDataNode(testDataRegionPerDataNode);
 
-    // Init 1C3D environment
-    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+    // Init 1C1D environment
+    EnvFactory.getEnv().initClusterEnvironment(1, 1);
 
-    // Set StorageGroups
+    // Set Database
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-      for (int i = 0; i < storageGroupNum; i++) {
-        TSStatus status = client.setDatabase(new TDatabaseSchema(sg + i));
-        Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
-      }
+      TSStatus status = client.setDatabase(new TDatabaseSchema(database));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
     }
   }
 
@@ -95,76 +91,185 @@ public class IoTDBPartitionInheritPolicyIT {
 
   @Test
   public void testDataPartitionInheritPolicy() throws Exception {
+    final long baseStartTime = 1000;
+    Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new 
ConcurrentHashMap<>();
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-      TDataPartitionReq dataPartitionReq = new TDataPartitionReq();
-      TDataPartitionTableResp dataPartitionTableResp;
-      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap;
-
-      for (int i = 0; i < storageGroupNum; i++) {
-        String storageGroup = sg + i;
-        for (int j = 0; j < testSeriesPartitionSlotNum; j += 
seriesPartitionBatchSize) {
-          // Test inherit predecessor or successor
-          boolean isAscending = (j / 10) % 2 == 0;
-          int step = isAscending ? timePartitionBatchSize : 
-timePartitionBatchSize;
-          int k = isAscending ? 0 : testTimePartitionSlotsNum - 
timePartitionBatchSize;
-          while (0 <= k && k < testTimePartitionSlotsNum) {
-            partitionSlotsMap =
-                ConfigNodeTestUtils.constructPartitionSlotsMap(
-                    storageGroup,
-                    j,
-                    j + seriesPartitionBatchSize,
-                    k,
-                    k + timePartitionBatchSize,
-                    testTimePartitionInterval);
-            // Let ConfigNode create DataPartition
-            dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
-            for (int retry = 0; retry < 5; retry++) {
-              // Build new Client since it's unstable
-              try (SyncConfigNodeIServiceClient configNodeClient =
-                  (SyncConfigNodeIServiceClient)
-                      EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-                dataPartitionTableResp =
-                    
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
-                if (dataPartitionTableResp != null
-                    && dataPartitionTableResp.getStatus().getCode()
-                        == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                  ConfigNodeTestUtils.checkDataPartitionTable(
-                      storageGroup,
-                      j,
-                      j + seriesPartitionBatchSize,
-                      k,
-                      k + timePartitionBatchSize,
-                      testTimePartitionInterval,
-                      configNodeClient
-                          .getDataPartitionTable(dataPartitionReq)
-                          .getDataPartitionTable());
-                  break;
-                }
-              } catch (Exception e) {
-                // Retry sometimes in order to avoid request timeout
-                LOGGER.error(e.getMessage());
-                TimeUnit.SECONDS.sleep(1);
+    // Test1: divide and inherit DataPartitions from scratch
+    // Notice: create all DataRegionGroups as soon as possible
+    // Otherwise, the allocation might be slightly unbalanced
+    ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+        database, 0, 10, baseStartTime, baseStartTime + 1, 
testTimePartitionInterval);
+    ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+        database,
+        10,
+        testSeriesSlotNum,
+        baseStartTime,
+        baseStartTime + 1,
+        testTimePartitionInterval);
+
+    for (long timePartitionSlot = baseStartTime + 1;
+        timePartitionSlot < baseStartTime + testTimePartitionSlotsNum;
+        timePartitionSlot++) {
+      for (int seriesPartitionSlot = 0;
+          seriesPartitionSlot < testSeriesSlotNum;
+          seriesPartitionSlot += seriesPartitionSlotBatchSize) {
+        ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+            database,
+            seriesPartitionSlot,
+            seriesPartitionSlot + seriesPartitionSlotBatchSize,
+            timePartitionSlot,
+            timePartitionSlot + 1,
+            testTimePartitionInterval);
+      }
+    }
+
+    int mu = (int) (testSeriesSlotNum / testDataRegionPerDataNode);
+    TDataPartitionTableResp dataPartitionTableResp =
+        ConfigNodeTestUtils.getDataPartitionWithRetry(
+            database,
+            0,
+            testSeriesSlotNum,
+            baseStartTime,
+            baseStartTime + testTimePartitionSlotsNum,
+            testTimePartitionInterval);
+    Assert.assertNotNull(dataPartitionTableResp);
+
+    // All DataRegionGroups divide all SeriesSlots evenly
+    final int expectedPartitionNum1 = mu * testTimePartitionSlotsNum;
+    Map<TConsensusGroupId, Integer> counter =
+        ConfigNodeTestUtils.countDataPartition(
+            dataPartitionTableResp.getDataPartitionTable().get(database));
+    counter.forEach((groupId, num) -> 
Assert.assertEquals(expectedPartitionNum1, num.intValue()));
+
+    // Test DataPartition inherit policy
+    dataPartitionTableResp
+        .getDataPartitionTable()
+        .get(database)
+        .forEach(
+            ((seriesPartitionSlot, timePartitionSlotMap) -> {
+              // All Timeslots belonging to the same SeriesSlot are allocated 
to the same
+              // DataRegionGroup
+              TConsensusGroupId groupId =
+                  timePartitionSlotMap
+                      .get(new TTimePartitionSlot(baseStartTime * 
testTimePartitionInterval))
+                      .get(0);
+              timePartitionSlotMap.forEach(
+                  (timePartitionSlot, groupIdList) ->
+                      Assert.assertEquals(groupId, groupIdList.get(0)));
+              dataAllotTable1.put(seriesPartitionSlot, groupId);
+            }));
+
+    // Register a new DataNode to extend DataRegionGroups
+    EnvFactory.getEnv().registerNewDataNode(true);
+
+    // Test2: divide and inherit DataPartitions after extension
+    mu = (int) (testSeriesSlotNum / (testDataRegionPerDataNode * 2));
+    dataPartitionTableResp =
+        ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+            database,
+            0,
+            testSeriesSlotNum,
+            baseStartTime + testTimePartitionSlotsNum,
+            baseStartTime + testTimePartitionSlotsNum + timePartitionBatchSize,
+            testTimePartitionInterval);
+    Assert.assertNotNull(dataPartitionTableResp);
+
+    // All DataRegionGroups divide all SeriesSlots evenly
+    counter =
+        ConfigNodeTestUtils.countDataPartition(
+            dataPartitionTableResp.getDataPartitionTable().get(database));
+    final int expectedPartitionNum2 = mu * timePartitionBatchSize;
+    counter.forEach((groupId, num) -> 
Assert.assertEquals(expectedPartitionNum2, num.intValue()));
+
+    // Test DataPartition inherit policy
+    Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable2 = new 
ConcurrentHashMap<>();
+    dataPartitionTableResp
+        .getDataPartitionTable()
+        .get(database)
+        .forEach(
+            ((seriesPartitionSlot, timePartitionSlotMap) -> {
+              // All Timeslots belonging to the same SeriesSlot are allocated 
to the same
+              // DataRegionGroup
+              TConsensusGroupId groupId =
+                  timePartitionSlotMap
+                      .get(
+                          new TTimePartitionSlot(
+                              (baseStartTime + testTimePartitionSlotsNum)
+                                  * testTimePartitionInterval))
+                      .get(0);
+              timePartitionSlotMap.forEach(
+                  (timePartitionSlot, groupIdList) ->
+                      Assert.assertEquals(groupId, groupIdList.get(0)));
+
+              if (dataAllotTable1.containsValue(groupId)) {
+                // The DataRegionGroup has been inherited
+                
Assert.assertTrue(dataAllotTable1.containsKey(seriesPartitionSlot));
+                Assert.assertEquals(dataAllotTable1.get(seriesPartitionSlot), 
groupId);
               }
-            }
-            k += step;
-          }
-        }
+              dataAllotTable2.put(seriesPartitionSlot, groupId);
+            }));
+
+    // Test3: historical DataPartitions will inherit successor
+    Random random = new Random();
+    Set<Integer> allocatedSlots = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      int slot = random.nextInt(testSeriesSlotNum);
+      while (allocatedSlots.contains(slot)) {
+        slot = random.nextInt(testSeriesSlotNum);
+      }
+      allocatedSlots.add(slot);
+      dataPartitionTableResp =
+          ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+              database,
+              slot,
+              slot + 1,
+              baseStartTime - 1,
+              baseStartTime,
+              testTimePartitionInterval);
+      Assert.assertNotNull(dataPartitionTableResp);
+
+      TSeriesPartitionSlot seriesPartitionSlot = new 
TSeriesPartitionSlot(slot);
+      TTimePartitionSlot timePartitionSlot =
+          new TTimePartitionSlot((baseStartTime - 1) * 
testTimePartitionInterval);
+      Assert.assertEquals(
+          dataAllotTable1.get(seriesPartitionSlot),
+          dataPartitionTableResp
+              .getDataPartitionTable()
+              .get(database)
+              .get(seriesPartitionSlot)
+              .get(timePartitionSlot)
+              .get(0));
+    }
+
+    // Test4: future DataPartitions will inherit predecessor
+    allocatedSlots.clear();
+    for (int i = 0; i < 10; i++) {
+      int slot = random.nextInt(testSeriesSlotNum);
+      while (allocatedSlots.contains(slot)) {
+        slot = random.nextInt(testSeriesSlotNum);
       }
+      allocatedSlots.add(slot);
+      dataPartitionTableResp =
+          ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
+              database,
+              slot,
+              slot + 1,
+              baseStartTime + 999,
+              baseStartTime + 1000,
+              testTimePartitionInterval);
+      Assert.assertNotNull(dataPartitionTableResp);
 
-      // Test DataPartition inherit policy
-      TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
-      showRegionResp
-          .getRegionInfoList()
-          .forEach(
-              regionInfo -> {
-                // All Timeslots belonging to the same SeriesSlot are 
allocated to the same
-                // DataRegionGroup
-                Assert.assertEquals(
-                    regionInfo.getSeriesSlots() * testTimePartitionSlotsNum,
-                    regionInfo.getTimeSlots());
-              });
+      TSeriesPartitionSlot seriesPartitionSlot = new 
TSeriesPartitionSlot(slot);
+      TTimePartitionSlot timePartitionSlot =
+          new TTimePartitionSlot((baseStartTime + 999) * 
testTimePartitionInterval);
+      Assert.assertEquals(
+          dataAllotTable2.get(seriesPartitionSlot),
+          dataPartitionTableResp
+              .getDataPartitionTable()
+              .get(database)
+              .get(seriesPartitionSlot)
+              .get(timePartitionSlot)
+              .get(0));
     }
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
index 6f20a25ba79..8f5b46288fb 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TNodeResource;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
@@ -34,12 +35,18 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.ConfigNodeWrapper;
 import org.apache.iotdb.it.env.cluster.DataNodeWrapper;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -47,11 +54,17 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertTrue;
 
 public class ConfigNodeTestUtils {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigNodeTestUtils.class);
+
   public static void checkNodeConfig(
       List<TConfigNodeLocation> configNodeList,
       List<TDataNodeLocation> dataNodeList,
@@ -169,6 +182,110 @@ public class ConfigNodeTestUtils {
     }
   }
 
+  public static TDataPartitionTableResp getDataPartitionWithRetry(
+      String database,
+      int seriesSlotStart,
+      int seriesSlotEnd,
+      long timeSlotStart,
+      long timeSlotEnd,
+      long timePartitionInterval)
+      throws InterruptedException {
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+        ConfigNodeTestUtils.constructPartitionSlotsMap(
+            database,
+            seriesSlotStart,
+            seriesSlotEnd,
+            timeSlotStart,
+            timeSlotEnd,
+            timePartitionInterval);
+    TDataPartitionTableResp dataPartitionTableResp;
+    TDataPartitionReq dataPartitionReq = new 
TDataPartitionReq(partitionSlotsMap);
+
+    for (int retry = 0; retry < 5; retry++) {
+      // Build new Client since it's unstable
+      try (SyncConfigNodeIServiceClient configNodeClient =
+          (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        dataPartitionTableResp = 
configNodeClient.getDataPartitionTable(dataPartitionReq);
+        if (dataPartitionTableResp != null
+            && dataPartitionTableResp.getStatus().getCode()
+                == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          return dataPartitionTableResp;
+        }
+      } catch (Exception e) {
+        // Retry sometimes in order to avoid request timeout
+        LOGGER.error(e.getMessage());
+        TimeUnit.SECONDS.sleep(1);
+      }
+    }
+    Assert.fail("Failed to create DataPartition");
+    return null;
+  }
+
+  public static TDataPartitionTableResp getOrCreateDataPartitionWithRetry(
+      String database,
+      int seriesSlotStart,
+      int seriesSlotEnd,
+      long timeSlotStart,
+      long timeSlotEnd,
+      long timePartitionInterval)
+      throws InterruptedException {
+
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+        ConfigNodeTestUtils.constructPartitionSlotsMap(
+            database,
+            seriesSlotStart,
+            seriesSlotEnd,
+            timeSlotStart,
+            timeSlotEnd,
+            timePartitionInterval);
+    TDataPartitionTableResp dataPartitionTableResp;
+    TDataPartitionReq dataPartitionReq = new 
TDataPartitionReq(partitionSlotsMap);
+
+    for (int retry = 0; retry < 5; retry++) {
+      // Build new Client since it's unstable
+      try (SyncConfigNodeIServiceClient configNodeClient =
+          (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        dataPartitionTableResp = 
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+        if (dataPartitionTableResp != null
+            && dataPartitionTableResp.getStatus().getCode()
+                == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          ConfigNodeTestUtils.checkDataPartitionTable(
+              database,
+              seriesSlotStart,
+              seriesSlotEnd,
+              timeSlotStart,
+              timeSlotEnd,
+              timePartitionInterval,
+              
configNodeClient.getDataPartitionTable(dataPartitionReq).getDataPartitionTable());
+          return dataPartitionTableResp;
+        }
+      } catch (Exception e) {
+        // Retry sometimes in order to avoid request timeout
+        LOGGER.error(e.getMessage());
+        TimeUnit.SECONDS.sleep(1);
+      }
+    }
+    Assert.fail("Failed to create DataPartition");
+    return null;
+  }
+
+  public static Map<TConsensusGroupId, Integer> countDataPartition(
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TConsensusGroupId>>>
+          dataPartitionMap) {
+    Map<TConsensusGroupId, AtomicInteger> counter = new ConcurrentHashMap<>();
+    dataPartitionMap.forEach(
+        ((seriesPartitionSlot, timePartitionSlotMap) ->
+            timePartitionSlotMap.forEach(
+                ((timePartitionSlot, consensusGroupIds) ->
+                    consensusGroupIds.forEach(
+                        (consensusGroupId ->
+                            counter
+                                .computeIfAbsent(consensusGroupId, empty -> 
new AtomicInteger(0))
+                                .incrementAndGet()))))));
+    return counter.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().get()));
+  }
+
   public static TConfigNodeLocation generateTConfigNodeLocation(
       int nodeId, ConfigNodeWrapper configNodeWrapper) {
     return new TConfigNodeLocation(
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
index 33f9ef9e151..6d30db00085 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
@@ -119,7 +119,8 @@ public class IoTDBLastQueryLastCacheIT {
           "1679365910000,root.ln_1.tb_6141.waterInterval_DOUBLE,10.0,DOUBLE,",
           "1679365910000,root.ln_1.tb_6141.waterTP_DOUBLE,15.0,DOUBLE,",
         };
-    resultSetEqualTest("select last * from root.ln_1.tb_6141;", 
expectedHeader, retArray);
+    resultSetEqualTest(
+        "select last * from root.ln_1.tb_6141 order by timeseries;", 
expectedHeader, retArray);
   }
 
   @Test
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index c0c617cd795..efd36a65382 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.SeriesPartitionTable;
@@ -42,6 +43,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -242,28 +244,22 @@ public class PartitionBalancer {
    * @param database Database name
    */
   public void updateDataAllotTable(String database) {
-    TTimePartitionSlot currentTimePartition =
-        dataAllotTableMap
-            .computeIfAbsent(database, empty -> new DataAllotTable())
-            .getCurrentTimePartition();
-    Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new 
ConcurrentHashMap<>();
+    List<DataPartitionEntry> lastDataPartitions = new ArrayList<>();
     for (int i = 0; i < SERIES_SLOT_NUM; i++) {
       TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
-      Pair<TTimePartitionSlot, TConsensusGroupId> lastDataPartition =
-          getPartitionManager().getLastDataPartition(database, 
seriesPartitionSlot);
-      if (lastDataPartition != null
-          && currentTimePartition.compareTo(lastDataPartition.getLeft()) < 0) {
-        // Put all future DataPartitions into the allocatedTable
-        allocatedTable.put(seriesPartitionSlot, lastDataPartition.getRight());
+      DataPartitionEntry lastDataPartition =
+          getPartitionManager().getLastDataPartitionEntry(database, 
seriesPartitionSlot);
+      if (lastDataPartition != null) {
+        lastDataPartitions.add(lastDataPartition);
       }
     }
 
     try {
       dataAllotTableMap
-          .get(database)
+          .computeIfAbsent(database, empty -> new DataAllotTable())
           .updateDataAllotTable(
               getPartitionManager().getAllRegionGroupIds(database, 
TConsensusGroupType.DataRegion),
-              allocatedTable);
+              lastDataPartitions);
     } catch (DatabaseNotExistsException e) {
       LOGGER.error("Database {} not exists when updateDataAllotTable", 
database);
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
index dec450d20d9..02797d45d8d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
@@ -22,15 +22,16 @@ package 
org.apache.iotdb.confignode.manager.load.balancer.partition;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.commons.structure.BalanceTreeMap;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -72,50 +73,51 @@ public class DataAllotTable {
    * Update the DataAllotTable according to the current DataRegionGroups and 
future DataAllotTable.
    *
    * @param dataRegionGroups the current DataRegionGroups
-   * @param allocatedTable the future DataAllotTable, i.e. some SeriesSlots 
have already allocated
+   * @param lastDataPartitions the last DataPartition of each 
SeriesPartitionSlot
    */
   public void updateDataAllotTable(
-      List<TConsensusGroupId> dataRegionGroups,
-      Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable) {
+      List<TConsensusGroupId> dataRegionGroups, List<DataPartitionEntry> 
lastDataPartitions) {
     dataAllotTableLock.writeLock().lock();
     try {
       // mu is the average number of slots allocated to each regionGroup
       int mu = SERIES_SLOT_NUM / dataRegionGroups.size();
-      // Decide all SeriesSlot randomly
-      List<TSeriesPartitionSlot> seriesSlotList = new ArrayList<>();
-      for (int i = 0; i < SERIES_SLOT_NUM; i++) {
-        seriesSlotList.add(new TSeriesPartitionSlot(i));
-      }
-      Collections.shuffle(seriesSlotList);
 
       // The counter will maintain the number of slots allocated to each 
regionGroup
       BalanceTreeMap<TConsensusGroupId, Integer> counter = new 
BalanceTreeMap<>();
-      Map<TConsensusGroupId, AtomicInteger> regionSlotCounter = new 
HashMap<>();
-      allocatedTable.forEach(
-          (seriesSlot, regionGroupId) ->
-              regionSlotCounter
-                  .computeIfAbsent(regionGroupId, empty -> new 
AtomicInteger(0))
-                  .incrementAndGet());
-      dataRegionGroups.forEach(
-          regionGroupId -> regionSlotCounter.putIfAbsent(regionGroupId, new 
AtomicInteger(0)));
-      regionSlotCounter.forEach(
-          (regionGroupId, slotNum) -> counter.put(regionGroupId, 
slotNum.get()));
+      dataRegionGroups.forEach(regionGroupId -> counter.put(regionGroupId, 0));
 
-      Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new 
HashMap<>();
-      for (TSeriesPartitionSlot seriesPartitionSlot : seriesSlotList) {
-        if (allocatedTable.containsKey(seriesPartitionSlot)) {
-          // If the SeriesSlot has already been allocated, keep the allocation
-          newAllotTable.put(seriesPartitionSlot, 
allocatedTable.get(seriesPartitionSlot));
-          continue;
+      // Fill unallocated SeriesSlots
+      Set<TSeriesPartitionSlot> allocatedSeriesSlots =
+          lastDataPartitions.stream()
+              .map(DataPartitionEntry::getSeriesPartitionSlot)
+              .collect(Collectors.toSet());
+      for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+        TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+        if (!allocatedSeriesSlots.contains(seriesPartitionSlot)) {
+          lastDataPartitions.add(
+              new DataPartitionEntry(
+                  seriesPartitionSlot, new TTimePartitionSlot(Long.MIN_VALUE), 
null));
         }
+      }
 
-        TConsensusGroupId oldRegionGroupId = 
dataAllotMap.get(seriesPartitionSlot);
-        if (oldRegionGroupId != null
-            && counter.containsKey(oldRegionGroupId)
-            && counter.get(oldRegionGroupId) < mu) {
-          // Inherit the oldRegionGroupId when the slotNum of oldRegionGroupId 
is less than average
-          newAllotTable.put(seriesPartitionSlot, oldRegionGroupId);
-          counter.put(oldRegionGroupId, counter.get(oldRegionGroupId) + 1);
+      // The allocated DataPartitions are sorted as follows:
+      // 1. Descending order of TimePartitionSlot
+      // 2. Ascending order of random weight
+      Collections.sort(lastDataPartitions);
+
+      Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new 
HashMap<>();
+      // Enumerate all SeriesPartitionSlots in descending order of their 
TimePartitionSlot
+      for (DataPartitionEntry entry : lastDataPartitions) {
+        TSeriesPartitionSlot seriesPartitionSlot = 
entry.getSeriesPartitionSlot();
+        TConsensusGroupId allocatedRegionGroupId = entry.getDataRegionGroup();
+        if (allocatedRegionGroupId != null
+            // Inherit DataRegionGroup if it has been allocated in the future
+            && (entry.getTimePartitionSlot().getStartTime()
+                    > currentTimePartition.get().getStartTime()
+                // Inherit DataRegionGroup when the slotNum of 
oldRegionGroupId is less than average
+                || counter.get(allocatedRegionGroupId) < mu)) {
+          newAllotTable.put(seriesPartitionSlot, allocatedRegionGroupId);
+          counter.put(allocatedRegionGroupId, 
counter.get(allocatedRegionGroupId) + 1);
           continue;
         }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index a671150a308..65429f70ec4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
@@ -1311,12 +1312,12 @@ public class PartitionManager {
    *
    * @param database The specified Database
    * @param seriesPartitionSlot The specified SeriesPartitionSlot
-   * @return The last DataPartition, null if the Database doesn't exist or 
there are no
+   * @return The last DataPartitionEntry, null if the Database doesn't exist 
or there are no
    *     DataPartitions yet
    */
-  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition(
+  public DataPartitionEntry getLastDataPartitionEntry(
       String database, TSeriesPartitionSlot seriesPartitionSlot) {
-    return partitionInfo.getLastDataPartition(database, seriesPartitionSlot);
+    return partitionInfo.getLastDataPartitionEntry(database, 
seriesPartitionSlot);
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 9782356b595..a27168d8f8f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import 
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
@@ -585,12 +586,11 @@ public class DatabasePartitionTable {
    * Get the DataPartition with max TimePartition of the specified the 
SeriesPartitionSlot.
    *
    * @param seriesPartitionSlot The specified SeriesPartitionSlot
-   * @return The last DataPartition, null if there are no DataPartitions in 
the specified
+   * @return The last DataPartitionEntry, null if there are no DataPartitions 
in the specified
    *     SeriesPartitionSlot
    */
-  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition(
-      TSeriesPartitionSlot seriesPartitionSlot) {
-    return dataPartitionTable.getLastDataPartition(seriesPartitionSlot);
+  public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot 
seriesPartitionSlot) {
+    return dataPartitionTable.getLastDataPartitionEntry(seriesPartitionSlot);
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index a0e75d29168..bfd509345eb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
@@ -835,13 +836,13 @@ public class PartitionInfo implements SnapshotProcessor {
    *
    * @param database The specified Database
    * @param seriesPartitionSlot The specified SeriesPartitionSlot
-   * @return The last DataPartition, null if the Database doesn't exist or 
there are no
+   * @return The last DataPartitionEntry, null if the Database doesn't exist 
or there are no
    *     DataPartitions in the specified SeriesPartitionSlot
    */
-  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition(
+  public DataPartitionEntry getLastDataPartitionEntry(
       String database, TSeriesPartitionSlot seriesPartitionSlot) {
     if (isDatabaseExisted(database)) {
-      return 
databasePartitionTables.get(database).getLastDataPartition(seriesPartitionSlot);
+      return 
databasePartitionTables.get(database).getLastDataPartitionEntry(seriesPartitionSlot);
     }
     return null;
   }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
index 45ee77cecb1..67ff0dc0099 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 
@@ -31,9 +32,11 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class DataAllotTableTest {
@@ -106,7 +109,7 @@ public class DataAllotTableTest {
     // Test 1: construct DataAllotTable from scratch
     TConsensusGroupId group1 = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 1);
     dataRegionGroups.add(group1);
-    dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>());
+    dataAllotTable.updateDataAllotTable(dataRegionGroups, new ArrayList<>());
     for (int i = 0; i < SERIES_SLOT_NUM; i++) {
       TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
       // All SeriesPartitionSlots belong to group1
@@ -117,7 +120,7 @@ public class DataAllotTableTest {
     Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new 
HashMap<>();
     dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
2));
     dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
3));
-    dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>());
+    dataAllotTable.updateDataAllotTable(dataRegionGroups, new ArrayList<>());
     int mu = SERIES_SLOT_NUM / 3;
     Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>();
     for (int i = 0; i < SERIES_SLOT_NUM; i++) {
@@ -135,19 +138,35 @@ public class DataAllotTableTest {
     dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
4));
     dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
5));
     Random random = new Random();
-    Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new 
HashMap<>();
+    Set<TSeriesPartitionSlot> selectedSlots = new HashSet<>();
+    List<DataPartitionEntry> lastDataPartitions = new ArrayList<>();
     Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>();
     for (int i = 0; i < 50; i++) {
+      // Randomly pre-allocate 50 SeriesPartitionSlots
       TSeriesPartitionSlot seriesPartitionSlot =
           new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM));
-      while (allocatedTable.containsKey(seriesPartitionSlot)) {
+      while (selectedSlots.contains(seriesPartitionSlot)) {
         seriesPartitionSlot = new 
TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM));
       }
-      allocatedTable.put(
-          seriesPartitionSlot,
-          new TConsensusGroupId(TConsensusGroupType.DataRegion, 
random.nextInt(2) + 4));
+      selectedSlots.add(seriesPartitionSlot);
+      lastDataPartitions.add(
+          new DataPartitionEntry(
+              seriesPartitionSlot,
+              new TTimePartitionSlot(Long.MAX_VALUE),
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, 
random.nextInt(2) + 4)));
     }
-    dataAllotTable.updateDataAllotTable(dataRegionGroups, allocatedTable);
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      // Record the other allocation result
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      if (!selectedSlots.contains(seriesPartitionSlot)) {
+        lastDataPartitions.add(
+            new DataPartitionEntry(
+                seriesPartitionSlot,
+                new TTimePartitionSlot(Long.MIN_VALUE),
+                lastDataAllotTable.get(seriesPartitionSlot)));
+      }
+    }
+    dataAllotTable.updateDataAllotTable(dataRegionGroups, lastDataPartitions);
     mu = SERIES_SLOT_NUM / 5;
     counter.clear();
     for (int i = 0; i < SERIES_SLOT_NUM; i++) {
@@ -166,9 +185,11 @@ public class DataAllotTableTest {
     }
     // All SeriesPartitionSlots that have been allocated before should be 
allocated to the same
     // DataRegionGroup
-    allocatedTable.forEach(
-        (seriesPartitionSlot, groupId) ->
-            Assert.assertEquals(groupId, 
dataAllotTable.getRegionGroupId(seriesPartitionSlot)));
+    for (int i = 0; i < 50; i++) {
+      Assert.assertEquals(
+          lastDataPartitions.get(i).getDataRegionGroup(),
+          
dataAllotTable.getRegionGroupId(lastDataPartitions.get(i).getSeriesPartitionSlot()));
+    }
     // Most of SeriesPartitionSlots in the first three DataRegionGroups should 
remain unchanged
     for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
unchangedSlots.entrySet()) {
       Assert.assertEquals(mu, counterEntry.getValue().get());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java
new file mode 100644
index 00000000000..449f8e45163
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+
+import java.util.Objects;
+import java.util.Random;
+
+public class DataPartitionEntry implements Comparable<DataPartitionEntry> {
+
+  private final TSeriesPartitionSlot seriesPartitionSlot;
+  private final TTimePartitionSlot timePartitionSlot;
+  private final TConsensusGroupId dataRegionGroup;
+  private final int weight;
+
+  public DataPartitionEntry(
+      TSeriesPartitionSlot seriesPartitionSlot,
+      TTimePartitionSlot timePartitionSlot,
+      TConsensusGroupId dataRegionGroup) {
+    this.seriesPartitionSlot = seriesPartitionSlot;
+    this.timePartitionSlot = timePartitionSlot;
+    this.dataRegionGroup = dataRegionGroup;
+    this.weight = new Random().nextInt();
+  }
+
+  public TSeriesPartitionSlot getSeriesPartitionSlot() {
+    return seriesPartitionSlot;
+  }
+
+  public TTimePartitionSlot getTimePartitionSlot() {
+    return timePartitionSlot;
+  }
+
+  public TConsensusGroupId getDataRegionGroup() {
+    return dataRegionGroup;
+  }
+
+  @Override
+  public int compareTo(DataPartitionEntry o) {
+    // The timePartitionSlot will be in descending order
+    // After invoke Collections.sort()
+    if (!timePartitionSlot.equals(o.timePartitionSlot)) {
+      return -timePartitionSlot.compareTo(o.timePartitionSlot);
+    }
+    return Integer.compare(weight, o.weight);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DataPartitionEntry that = (DataPartitionEntry) o;
+    return weight == that.weight
+        && seriesPartitionSlot.equals(that.seriesPartitionSlot)
+        && timePartitionSlot.equals(that.timePartitionSlot)
+        && dataRegionGroup.equals(that.dataRegionGroup);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(seriesPartitionSlot, timePartitionSlot, 
dataRegionGroup, weight);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 6c2108f5898..497f4085410 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -23,7 +23,6 @@ import 
org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.thrift.TException;
@@ -281,13 +280,14 @@ public class DataPartitionTable {
    * SeriesPartitionSlot.
    *
    * @param seriesPartitionSlot The specified SeriesPartitionSlot
-   * @return The last DataPartition, null if there are no DataPartitions in 
the specified
+   * @return The last DataPartitionEntry, null if there are no DataPartitions 
in the specified
    *     SeriesPartitionSlot
    */
-  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition(
-      TSeriesPartitionSlot seriesPartitionSlot) {
+  public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot 
seriesPartitionSlot) {
     if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
-      return dataPartitionMap.get(seriesPartitionSlot).getLastDataPartition();
+      return dataPartitionMap
+          .get(seriesPartitionSlot)
+          .getLastDataPartitionEntry(seriesPartitionSlot);
     } else {
       return null;
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index d72157a5037..58f2ba52b9c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -24,7 +24,6 @@ import 
org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.thrift.TException;
@@ -247,16 +246,19 @@ public class SeriesPartitionTable {
    * Get the DataPartition with max TimePartition of the specified Database 
and the
    * SeriesPartitionSlot.
    *
-   * @return The last DataPartition, null if there are no DataPartitions
+   * @param seriesPartitionSlot The specified SeriesPartitionSlot
+   * @return The last DataPartitionEntry, null if there are no DataPartitions
    */
-  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition() {
+  public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot 
seriesPartitionSlot) {
     Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
         seriesPartitionMap.lastEntry();
     if (lastEntry == null) {
       return null;
     }
-    return new Pair<>(
-        lastEntry.getKey(), 
lastEntry.getValue().get(lastEntry.getValue().size() - 1));
+    return new DataPartitionEntry(
+        seriesPartitionSlot,
+        lastEntry.getKey(),
+        lastEntry.getValue().get(lastEntry.getValue().size() - 1));
   }
 
   /**
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java
new file mode 100644
index 00000000000..c92d4564b76
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class DataPartitionEntryTest {
+
+  private static final int SERIES_SLOT_NUM = 1000;
+  private static final long TIME_PARTITION_INTERVAL =
+      CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
+
+  @Test
+  public void testOrder() {
+    List<DataPartitionEntry> entries = new ArrayList<>();
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      entries.add(
+          new DataPartitionEntry(
+              new TSeriesPartitionSlot(i),
+              new TTimePartitionSlot(TIME_PARTITION_INTERVAL * i),
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, i)));
+    }
+
+    List<DataPartitionEntry> sortedEntries = new ArrayList<>(entries);
+    Collections.sort(sortedEntries);
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      Assert.assertEquals(entries.get(SERIES_SLOT_NUM - i - 1), 
sortedEntries.get(i));
+    }
+  }
+}

Reply via email to