This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch Computing-resource-balancing
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7ac4f8735bdd472fd32e87ccdda18ae095928f74
Author: YongzaoDan <[email protected]>
AuthorDate: Sat Jul 29 14:54:08 2023 +0800

    Finish updation
---
 .../partition/IoTDBPartitionInheritPolicyIT.java   |  14 +-
 .../iotdb/confignode/manager/load/LoadManager.java |  17 +-
 .../manager/load/balancer/PartitionBalancer.java   | 151 ++++----------
 .../load/balancer/partition/DataAllotTable.java    | 227 ---------------------
 .../partition/DataPartitionPolicyTable.java        | 144 +++++++++++++
 .../manager/partition/PartitionManager.java        |  70 ++-----
 .../partition/DatabasePartitionTable.java          |  67 ++----
 .../persistence/partition/PartitionInfo.java       |  83 ++------
 .../impl/schema/DeleteDatabaseProcedure.java       |   1 +
 .../statemachine/CreateRegionGroupsProcedure.java  |   6 +-
 .../balancer/partition/DataAllotTableTest.java     | 198 ------------------
 .../partition/DataPartitionPolicyTableTest.java    | 105 ++++++++++
 .../commons/partition/DataPartitionTable.java      | 115 ++---------
 .../commons/partition/SeriesPartitionTable.java    |  80 +-------
 .../iotdb/commons/structure/BalanceTreeMap.java    |   4 +
 15 files changed, 388 insertions(+), 894 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index aecd22375f5..bdb233950e2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -95,19 +95,7 @@ public class IoTDBPartitionInheritPolicyIT {
     Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new 
ConcurrentHashMap<>();
 
     // Test1: divide and inherit DataPartitions from scratch
-    // Notice: create all DataRegionGroups as soon as possible
-    // Otherwise, the allocation might be slightly unbalanced
-    ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
-        database, 0, 10, baseStartTime, baseStartTime + 1, 
testTimePartitionInterval);
-    ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
-        database,
-        10,
-        testSeriesSlotNum,
-        baseStartTime,
-        baseStartTime + 1,
-        testTimePartitionInterval);
-
-    for (long timePartitionSlot = baseStartTime + 1;
+    for (long timePartitionSlot = baseStartTime;
         timePartitionSlot < baseStartTime + testTimePartitionSlotsNum;
         timePartitionSlot++) {
       for (int seriesPartitionSlot = 0;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 0c284a149ee..e3472e2e0f2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -130,17 +130,8 @@ public class LoadManager {
     return 
partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
   }
 
-  /**
-   * Re-balance runtime status cached in the PartitionBalancer. This method 
may shift the
-   * currentTimePartition or update the DataAllotTable.
-   */
-  public void reBalancePartitionPolicyIfNecessary(
-      Map<String, DataPartitionTable> assignedDataPartition) {
-    
partitionBalancer.reBalanceDataPartitionPolicyIfNecessary(assignedDataPartition);
-  }
-
-  public void updateDataAllotTable(String database) {
-    partitionBalancer.updateDataAllotTable(database);
+  public void reBalanceDataPartitionPolicy(String database) {
+    partitionBalancer.reBalanceDataPartitionPolicy(database);
   }
 
   public void broadcastLatestRegionRouteMap() {
@@ -161,6 +152,10 @@ public class LoadManager {
     partitionBalancer.clearPartitionBalancer();
   }
 
+  public void clearPartitionBalancer() {
+    partitionBalancer.clearPartitionBalancer();
+  }
+
   /**
    * Safely get NodeStatus by NodeId.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index efd36a65382..42ac541655c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -23,18 +23,14 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.SeriesPartitionTable;
 import org.apache.iotdb.commons.structure.BalanceTreeMap;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
 import org.apache.iotdb.confignode.manager.IManager;
-import 
org.apache.iotdb.confignode.manager.load.balancer.partition.DataAllotTable;
+import 
org.apache.iotdb.confignode.manager.load.balancer.partition.DataPartitionPolicyTable;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
@@ -43,8 +39,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -56,17 +50,12 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class PartitionBalancer {
 
-  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
-  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
-  private static final long TIME_PARTITION_INTERVAL =
-      CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionBalancer.class);
 
   private final IManager configManager;
 
-  // Map<DatabaseName, DataAllotTable>
-  private final Map<String, DataAllotTable> dataAllotTableMap;
+  // Map<DatabaseName, DataPartitionPolicyTable>
+  private final Map<String, DataPartitionPolicyTable> dataAllotTableMap;
 
   public PartitionBalancer(IManager configManager) {
     this.configManager = configManager;
@@ -141,9 +130,8 @@ public class PartitionBalancer {
         counter.put(pair.getRight(), pair.getLeft().intValue());
       }
 
-      DataAllotTable allotTable = dataAllotTableMap.get(database);
-      allotTable.acquireReadLock();
-      TTimePartitionSlot currentTimePartition = 
allotTable.getCurrentTimePartition();
+      DataPartitionPolicyTable allotTable = dataAllotTableMap.get(database);
+      allotTable.acquireLock();
       DataPartitionTable dataPartitionTable = new DataPartitionTable();
 
       try {
@@ -161,26 +149,36 @@ public class PartitionBalancer {
           for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
 
             // 1. The historical DataPartition will try to inherit successor 
DataPartition first
-            if (timePartitionSlot.getStartTime() < 
currentTimePartition.getStartTime()) {
-              TConsensusGroupId successor =
-                  getPartitionManager()
-                      .getSuccessorDataPartition(database, 
seriesPartitionSlot, timePartitionSlot);
-              if (successor != null && counter.containsKey(successor)) {
-                seriesPartitionTable.putDataPartition(timePartitionSlot, 
successor);
-                counter.put(successor, counter.get(successor) + 1);
-                continue;
-              }
+            TConsensusGroupId successor =
+                getPartitionManager()
+                    .getSuccessorDataPartition(database, seriesPartitionSlot, 
timePartitionSlot);
+            if (successor != null && counter.containsKey(successor)) {
+              seriesPartitionTable.putDataPartition(timePartitionSlot, 
successor);
+              counter.put(successor, counter.get(successor) + 1);
+              continue;
             }
 
             // 2. Assign DataPartition base on the DataAllotTable
-            TConsensusGroupId allotGroupId = 
allotTable.getRegionGroupId(seriesPartitionSlot);
+            TConsensusGroupId allotGroupId =
+                
allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
             if (counter.containsKey(allotGroupId)) {
               seriesPartitionTable.putDataPartition(timePartitionSlot, 
allotGroupId);
               counter.put(allotGroupId, counter.get(allotGroupId) + 1);
               continue;
             }
 
-            // 3. Assign the DataPartition to DataRegionGroup with the least 
DataPartitions
+            // 3. The allotDataRegionGroup is unavailable,
+            // try to inherit predecessor DataPartition
+            TConsensusGroupId predecessor =
+                getPartitionManager()
+                    .getPredecessorDataPartition(database, 
seriesPartitionSlot, timePartitionSlot);
+            if (predecessor != null && counter.containsKey(predecessor)) {
+              seriesPartitionTable.putDataPartition(timePartitionSlot, 
predecessor);
+              counter.put(predecessor, counter.get(predecessor) + 1);
+              continue;
+            }
+
+            // 4. Assign the DataPartition to DataRegionGroup with the least 
DataPartitions
             // If the above DataRegionGroups are unavailable
             TConsensusGroupId greedyGroupId = counter.getKeyWithMinValue();
             seriesPartitionTable.putDataPartition(timePartitionSlot, 
greedyGroupId);
@@ -192,7 +190,7 @@ public class PartitionBalancer {
               .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
         }
       } finally {
-        allotTable.releaseReadLock();
+        allotTable.releaseLock();
       }
       result.put(database, dataPartitionTable);
     }
@@ -201,65 +199,16 @@ public class PartitionBalancer {
   }
 
   /**
-   * Try to re-balance the DataPartitionPolicy when new DataPartitions are 
created.
-   *
-   * @param assignedDataPartition new created DataPartitions
-   */
-  public void reBalanceDataPartitionPolicyIfNecessary(
-      Map<String, DataPartitionTable> assignedDataPartition) {
-    assignedDataPartition.forEach(
-        (database, dataPartitionTable) -> {
-          if (updateDataPartitionCount(database, dataPartitionTable)) {
-            // Update the DataAllotTable if the currentTimePartition is updated
-            updateDataAllotTable(database);
-          }
-        });
-  }
-
-  /**
-   * Update the DataPartitionCount in DataAllotTable.
+   * Re-balance the DataPartitionPolicyTable.
    *
    * @param database Database name
-   * @param dataPartitionTable new created DataPartitionTable
-   * @return true if the currentTimePartition is updated, false otherwise
    */
-  public boolean updateDataPartitionCount(String database, DataPartitionTable 
dataPartitionTable) {
-    try {
-      dataAllotTableMap
-          .get(database)
-          .addTimePartitionCount(dataPartitionTable.getTimeSlotCountMap());
-      return dataAllotTableMap
-          .get(database)
-          .updateCurrentTimePartition(
-              getPartitionManager().getRegionGroupCount(database, 
TConsensusGroupType.DataRegion));
-    } catch (DatabaseNotExistsException e) {
-      LOGGER.error("Database {} not exists when updateDataPartitionCount", 
database);
-      return false;
-    }
-  }
-
-  /**
-   * Update the DataAllotTable.
-   *
-   * @param database Database name
-   */
-  public void updateDataAllotTable(String database) {
-    List<DataPartitionEntry> lastDataPartitions = new ArrayList<>();
-    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
-      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
-      DataPartitionEntry lastDataPartition =
-          getPartitionManager().getLastDataPartitionEntry(database, 
seriesPartitionSlot);
-      if (lastDataPartition != null) {
-        lastDataPartitions.add(lastDataPartition);
-      }
-    }
-
+  public void reBalanceDataPartitionPolicy(String database) {
     try {
       dataAllotTableMap
-          .computeIfAbsent(database, empty -> new DataAllotTable())
-          .updateDataAllotTable(
-              getPartitionManager().getAllRegionGroupIds(database, 
TConsensusGroupType.DataRegion),
-              lastDataPartitions);
+          .computeIfAbsent(database, empty -> new DataPartitionPolicyTable())
+          .reBalanceDataPartitionPolicy(
+              getPartitionManager().getAllRegionGroupIds(database, 
TConsensusGroupType.DataRegion));
     } catch (DatabaseNotExistsException e) {
       LOGGER.error("Database {} not exists when updateDataAllotTable", 
database);
     }
@@ -272,38 +221,18 @@ public class PartitionBalancer {
         .getDatabaseNames()
         .forEach(
             database -> {
-              dataAllotTableMap.put(database, new DataAllotTable());
-              DataAllotTable dataAllotTable = dataAllotTableMap.get(database);
-              dataAllotTable.acquireWriteLock();
+              dataAllotTableMap.put(database, new DataPartitionPolicyTable());
+              DataPartitionPolicyTable dataPartitionPolicyTable = 
dataAllotTableMap.get(database);
               try {
-                int threshold =
-                    DataAllotTable.timePartitionThreshold(
-                        getPartitionManager()
-                            .getRegionGroupCount(database, 
TConsensusGroupType.DataRegion));
-                TTimePartitionSlot maxTimePartitionSlot =
-                    getPartitionManager().getMaxTimePartitionSlot(database);
-                TTimePartitionSlot minTimePartitionSlot =
-                    getPartitionManager().getMinTimePartitionSlot(database);
-                TTimePartitionSlot currentTimePartition = 
maxTimePartitionSlot.deepCopy();
-                while (currentTimePartition.compareTo(minTimePartitionSlot) > 
0) {
-                  int seriesSlotCount =
-                      getPartitionManager().countSeriesSlot(database, 
currentTimePartition);
-                  if (seriesSlotCount >= threshold) {
-                    
dataAllotTable.setCurrentTimePartition(currentTimePartition.getStartTime());
-                    break;
-                  }
-                  dataAllotTable.addTimePartitionCount(
-                      Collections.singletonMap(currentTimePartition, 
seriesSlotCount));
-                  currentTimePartition.setStartTime(
-                      currentTimePartition.getStartTime() - 
TIME_PARTITION_INTERVAL);
-                }
-
-                dataAllotTable.setDataAllotMap(
+                // Put all DataRegionGroups into the DataPartitionPolicyTable
+                dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
+                    getPartitionManager()
+                        .getAllRegionGroupIds(database, 
TConsensusGroupType.DataRegion));
+                // Load the last DataAllotTable
+                dataPartitionPolicyTable.setDataAllotMap(
                     getPartitionManager().getLastDataAllotTable(database));
               } catch (DatabaseNotExistsException e) {
                 LOGGER.error("Database {} not exists when 
setupPartitionBalancer", database);
-              } finally {
-                dataAllotTable.releaseWriteLock();
               }
             });
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
deleted file mode 100644
index 02797d45d8d..00000000000
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load.balancer.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartitionEntry;
-import org.apache.iotdb.commons.structure.BalanceTreeMap;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-public class DataAllotTable {
-
-  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
-  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
-
-  private final ReentrantReadWriteLock dataAllotTableLock;
-  private final AtomicReference<TTimePartitionSlot> currentTimePartition;
-  // Map<TimePartitionSlot, DataPartitionCount>
-  // Cache the number of DataPartitions in each future TimePartitionSlot
-  private final TreeMap<TTimePartitionSlot, AtomicInteger> 
dataPartitionCounter;
-  // Map<SeriesPartitionSlot, RegionGroupId>
-  // The optimal allocation of SeriesSlots to RegionGroups in the 
currentTimePartition
-  private final Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap;
-
-  public DataAllotTable() {
-    this.dataAllotTableLock = new ReentrantReadWriteLock();
-    this.currentTimePartition = new AtomicReference<>(new 
TTimePartitionSlot(0));
-    this.dataPartitionCounter = new TreeMap<>();
-    this.dataAllotMap = new HashMap<>();
-  }
-
-  public boolean isEmpty() {
-    dataAllotTableLock.readLock().lock();
-    try {
-      return dataAllotMap.isEmpty();
-    } finally {
-      dataAllotTableLock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Update the DataAllotTable according to the current DataRegionGroups and 
future DataAllotTable.
-   *
-   * @param dataRegionGroups the current DataRegionGroups
-   * @param lastDataPartitions the last DataPartition of each 
SeriesPartitionSlot
-   */
-  public void updateDataAllotTable(
-      List<TConsensusGroupId> dataRegionGroups, List<DataPartitionEntry> 
lastDataPartitions) {
-    dataAllotTableLock.writeLock().lock();
-    try {
-      // mu is the average number of slots allocated to each regionGroup
-      int mu = SERIES_SLOT_NUM / dataRegionGroups.size();
-
-      // The counter will maintain the number of slots allocated to each 
regionGroup
-      BalanceTreeMap<TConsensusGroupId, Integer> counter = new 
BalanceTreeMap<>();
-      dataRegionGroups.forEach(regionGroupId -> counter.put(regionGroupId, 0));
-
-      // Fill unallocated SeriesSlots
-      Set<TSeriesPartitionSlot> allocatedSeriesSlots =
-          lastDataPartitions.stream()
-              .map(DataPartitionEntry::getSeriesPartitionSlot)
-              .collect(Collectors.toSet());
-      for (int i = 0; i < SERIES_SLOT_NUM; i++) {
-        TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
-        if (!allocatedSeriesSlots.contains(seriesPartitionSlot)) {
-          lastDataPartitions.add(
-              new DataPartitionEntry(
-                  seriesPartitionSlot, new TTimePartitionSlot(Long.MIN_VALUE), 
null));
-        }
-      }
-
-      // The allocated DataPartitions are sorted as follows:
-      // 1. Descending order of TimePartitionSlot
-      // 2. Ascending order of random weight
-      Collections.sort(lastDataPartitions);
-
-      Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new 
HashMap<>();
-      // Enumerate all SeriesPartitionSlots in descending order of their 
TimePartitionSlot
-      for (DataPartitionEntry entry : lastDataPartitions) {
-        TSeriesPartitionSlot seriesPartitionSlot = 
entry.getSeriesPartitionSlot();
-        TConsensusGroupId allocatedRegionGroupId = entry.getDataRegionGroup();
-        if (allocatedRegionGroupId != null
-            // Inherit DataRegionGroup if it has been allocated in the future
-            && (entry.getTimePartitionSlot().getStartTime()
-                    > currentTimePartition.get().getStartTime()
-                // Inherit DataRegionGroup when the slotNum of 
oldRegionGroupId is less than average
-                || counter.get(allocatedRegionGroupId) < mu)) {
-          newAllotTable.put(seriesPartitionSlot, allocatedRegionGroupId);
-          counter.put(allocatedRegionGroupId, 
counter.get(allocatedRegionGroupId) + 1);
-          continue;
-        }
-
-        // Otherwise, choose the regionGroup with the least slotNum to keep 
load balance
-        TConsensusGroupId newRegionGroupId = counter.getKeyWithMinValue();
-        newAllotTable.put(seriesPartitionSlot, newRegionGroupId);
-        counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1);
-      }
-
-      dataAllotMap.clear();
-      dataAllotMap.putAll(newAllotTable);
-    } finally {
-      dataAllotTableLock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Update the current time partition and remove the useless time partitions.
-   *
-   * @param regionGroupNum the number of regionGroups
-   * @return whether the current time partition is updated
-   */
-  public boolean updateCurrentTimePartition(int regionGroupNum) {
-    int threshold = timePartitionThreshold(regionGroupNum);
-    dataAllotTableLock.writeLock().lock();
-    try {
-      AtomicLong newStartTime = new AtomicLong(Long.MIN_VALUE);
-      dataPartitionCounter.forEach(
-          (timePartition, counter) -> {
-            // Select the maximum TimePartition whose slotNum is greater than 
the following equation
-            // Ensure that the remaining slots can be still distributed to new 
regionGroups
-            if (counter.get() >= threshold && timePartition.getStartTime() > 
newStartTime.get()) {
-              newStartTime.set(timePartition.getStartTime());
-            }
-          });
-
-      if (newStartTime.get() > currentTimePartition.get().getStartTime()) {
-        currentTimePartition.set(new TTimePartitionSlot(newStartTime.get()));
-        List<TTimePartitionSlot> removeTimePartitionSlots =
-            dataPartitionCounter.keySet().stream()
-                .filter(timePartition -> timePartition.getStartTime() < 
newStartTime.get())
-                .collect(Collectors.toList());
-        removeTimePartitionSlots.forEach(dataPartitionCounter::remove);
-        return true;
-      }
-    } finally {
-      dataAllotTableLock.writeLock().unlock();
-    }
-    return false;
-  }
-
-  public void addTimePartitionCount(Map<TTimePartitionSlot, Integer> 
timePartitionCountMap) {
-    dataAllotTableLock.writeLock().lock();
-    try {
-      timePartitionCountMap.forEach(
-          (timePartition, count) ->
-              dataPartitionCounter
-                  .computeIfAbsent(timePartition, empty -> new 
AtomicInteger(0))
-                  .addAndGet(count));
-    } finally {
-      dataAllotTableLock.writeLock().unlock();
-    }
-  }
-
-  public TTimePartitionSlot getCurrentTimePartition() {
-    return currentTimePartition.get();
-  }
-
-  public TConsensusGroupId getRegionGroupId(TSeriesPartitionSlot 
seriesPartitionSlot) {
-    dataAllotTableLock.readLock().lock();
-    try {
-      return dataAllotMap.get(seriesPartitionSlot);
-    } finally {
-      dataAllotTableLock.readLock().unlock();
-    }
-  }
-
-  /** Only use this interface when init PartitionBalancer. */
-  public void setCurrentTimePartition(long startTime) {
-    currentTimePartition.set(new TTimePartitionSlot(startTime));
-  }
-
-  /** Only use this interface when init PartitionBalancer. */
-  public void setDataAllotMap(Map<TSeriesPartitionSlot, TConsensusGroupId> 
dataAllotMap) {
-    this.dataAllotMap.putAll(dataAllotMap);
-  }
-
-  public static int timePartitionThreshold(int regionGroupNum) {
-    return (int) (SERIES_SLOT_NUM * (1.0 - 2.0 / regionGroupNum));
-  }
-
-  public void acquireReadLock() {
-    dataAllotTableLock.readLock().lock();
-  }
-
-  public void releaseReadLock() {
-    dataAllotTableLock.readLock().unlock();
-  }
-
-  public void acquireWriteLock() {
-    dataAllotTableLock.writeLock().lock();
-  }
-
-  public void releaseWriteLock() {
-    dataAllotTableLock.writeLock().unlock();
-  }
-}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
new file mode 100644
index 00000000000..410d1edd593
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.structure.BalanceTreeMap;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class DataPartitionPolicyTable {
+
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+
+  private final ReentrantLock dataAllotTableLock;
+
+  // Map<SeriesPartitionSlot, RegionGroupId>
+  // The optimal allocation of SeriesSlots to RegionGroups
+  private final Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap;
+
+  // Map<RegionGroupId, SeriesPartitionSlot Count>
+  // The number of SeriesSlots allocated to each RegionGroup in dataAllotMap
+  private final BalanceTreeMap<TConsensusGroupId, Integer> 
seriesPartitionSlotCounter;
+
+  public DataPartitionPolicyTable() {
+    this.dataAllotTableLock = new ReentrantLock();
+    this.dataAllotMap = new HashMap<>();
+    this.seriesPartitionSlotCounter = new BalanceTreeMap<>();
+  }
+
+  /**
+   * Get or activate the specified SeriesPartitionSlot in dataAllotMap.
+   *
+   * @param seriesPartitionSlot The specified SeriesPartitionSlot
+   * @return The RegionGroupId of the specified SeriesPartitionSlot, 
activating it if still empty
+   */
+  public TConsensusGroupId getRegionGroupIdOrActivateIfNecessary(
+      TSeriesPartitionSlot seriesPartitionSlot) {
+    if (dataAllotMap.containsKey(seriesPartitionSlot)) {
+      return dataAllotMap.get(seriesPartitionSlot);
+    }
+
+    TConsensusGroupId regionGroupId = 
seriesPartitionSlotCounter.getKeyWithMinValue();
+    dataAllotMap.put(seriesPartitionSlot, regionGroupId);
+    seriesPartitionSlotCounter.put(
+        regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1);
+    return regionGroupId;
+  }
+
+  /**
+   * Re-balance the allocation of SeriesSlots to RegionGroups.
+   *
+   * @param dataRegionGroups All DataRegionGroups currently in the Database
+   */
+  public void reBalanceDataPartitionPolicy(List<TConsensusGroupId> 
dataRegionGroups) {
+    dataAllotTableLock.lock();
+    try {
+      dataRegionGroups.forEach(
+          dataRegionGroup -> {
+            if (!seriesPartitionSlotCounter.containsKey(dataRegionGroup)) {
+              seriesPartitionSlotCounter.put(dataRegionGroup, 0);
+            }
+          });
+
+      // Enumerate all SeriesSlots randomly
+      List<TSeriesPartitionSlot> seriesPartitionSlots = new ArrayList<>();
+      for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+        seriesPartitionSlots.add(new TSeriesPartitionSlot(i));
+      }
+      Collections.shuffle(seriesPartitionSlots);
+
+      int mu = SERIES_SLOT_NUM / dataRegionGroups.size();
+      for (TSeriesPartitionSlot seriesPartitionSlot : seriesPartitionSlots) {
+        if (!dataAllotMap.containsKey(seriesPartitionSlot)) {
+          continue;
+        }
+
+        TConsensusGroupId regionGroupId = 
dataAllotMap.get(seriesPartitionSlot);
+        int seriesPartitionSlotCount = 
seriesPartitionSlotCounter.get(regionGroupId);
+        if (seriesPartitionSlotCount > mu) {
+          // Remove from dataAllotMap if the number of SeriesSlots is greater 
than mu
+          dataAllotMap.remove(seriesPartitionSlot);
+          seriesPartitionSlotCounter.put(regionGroupId, 
seriesPartitionSlotCount - 1);
+        }
+      }
+    } finally {
+      dataAllotTableLock.unlock();
+    }
+  }
+
+  /** Only use this interface when init PartitionBalancer. */
+  public void setDataAllotMap(Map<TSeriesPartitionSlot, TConsensusGroupId> 
dataAllotMap) {
+    dataAllotTableLock.lock();
+    try {
+      int mu = SERIES_SLOT_NUM / seriesPartitionSlotCounter.size();
+      dataAllotMap.forEach(
+          (seriesPartitionSlot, regionGroupId) -> {
+            if (seriesPartitionSlotCounter.get(regionGroupId) < mu) {
+              // Put into dataAllotMap only when the number of SeriesSlots
+              // allocated to the RegionGroup is less than mu
+              this.dataAllotMap.put(seriesPartitionSlot, regionGroupId);
+              seriesPartitionSlotCounter.put(
+                  regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) 
+ 1);
+            }
+            // Otherwise, skip this SeriesPartitionSlot and wait for 
re-activation
+          });
+    } finally {
+      dataAllotTableLock.unlock();
+    }
+  }
+
+  public void acquireLock() {
+    dataAllotTableLock.lock();
+  }
+
+  public void releaseLock() {
+    dataAllotTableLock.unlock();
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 65429f70ec4..ac0df2916a9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
@@ -398,9 +397,6 @@ public class PartitionManager {
         resp.setStatus(status);
         return resp;
       }
-
-      // Statistical allocation result and re-balance the DataPartitionPolicy 
if necessary
-      
getLoadManager().reBalancePartitionPolicyIfNecessary(assignedDataPartition);
     }
 
     resp = (DataPartitionResp) getDataPartition(req);
@@ -618,7 +614,7 @@ public class PartitionManager {
    * @param database DatabaseName
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @return The specific DataPartition's predecessor if exists, null otherwise
+   * @return The specific DataPartition's successor if exists, null otherwise
    */
   public TConsensusGroupId getSuccessorDataPartition(
       String database,
@@ -628,6 +624,23 @@ public class PartitionManager {
         database, seriesPartitionSlot, timePartitionSlot);
   }
 
+  /**
+   * Only the leader uses this interface. Checks whether the specified 
DataPartition has a predecessor
+   * and returns it if it does.
+   *
+   * @param database DatabaseName
+   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specific DataPartition's predecessor if exists, null otherwise
+   */
+  public TConsensusGroupId getPredecessorDataPartition(
+      String database,
+      TSeriesPartitionSlot seriesPartitionSlot,
+      TTimePartitionSlot timePartitionSlot) {
+    return partitionInfo.getPredecessorDataPartition(
+        database, seriesPartitionSlot, timePartitionSlot);
+  }
+
   /**
    * Get the DataNodes who contain the specified Database's Schema or Data.
    *
@@ -1284,53 +1297,6 @@ public class PartitionManager {
     partitionInfo.getDataRegionIds(databases, dataRegionIds);
   }
 
-  /**
-   * Get the max TimePartitionSlot of the specified Database.
-   *
-   * @param database The specified Database
-   * @return The max TimePartitionSlot, null if the Database doesn't exist or 
there are no
-   *     DataPartitions yet
-   */
-  public TTimePartitionSlot getMaxTimePartitionSlot(String database) {
-    return partitionInfo.getMaxTimePartitionSlot(database);
-  }
-
-  /**
-   * Get the min TimePartitionSlot of the specified Database.
-   *
-   * @param database The specified Database
-   * @return The last DataPartition, null if the Database doesn't exist or 
there are no
-   *     DataPartitions in the specified SeriesPartitionSlot
-   */
-  public TTimePartitionSlot getMinTimePartitionSlot(String database) {
-    return partitionInfo.getMinTimePartitionSlot(database);
-  }
-
-  /**
-   * Get the DataPartition with max TimePartition of the specified Database 
and the
-   * SeriesPartitionSlot.
-   *
-   * @param database The specified Database
-   * @param seriesPartitionSlot The specified SeriesPartitionSlot
-   * @return The last DataPartitionEntry, null if the Database doesn't exist 
or there are no
-   *     DataPartitions yet
-   */
-  public DataPartitionEntry getLastDataPartitionEntry(
-      String database, TSeriesPartitionSlot seriesPartitionSlot) {
-    return partitionInfo.getLastDataPartitionEntry(database, 
seriesPartitionSlot);
-  }
-
-  /**
-   * Count SeriesSlot in the specified TimePartitionSlot of the Database.
-   *
-   * @param database The specified Database
-   * @param timePartitionSlot The specified TimePartitionSlot
-   * @return The count of SeriesSlot
-   */
-  public int countSeriesSlot(String database, TTimePartitionSlot 
timePartitionSlot) {
-    return partitionInfo.countSeriesSlot(database, timePartitionSlot);
-  }
-
   /**
    * Get the last DataAllotTable of the specified Database.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index a27168d8f8f..a629569470a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import 
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
@@ -99,19 +98,6 @@ public class DatabasePartitionTable {
                 replicaSet.getRegionId(), new 
RegionGroup(System.currentTimeMillis(), replicaSet)));
   }
 
-  /**
-   * Delete RegionGroups' cache.
-   *
-   * @param replicaSets List<TRegionReplicaSet>
-   */
-  public void deleteRegionGroups(List<TRegionReplicaSet> replicaSets) {
-    replicaSets.forEach(replicaSet -> 
regionGroupMap.remove(replicaSet.getRegionId()));
-  }
-
-  public Set<TConsensusGroupId> getAllConsensusGroupId() {
-    return regionGroupMap.keySet();
-  }
-
   /** @return Deep copy of all Regions' RegionReplicaSet within one 
StorageGroup */
   public List<TRegionReplicaSet> getAllReplicaSets() {
     List<TRegionReplicaSet> result = new ArrayList<>();
@@ -270,13 +256,25 @@ public class DatabasePartitionTable {
    *
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @return The specific DataPartition's predecessor if exists, null otherwise
+   * @return The specific DataPartition's successor if exists, null otherwise
    */
   public TConsensusGroupId getSuccessorDataPartition(
       TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
     return dataPartitionTable.getSuccessorDataPartition(seriesPartitionSlot, 
timePartitionSlot);
   }
 
+  /**
+   * Checks whether the specified DataPartition has a predecessor and returns 
it if it does.
+   *
+   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specific DataPartition's predecessor if exists, null otherwise
+   */
+  public TConsensusGroupId getPredecessorDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
+    return dataPartitionTable.getPredecessorDataPartition(seriesPartitionSlot, 
timePartitionSlot);
+  }
+
   /**
    * Create SchemaPartition within the specific StorageGroup.
    *
@@ -564,45 +562,6 @@ public class DatabasePartitionTable {
     return dataRegionIds;
   }
 
-  /**
-   * Get the max TimePartitionSlot.
-   *
-   * @return The max TimePartitionSlot, null if there are no DataPartitions yet
-   */
-  public TTimePartitionSlot getMaxTimePartitionSlot() {
-    return dataPartitionTable.getMaxTimePartitionSlot();
-  }
-
-  /**
-   * Get the min TimePartitionSlot.
-   *
-   * @return The min TimePartitionSlot, null if there are no DataPartitions yet
-   */
-  public TTimePartitionSlot getMinTimePartitionSlot() {
-    return dataPartitionTable.getMinTimePartitionSlot();
-  }
-
-  /**
-   * Get the DataPartition with max TimePartition of the specified the 
SeriesPartitionSlot.
-   *
-   * @param seriesPartitionSlot The specified SeriesPartitionSlot
-   * @return The last DataPartitionEntry, null if there are no DataPartitions 
in the specified
-   *     SeriesPartitionSlot
-   */
-  public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot 
seriesPartitionSlot) {
-    return dataPartitionTable.getLastDataPartitionEntry(seriesPartitionSlot);
-  }
-
-  /**
-   * Count SeriesSlot in the specified TimePartitionSlot.
-   *
-   * @param timePartitionSlot The specified TimePartitionSlot
-   * @return The count of SeriesSlot
-   */
-  public int countSeriesSlot(TTimePartitionSlot timePartitionSlot) {
-    return dataPartitionTable.countSeriesSlot(timePartitionSlot);
-  }
-
   /**
    * Get the last DataAllotTable.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index bfd509345eb..66ab605e066 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartitionEntry;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
@@ -392,7 +391,7 @@ public class PartitionInfo implements SnapshotProcessor {
    * @param database DatabaseName
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @return The specific DataPartition's predecessor if exists, null otherwise
+   * @return The specific DataPartition's successor if exists, null otherwise
    */
   public TConsensusGroupId getSuccessorDataPartition(
       String database,
@@ -407,6 +406,27 @@ public class PartitionInfo implements SnapshotProcessor {
     }
   }
 
+  /**
+   * Checks whether the specified DataPartition has a predecessor and returns 
it if it does.
+   *
+   * @param database DatabaseName
+   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specific DataPartition's predecessor if exists, null otherwise
+   */
+  public TConsensusGroupId getPredecessorDataPartition(
+      String database,
+      TSeriesPartitionSlot seriesPartitionSlot,
+      TTimePartitionSlot timePartitionSlot) {
+    if (isDatabaseExisted(database)) {
+      return databasePartitionTables
+          .get(database)
+          .getPredecessorDataPartition(seriesPartitionSlot, timePartitionSlot);
+    } else {
+      return null;
+    }
+  }
+
   /**
    * Check if the specified Database exists.
    *
@@ -802,65 +822,6 @@ public class PartitionInfo implements SnapshotProcessor {
     return schemaPartitionSet;
   }
 
-  /**
-   * Get the max TimePartitionSlot of the specified Database.
-   *
-   * @param database The specified Database
-   * @return The max TimePartitionSlot, null if the Database doesn't exist or 
there are no
-   *     DataPartitions yet
-   */
-  public TTimePartitionSlot getMaxTimePartitionSlot(String database) {
-    if (isDatabaseExisted(database)) {
-      return databasePartitionTables.get(database).getMaxTimePartitionSlot();
-    }
-    return null;
-  }
-
-  /**
-   * Get the min TimePartitionSlot of the specified Database.
-   *
-   * @param database The specified Database
-   * @return The min TimePartitionSlot, null if the Database doesn't exist or 
there are no
-   *     DataPartitions yet
-   */
-  public TTimePartitionSlot getMinTimePartitionSlot(String database) {
-    if (isDatabaseExisted(database)) {
-      return databasePartitionTables.get(database).getMinTimePartitionSlot();
-    }
-    return null;
-  }
-
-  /**
-   * Get the DataPartition with max TimePartition of the specified Database 
and the
-   * SeriesPartitionSlot.
-   *
-   * @param database The specified Database
-   * @param seriesPartitionSlot The specified SeriesPartitionSlot
-   * @return The last DataPartitionEntry, null if the Database doesn't exist 
or there are no
-   *     DataPartitions in the specified SeriesPartitionSlot
-   */
-  public DataPartitionEntry getLastDataPartitionEntry(
-      String database, TSeriesPartitionSlot seriesPartitionSlot) {
-    if (isDatabaseExisted(database)) {
-      return 
databasePartitionTables.get(database).getLastDataPartitionEntry(seriesPartitionSlot);
-    }
-    return null;
-  }
-
-  /**
-   * Count SeriesSlot in the specified TimePartitionSlot of the Database.
-   *
-   * @param database The specified Database
-   * @param timePartitionSlot The specified TimePartitionSlot
-   * @return The count of SeriesSlot
-   */
-  public int countSeriesSlot(String database, TTimePartitionSlot 
timePartitionSlot) {
-    if (isDatabaseExisted(database)) {
-      return 
databasePartitionTables.get(database).countSeriesSlot(timePartitionSlot);
-    }
-    return 0;
-  }
-
   /**
    * Get the last DataAllotTable of the specified Database.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 34211efdeb6..864f7a3c186 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -206,6 +206,7 @@ public class DeleteDatabaseProcedure
             LOG.info(
                 "[DeleteDatabaseProcedure] Database: {} is deleted 
successfully",
                 deleteDatabaseSchema.getName());
+            env.getConfigManager().getLoadManager().clearPartitionBalancer();
             return Flow.NO_MORE_STATE;
           } else if (getCycles() > RETRY_THRESHOLD) {
             setFailure(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 0aeb4f056af..cf0af07a26c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -206,13 +206,15 @@ public class CreateRegionGroupsProcedure
         break;
       case CREATE_REGION_GROUPS_FINISH:
         if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
-          // Update all corresponding DataAllotTables
+          // Re-balance all corresponding DataPartitionPolicyTable
           persistPlan
               .getRegionGroupMap()
               .keySet()
               .forEach(
                   database ->
-                      
env.getConfigManager().getLoadManager().updateDataAllotTable(database));
+                      env.getConfigManager()
+                          .getLoadManager()
+                          .reBalanceDataPartitionPolicy(database));
         }
         return Flow.NO_MORE_STATE;
     }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
deleted file mode 100644
index 67ff0dc0099..00000000000
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load.balancer.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartitionEntry;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class DataAllotTableTest {
-
-  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
-  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
-
-  @Test
-  public void testUpdateCurrentTimePartition() {
-    final int regionGroupNum = 5;
-    final int threshold = 
DataAllotTable.timePartitionThreshold(regionGroupNum);
-    final long timePartitionInterval = 1000;
-    DataAllotTable dataAllotTable = new DataAllotTable();
-
-    // Test 1: currentTimePartition is the first one
-    TTimePartitionSlot nextTimePartition = new TTimePartitionSlot(1000);
-    Map<TTimePartitionSlot, Integer> timePartitionCountMap = new HashMap<>();
-    timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), 
threshold);
-    timePartitionCountMap.put(
-        new TTimePartitionSlot(nextTimePartition.getStartTime() + 
timePartitionInterval),
-        threshold - 100);
-    timePartitionCountMap.put(
-        new TTimePartitionSlot(nextTimePartition.getStartTime() + 2 * 
timePartitionInterval),
-        threshold - 200);
-    dataAllotTable.addTimePartitionCount(timePartitionCountMap);
-    dataAllotTable.updateCurrentTimePartition(regionGroupNum);
-    Assert.assertEquals(nextTimePartition, 
dataAllotTable.getCurrentTimePartition());
-
-    // Test 2: currentTimePartition in the middle
-    timePartitionCountMap.clear();
-    nextTimePartition = new TTimePartitionSlot(5000);
-    timePartitionCountMap.put(
-        new TTimePartitionSlot(nextTimePartition.getStartTime() - 
timePartitionInterval),
-        threshold - 100);
-    timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), 
threshold);
-    timePartitionCountMap.put(
-        new TTimePartitionSlot(nextTimePartition.getStartTime() + 
timePartitionInterval),
-        threshold - 100);
-    dataAllotTable.addTimePartitionCount(timePartitionCountMap);
-    dataAllotTable.updateCurrentTimePartition(regionGroupNum);
-    Assert.assertEquals(nextTimePartition, 
dataAllotTable.getCurrentTimePartition());
-
-    // Test 3: currentTimePartition will be the maximum timePartitionSlot that 
greater or equal to
-    // threshold
-    int offset = 200;
-    Random random = new Random();
-    timePartitionCountMap.clear();
-    TTimePartitionSlot baseSlot = new TTimePartitionSlot(10000);
-    nextTimePartition = baseSlot;
-    timePartitionCountMap.put(nextTimePartition, threshold);
-    for (int i = 1; i < 100; i++) {
-      TTimePartitionSlot slot =
-          new TTimePartitionSlot(baseSlot.getStartTime() + i * 
timePartitionInterval);
-      int count = threshold + random.nextInt(offset) - offset / 2;
-      timePartitionCountMap.put(slot, count);
-      if (count >= threshold) {
-        nextTimePartition = slot;
-      }
-    }
-    dataAllotTable.addTimePartitionCount(timePartitionCountMap);
-    dataAllotTable.updateCurrentTimePartition(regionGroupNum);
-    Assert.assertEquals(nextTimePartition, 
dataAllotTable.getCurrentTimePartition());
-  }
-
-  @Test
-  public void testUpdateDataAllotTable() {
-    DataAllotTable dataAllotTable = new DataAllotTable();
-    List<TConsensusGroupId> dataRegionGroups = new ArrayList<>();
-
-    // Test 1: construct DataAllotTable from scratch
-    TConsensusGroupId group1 = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 1);
-    dataRegionGroups.add(group1);
-    dataAllotTable.updateDataAllotTable(dataRegionGroups, new ArrayList<>());
-    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
-      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
-      // All SeriesPartitionSlots belong to group1
-      Assert.assertEquals(group1, 
dataAllotTable.getRegionGroupId(seriesPartitionSlot));
-    }
-
-    // Test2: extend DataRegionGroups
-    Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new 
HashMap<>();
-    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
2));
-    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
3));
-    dataAllotTable.updateDataAllotTable(dataRegionGroups, new ArrayList<>());
-    int mu = SERIES_SLOT_NUM / 3;
-    Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>();
-    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
-      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
-      TConsensusGroupId groupId = 
dataAllotTable.getRegionGroupId(seriesPartitionSlot);
-      lastDataAllotTable.put(seriesPartitionSlot, groupId);
-      counter.computeIfAbsent(groupId, empty -> new 
AtomicInteger(0)).incrementAndGet();
-    }
-    // All DataRegionGroups divide SeriesPartitionSlots evenly
-    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
counter.entrySet()) {
-      Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
-    }
-
-    // Test 3: extend DataRegionGroups while inherit future allocate result
-    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
4));
-    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
5));
-    Random random = new Random();
-    Set<TSeriesPartitionSlot> selectedSlots = new HashSet<>();
-    List<DataPartitionEntry> lastDataPartitions = new ArrayList<>();
-    Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>();
-    for (int i = 0; i < 50; i++) {
-      // Randomly pre-allocate 50 SeriesPartitionSlots
-      TSeriesPartitionSlot seriesPartitionSlot =
-          new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM));
-      while (selectedSlots.contains(seriesPartitionSlot)) {
-        seriesPartitionSlot = new 
TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM));
-      }
-      selectedSlots.add(seriesPartitionSlot);
-      lastDataPartitions.add(
-          new DataPartitionEntry(
-              seriesPartitionSlot,
-              new TTimePartitionSlot(Long.MAX_VALUE),
-              new TConsensusGroupId(TConsensusGroupType.DataRegion, 
random.nextInt(2) + 4)));
-    }
-    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
-      // Record the other allocation result
-      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
-      if (!selectedSlots.contains(seriesPartitionSlot)) {
-        lastDataPartitions.add(
-            new DataPartitionEntry(
-                seriesPartitionSlot,
-                new TTimePartitionSlot(Long.MIN_VALUE),
-                lastDataAllotTable.get(seriesPartitionSlot)));
-      }
-    }
-    dataAllotTable.updateDataAllotTable(dataRegionGroups, lastDataPartitions);
-    mu = SERIES_SLOT_NUM / 5;
-    counter.clear();
-    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
-      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
-      TConsensusGroupId groupId = 
dataAllotTable.getRegionGroupId(seriesPartitionSlot);
-      counter.computeIfAbsent(groupId, empty -> new 
AtomicInteger(0)).incrementAndGet();
-      if (groupId.getId() < 4) {
-        // Most of SeriesPartitionSlots in the first three DataRegionGroups 
should remain unchanged
-        Assert.assertEquals(lastDataAllotTable.get(seriesPartitionSlot), 
groupId);
-        unchangedSlots.computeIfAbsent(groupId, empty -> new 
AtomicInteger(0)).incrementAndGet();
-      }
-    }
-    // All DataRegionGroups divide SeriesPartitionSlots evenly
-    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
counter.entrySet()) {
-      Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
-    }
-    // All SeriesPartitionSlots that have been allocated before should be 
allocated to the same
-    // DataRegionGroup
-    for (int i = 0; i < 50; i++) {
-      Assert.assertEquals(
-          lastDataPartitions.get(i).getDataRegionGroup(),
-          
dataAllotTable.getRegionGroupId(lastDataPartitions.get(i).getSeriesPartitionSlot()));
-    }
-    // Most of SeriesPartitionSlots in the first three DataRegionGroups should 
remain unchanged
-    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
unchangedSlots.entrySet()) {
-      Assert.assertEquals(mu, counterEntry.getValue().get());
-    }
-  }
-}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java
new file mode 100644
index 00000000000..70ce9231c17
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DataPartitionPolicyTableTest {
+
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+
+  @Test
+  public void testUpdateDataAllotTable() {
+    DataPartitionPolicyTable dataPartitionPolicyTable = new 
DataPartitionPolicyTable();
+    List<TConsensusGroupId> dataRegionGroups = new ArrayList<>();
+
+    // Test 1: construct DataAllotTable from scratch
+    TConsensusGroupId group1 = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 1);
+    dataRegionGroups.add(group1);
+    dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups);
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      // All SeriesPartitionSlots belong to group1
+      Assert.assertEquals(
+          group1,
+          
dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot));
+    }
+
+    // Test2: extend DataRegionGroups
+    Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new 
HashMap<>();
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
2));
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
3));
+    dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups);
+    int mu = SERIES_SLOT_NUM / 3;
+    Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>();
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      TConsensusGroupId groupId =
+          
dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+      lastDataAllotTable.put(seriesPartitionSlot, groupId);
+      counter.computeIfAbsent(groupId, empty -> new 
AtomicInteger(0)).incrementAndGet();
+    }
+    // All DataRegionGroups divide SeriesPartitionSlots evenly
+    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
counter.entrySet()) {
+      Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+    }
+
+    // Test 3: extend DataRegionGroups while inheriting as many 
SeriesPartitionSlots as possible
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
4));
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
5));
+    dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups);
+    Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>();
+    mu = SERIES_SLOT_NUM / 5;
+    counter.clear();
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      TConsensusGroupId groupId =
+          
dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
+      counter.computeIfAbsent(groupId, empty -> new 
AtomicInteger(0)).incrementAndGet();
+      if (groupId.getId() < 4) {
+        // Most of SeriesPartitionSlots in the first three DataRegionGroups 
should remain unchanged
+        Assert.assertEquals(lastDataAllotTable.get(seriesPartitionSlot), 
groupId);
+        unchangedSlots.computeIfAbsent(groupId, empty -> new 
AtomicInteger(0)).incrementAndGet();
+      }
+    }
+    // All DataRegionGroups divide SeriesPartitionSlots evenly
+    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
counter.entrySet()) {
+      Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+    }
+    // Most of SeriesPartitionSlots in the first three DataRegionGroups should 
remain unchanged
+    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : 
unchangedSlots.entrySet()) {
+      Assert.assertEquals(mu, counterEntry.getValue().get());
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 497f4085410..c0eac239d88 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -40,9 +40,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class DataPartitionTable {
@@ -106,7 +104,7 @@ public class DataPartitionTable {
    *
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @return The specific DataPartition's predecessor if exists, null otherwise
+   * @return The specific DataPartition's successor if exists, null otherwise
    */
   public TConsensusGroupId getSuccessorDataPartition(
       TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
@@ -117,6 +115,24 @@ public class DataPartitionTable {
     }
   }
 
+  /**
+   * Checks whether the specified DataPartition has a predecessor and returns 
it if one exists.
+   *
+   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specific DataPartition's predecessor if exists, null otherwise
+   */
+  public TConsensusGroupId getPredecessorDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
+    if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
+      return dataPartitionMap
+          .get(seriesPartitionSlot)
+          .getPredecessorDataPartition(timePartitionSlot);
+    } else {
+      return null;
+    }
+  }
+
   /**
    * Create DataPartition within the specific StorageGroup
    *
@@ -233,82 +249,6 @@ public class DataPartitionTable {
         .collect(Collectors.toList());
   }
 
-  /**
-   * Get the max TimePartitionSlot of the specified Database.
-   *
-   * @return The max TimePartitionSlot, null if there are no DataPartitions yet
-   */
-  public TTimePartitionSlot getMaxTimePartitionSlot() {
-    AtomicReference<TTimePartitionSlot> maxTimeSlot =
-        new AtomicReference<>(new TTimePartitionSlot(0));
-    dataPartitionMap
-        .values()
-        .forEach(
-            seriesPartitionTable -> {
-              TTimePartitionSlot timePartitionSlot = 
seriesPartitionTable.getMaxTimePartitionSlot();
-              if (timePartitionSlot != null
-                  && timePartitionSlot.getStartTime() > 
maxTimeSlot.get().getStartTime()) {
-                maxTimeSlot.set(timePartitionSlot);
-              }
-            });
-    return maxTimeSlot.get().getStartTime() > 0 ? maxTimeSlot.get() : null;
-  }
-
-  /**
-   * Get the min TimePartitionSlot of the specified Database.
-   *
-   * @return The min TimePartitionSlot, null if there are no DataPartitions yet
-   */
-  public TTimePartitionSlot getMinTimePartitionSlot() {
-    AtomicReference<TTimePartitionSlot> minTimeSlot =
-        new AtomicReference<>(new TTimePartitionSlot(Long.MAX_VALUE));
-    dataPartitionMap
-        .values()
-        .forEach(
-            seriesPartitionTable -> {
-              TTimePartitionSlot timePartitionSlot = 
seriesPartitionTable.getMinTimePartitionSlot();
-              if (timePartitionSlot != null
-                  && timePartitionSlot.getStartTime() < 
minTimeSlot.get().getStartTime()) {
-                minTimeSlot.set(timePartitionSlot);
-              }
-            });
-    return minTimeSlot.get().getStartTime() < Long.MAX_VALUE ? 
minTimeSlot.get() : null;
-  }
-
-  /**
-   * Get the DataPartition with max TimePartition of the specified Database 
and the
-   * SeriesPartitionSlot.
-   *
-   * @param seriesPartitionSlot The specified SeriesPartitionSlot
-   * @return The last DataPartitionEntry, null if there are no DataPartitions 
in the specified
-   *     SeriesPartitionSlot
-   */
-  public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot 
seriesPartitionSlot) {
-    if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
-      return dataPartitionMap
-          .get(seriesPartitionSlot)
-          .getLastDataPartitionEntry(seriesPartitionSlot);
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Count SeriesSlot in the specified TimePartitionSlot of the Database.
-   *
-   * @param timePartitionSlot The specified TimePartitionSlot
-   * @return The count of SeriesSlot
-   */
-  public int countSeriesSlot(TTimePartitionSlot timePartitionSlot) {
-    AtomicInteger count = new AtomicInteger(0);
-    dataPartitionMap
-        .values()
-        .forEach(
-            seriesPartitionTable ->
-                
count.addAndGet(seriesPartitionTable.isDataPartitionExist(timePartitionSlot)));
-    return count.get();
-  }
-
   /**
    * Get the last DataAllotTable.
    *
@@ -322,23 +262,6 @@ public class DataPartitionTable {
     return result;
   }
 
-  /**
-   * Get the number of DataPartitions in each TimePartitionSlot
-   *
-   * @return Map<TimePartitionSlot, the number of DataPartitions>
-   */
-  public Map<TTimePartitionSlot, Integer> getTimeSlotCountMap() {
-    Map<TTimePartitionSlot, Integer> result = new ConcurrentHashMap<>();
-    dataPartitionMap.forEach(
-        (seriesPartitionSlot, seriesPartitionTable) ->
-            seriesPartitionTable
-                .getTimeSlotCountMap()
-                .forEach(
-                    (timePartitionSlot, count) ->
-                        result.merge(timePartitionSlot, count, Integer::sum)));
-    return result;
-  }
-
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
     ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 58f2ba52b9c..1b87e42ad11 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -34,10 +34,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.TreeMap;
 import java.util.Vector;
@@ -136,6 +134,17 @@ public class SeriesPartitionTable {
     return successorSlot == null ? null : 
seriesPartitionMap.get(successorSlot).get(0);
   }
 
+  /**
+   * Check and return the specified DataPartition's predecessor.
+   *
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return The specified DataPartition's predecessor if exists, null 
otherwise
+   */
+  public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot 
timePartitionSlot) {
+    TTimePartitionSlot predecessorSlot = 
seriesPartitionMap.lowerKey(timePartitionSlot);
+    return predecessorSlot == null ? null : 
seriesPartitionMap.get(predecessorSlot).get(0);
+  }
+
   /**
    * Query a timePartition's corresponding dataRegionIds.
    *
@@ -216,61 +225,6 @@ public class SeriesPartitionTable {
     return result;
   }
 
-  /**
-   * Get the max TimePartitionSlot of the specified Database.
-   *
-   * @return The max TimePartitionSlot, null if there are no DataPartitions yet
-   */
-  public TTimePartitionSlot getMaxTimePartitionSlot() {
-    try {
-      return seriesPartitionMap.lastKey();
-    } catch (NoSuchElementException e) {
-      return null;
-    }
-  }
-
-  /**
-   * Get the min TimePartitionSlot of the specified Database.
-   *
-   * @return The min TimePartitionSlot, null if there are no DataPartitions yet
-   */
-  public TTimePartitionSlot getMinTimePartitionSlot() {
-    try {
-      return seriesPartitionMap.firstKey();
-    } catch (NoSuchElementException e) {
-      return null;
-    }
-  }
-
-  /**
-   * Get the DataPartition with max TimePartition of the specified Database 
and the
-   * SeriesPartitionSlot.
-   *
-   * @param seriesPartitionSlot The specified SeriesPartitionSlot
-   * @return The last DataPartitionEntry, null if there are no DataPartitions
-   */
-  public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot 
seriesPartitionSlot) {
-    Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
-        seriesPartitionMap.lastEntry();
-    if (lastEntry == null) {
-      return null;
-    }
-    return new DataPartitionEntry(
-        seriesPartitionSlot,
-        lastEntry.getKey(),
-        lastEntry.getValue().get(lastEntry.getValue().size() - 1));
-  }
-
-  /**
-   * Check whether the specified DataPartition exists.
-   *
-   * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @return 1 if exists, 0 otherwise
-   */
-  public int isDataPartitionExist(TTimePartitionSlot timePartitionSlot) {
-    return seriesPartitionMap.containsKey(timePartitionSlot) ? 1 : 0;
-  }
-
   /**
    * Get the last DataPartition's ConsensusGroupId.
    *
@@ -285,18 +239,6 @@ public class SeriesPartitionTable {
     return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
   }
 
-  /**
-   * Get the number of DataPartitions in each TimePartitionSlot.
-   *
-   * @return Map<TimePartitionSlot, the number of DataPartitions>
-   */
-  public Map<TTimePartitionSlot, Integer> getTimeSlotCountMap() {
-    Map<TTimePartitionSlot, Integer> result = new HashMap<>();
-    seriesPartitionMap.forEach(
-        (timePartitionSlot, consensusGroupIds) -> 
result.put(timePartitionSlot, 1));
-    return result;
-  }
-
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
     ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
index 1ea29c9a80b..09a49898635 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
@@ -75,6 +75,10 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
     return keyValueMap.containsKey(key);
   }
 
+  public int size() {
+    return keyValueMap.size();
+  }
+
   @TestOnly
   public void remove(K key) {
     V value = keyValueMap.getOrDefault(key, null);

Reply via email to