This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch Computing-resource-balancing in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7ac4f8735bdd472fd32e87ccdda18ae095928f74 Author: YongzaoDan <[email protected]> AuthorDate: Sat Jul 29 14:54:08 2023 +0800 Finish updation --- .../partition/IoTDBPartitionInheritPolicyIT.java | 14 +- .../iotdb/confignode/manager/load/LoadManager.java | 17 +- .../manager/load/balancer/PartitionBalancer.java | 151 ++++---------- .../load/balancer/partition/DataAllotTable.java | 227 --------------------- .../partition/DataPartitionPolicyTable.java | 144 +++++++++++++ .../manager/partition/PartitionManager.java | 70 ++----- .../partition/DatabasePartitionTable.java | 67 ++---- .../persistence/partition/PartitionInfo.java | 83 ++------ .../impl/schema/DeleteDatabaseProcedure.java | 1 + .../statemachine/CreateRegionGroupsProcedure.java | 6 +- .../balancer/partition/DataAllotTableTest.java | 198 ------------------ .../partition/DataPartitionPolicyTableTest.java | 105 ++++++++++ .../commons/partition/DataPartitionTable.java | 115 ++--------- .../commons/partition/SeriesPartitionTable.java | 80 +------- .../iotdb/commons/structure/BalanceTreeMap.java | 4 + 15 files changed, 388 insertions(+), 894 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java index aecd22375f5..bdb233950e2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java @@ -95,19 +95,7 @@ public class IoTDBPartitionInheritPolicyIT { Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new ConcurrentHashMap<>(); // Test1: divide and inherit DataPartitions from scratch - // Notice: create all DataRegionGroups as soon as possible - // Otherwise, the allocation might be slightly unbalanced - ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry( - database, 0, 10, baseStartTime, baseStartTime + 1, testTimePartitionInterval); - ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry( - database, - 10, - testSeriesSlotNum, - baseStartTime, - baseStartTime + 1, - testTimePartitionInterval); - - for (long timePartitionSlot = baseStartTime + 1; + for (long timePartitionSlot = baseStartTime; timePartitionSlot < baseStartTime + testTimePartitionSlotsNum; timePartitionSlot++) { for (int seriesPartitionSlot = 0; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 0c284a149ee..e3472e2e0f2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -130,17 +130,8 @@ public class LoadManager { return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap); } - /** - * Re-balance runtime status cached in the PartitionBalancer. This method may shift the - * currentTimePartition or update the DataAllotTable. - */ - public void reBalancePartitionPolicyIfNecessary( - Map<String, DataPartitionTable> assignedDataPartition) { - partitionBalancer.reBalanceDataPartitionPolicyIfNecessary(assignedDataPartition); - } - - public void updateDataAllotTable(String database) { - partitionBalancer.updateDataAllotTable(database); + public void reBalanceDataPartitionPolicy(String database) { + partitionBalancer.reBalanceDataPartitionPolicy(database); } public void broadcastLatestRegionRouteMap() { @@ -161,6 +152,10 @@ public class LoadManager { partitionBalancer.clearPartitionBalancer(); } + public void clearPartitionBalancer() { + partitionBalancer.clearPartitionBalancer(); + } + /** * Safely get NodeStatus by NodeId. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java index efd36a65382..42ac541655c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java @@ -23,18 +23,14 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.SeriesPartitionTable; import org.apache.iotdb.commons.structure.BalanceTreeMap; -import org.apache.iotdb.confignode.conf.ConfigNodeConfig; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; import org.apache.iotdb.confignode.manager.IManager; -import org.apache.iotdb.confignode.manager.load.balancer.partition.DataAllotTable; +import org.apache.iotdb.confignode.manager.load.balancer.partition.DataPartitionPolicyTable; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; @@ -43,8 +39,6 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -56,17 +50,12 @@ import java.util.concurrent.ConcurrentHashMap; */ public class PartitionBalancer { - private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum(); - private static final long TIME_PARTITION_INTERVAL = - CommonDescriptor.getInstance().getConfig().getTimePartitionInterval(); - private static final Logger LOGGER = LoggerFactory.getLogger(PartitionBalancer.class); private final IManager configManager; - // Map<DatabaseName, DataAllotTable> - private final Map<String, DataAllotTable> dataAllotTableMap; + // Map<DatabaseName, DataPartitionPolicyTable> + private final Map<String, DataPartitionPolicyTable> dataAllotTableMap; public PartitionBalancer(IManager configManager) { this.configManager = configManager; @@ -141,9 +130,8 @@ public class PartitionBalancer { counter.put(pair.getRight(), pair.getLeft().intValue()); } - DataAllotTable allotTable = dataAllotTableMap.get(database); - allotTable.acquireReadLock(); - TTimePartitionSlot currentTimePartition = allotTable.getCurrentTimePartition(); + DataPartitionPolicyTable allotTable = dataAllotTableMap.get(database); + allotTable.acquireLock(); DataPartitionTable dataPartitionTable = new DataPartitionTable(); try { @@ -161,26 +149,36 @@ public class PartitionBalancer { for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) { // 1. The historical DataPartition will try to inherit successor DataPartition first - if (timePartitionSlot.getStartTime() < currentTimePartition.getStartTime()) { - TConsensusGroupId successor = - getPartitionManager() - .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); - if (successor != null && counter.containsKey(successor)) { - seriesPartitionTable.putDataPartition(timePartitionSlot, successor); - counter.put(successor, counter.get(successor) + 1); - continue; - } + TConsensusGroupId successor = + getPartitionManager() + .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); + if (successor != null && counter.containsKey(successor)) { + seriesPartitionTable.putDataPartition(timePartitionSlot, successor); + counter.put(successor, counter.get(successor) + 1); + continue; } // 2. Assign DataPartition base on the DataAllotTable - TConsensusGroupId allotGroupId = allotTable.getRegionGroupId(seriesPartitionSlot); + TConsensusGroupId allotGroupId = + allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot); if (counter.containsKey(allotGroupId)) { seriesPartitionTable.putDataPartition(timePartitionSlot, allotGroupId); counter.put(allotGroupId, counter.get(allotGroupId) + 1); continue; } - // 3. Assign the DataPartition to DataRegionGroup with the least DataPartitions + // 3. The allotDataRegionGroup is unavailable, + // try to inherit predecessor DataPartition + TConsensusGroupId predecessor = + getPartitionManager() + .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); + if (predecessor != null && counter.containsKey(predecessor)) { + seriesPartitionTable.putDataPartition(timePartitionSlot, predecessor); + counter.put(predecessor, counter.get(predecessor) + 1); + continue; + } + + // 4. Assign the DataPartition to DataRegionGroup with the least DataPartitions // If the above DataRegionGroups are unavailable TConsensusGroupId greedyGroupId = counter.getKeyWithMinValue(); seriesPartitionTable.putDataPartition(timePartitionSlot, greedyGroupId); @@ -192,7 +190,7 @@ public class PartitionBalancer { .put(seriesPartitionEntry.getKey(), seriesPartitionTable); } } finally { - allotTable.releaseReadLock(); + allotTable.releaseLock(); } result.put(database, dataPartitionTable); } @@ -201,65 +199,16 @@ public class PartitionBalancer { } /** - * Try to re-balance the DataPartitionPolicy when new DataPartitions are created. - * - * @param assignedDataPartition new created DataPartitions - */ - public void reBalanceDataPartitionPolicyIfNecessary( - Map<String, DataPartitionTable> assignedDataPartition) { - assignedDataPartition.forEach( - (database, dataPartitionTable) -> { - if (updateDataPartitionCount(database, dataPartitionTable)) { - // Update the DataAllotTable if the currentTimePartition is updated - updateDataAllotTable(database); - } - }); - } - - /** - * Update the DataPartitionCount in DataAllotTable. + * Re-balance the DataPartitionPolicyTable. * * @param database Database name - * @param dataPartitionTable new created DataPartitionTable - * @return true if the currentTimePartition is updated, false otherwise */ - public boolean updateDataPartitionCount(String database, DataPartitionTable dataPartitionTable) { - try { - dataAllotTableMap - .get(database) - .addTimePartitionCount(dataPartitionTable.getTimeSlotCountMap()); - return dataAllotTableMap - .get(database) - .updateCurrentTimePartition( - getPartitionManager().getRegionGroupCount(database, TConsensusGroupType.DataRegion)); - } catch (DatabaseNotExistsException e) { - LOGGER.error("Database {} not exists when updateDataPartitionCount", database); - return false; - } - } - - /** - * Update the DataAllotTable. - * - * @param database Database name - */ - public void updateDataAllotTable(String database) { - List<DataPartitionEntry> lastDataPartitions = new ArrayList<>(); - for (int i = 0; i < SERIES_SLOT_NUM; i++) { - TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); - DataPartitionEntry lastDataPartition = - getPartitionManager().getLastDataPartitionEntry(database, seriesPartitionSlot); - if (lastDataPartition != null) { - lastDataPartitions.add(lastDataPartition); - } - } - + public void reBalanceDataPartitionPolicy(String database) { try { dataAllotTableMap - .computeIfAbsent(database, empty -> new DataAllotTable()) - .updateDataAllotTable( - getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion), - lastDataPartitions); + .computeIfAbsent(database, empty -> new DataPartitionPolicyTable()) + .reBalanceDataPartitionPolicy( + getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion)); } catch (DatabaseNotExistsException e) { LOGGER.error("Database {} not exists when updateDataAllotTable", database); } @@ -272,38 +221,18 @@ public class PartitionBalancer { .getDatabaseNames() .forEach( database -> { - dataAllotTableMap.put(database, new DataAllotTable()); - DataAllotTable dataAllotTable = dataAllotTableMap.get(database); - dataAllotTable.acquireWriteLock(); + dataAllotTableMap.put(database, new DataPartitionPolicyTable()); + DataPartitionPolicyTable dataPartitionPolicyTable = dataAllotTableMap.get(database); try { - int threshold = - DataAllotTable.timePartitionThreshold( - getPartitionManager() - .getRegionGroupCount(database, TConsensusGroupType.DataRegion)); - TTimePartitionSlot maxTimePartitionSlot = - getPartitionManager().getMaxTimePartitionSlot(database); - TTimePartitionSlot minTimePartitionSlot = - getPartitionManager().getMinTimePartitionSlot(database); - TTimePartitionSlot currentTimePartition = maxTimePartitionSlot.deepCopy(); - while (currentTimePartition.compareTo(minTimePartitionSlot) > 0) { - int seriesSlotCount = - getPartitionManager().countSeriesSlot(database, currentTimePartition); - if (seriesSlotCount >= threshold) { - dataAllotTable.setCurrentTimePartition(currentTimePartition.getStartTime()); - break; - } - dataAllotTable.addTimePartitionCount( - Collections.singletonMap(currentTimePartition, seriesSlotCount)); - currentTimePartition.setStartTime( - currentTimePartition.getStartTime() - TIME_PARTITION_INTERVAL); - } - - dataAllotTable.setDataAllotMap( + // Put all DataRegionGroups into the DataPartitionPolicyTable + dataPartitionPolicyTable.reBalanceDataPartitionPolicy( + getPartitionManager() + .getAllRegionGroupIds(database, TConsensusGroupType.DataRegion)); + // Load the last DataAllotTable + dataPartitionPolicyTable.setDataAllotMap( getPartitionManager().getLastDataAllotTable(database)); } catch (DatabaseNotExistsException e) { LOGGER.error("Database {} not exists when setupPartitionBalancer", database); - } finally { - dataAllotTable.releaseWriteLock(); } }); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java deleted file mode 100644 index 02797d45d8d..00000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.manager.load.balancer.partition; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.partition.DataPartitionEntry; -import org.apache.iotdb.commons.structure.BalanceTreeMap; -import org.apache.iotdb.confignode.conf.ConfigNodeConfig; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -public class DataAllotTable { - - private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum(); - - private final ReentrantReadWriteLock dataAllotTableLock; - private final AtomicReference<TTimePartitionSlot> currentTimePartition; - // Map<TimePartitionSlot, DataPartitionCount> - // Cache the number of DataPartitions in each future TimePartitionSlot - private final TreeMap<TTimePartitionSlot, AtomicInteger> dataPartitionCounter; - // Map<SeriesPartitionSlot, RegionGroupId> - // The optimal allocation of SeriesSlots to RegionGroups in the currentTimePartition - private final Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap; - - public DataAllotTable() { - this.dataAllotTableLock = new ReentrantReadWriteLock(); - this.currentTimePartition = new AtomicReference<>(new TTimePartitionSlot(0)); - this.dataPartitionCounter = new TreeMap<>(); - this.dataAllotMap = new HashMap<>(); - } - - public boolean isEmpty() { - dataAllotTableLock.readLock().lock(); - try { - return dataAllotMap.isEmpty(); - } finally { - dataAllotTableLock.readLock().unlock(); - } - } - - /** - * Update the DataAllotTable according to the current DataRegionGroups and future DataAllotTable. - * - * @param dataRegionGroups the current DataRegionGroups - * @param lastDataPartitions the last DataPartition of each SeriesPartitionSlot - */ - public void updateDataAllotTable( - List<TConsensusGroupId> dataRegionGroups, List<DataPartitionEntry> lastDataPartitions) { - dataAllotTableLock.writeLock().lock(); - try { - // mu is the average number of slots allocated to each regionGroup - int mu = SERIES_SLOT_NUM / dataRegionGroups.size(); - - // The counter will maintain the number of slots allocated to each regionGroup - BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>(); - dataRegionGroups.forEach(regionGroupId -> counter.put(regionGroupId, 0)); - - // Fill unallocated SeriesSlots - Set<TSeriesPartitionSlot> allocatedSeriesSlots = - lastDataPartitions.stream() - .map(DataPartitionEntry::getSeriesPartitionSlot) - .collect(Collectors.toSet()); - for (int i = 0; i < SERIES_SLOT_NUM; i++) { - TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); - if (!allocatedSeriesSlots.contains(seriesPartitionSlot)) { - lastDataPartitions.add( - new DataPartitionEntry( - seriesPartitionSlot, new TTimePartitionSlot(Long.MIN_VALUE), null)); - } - } - - // The allocated DataPartitions are sorted as follows: - // 1. Descending order of TimePartitionSlot - // 2. Ascending order of random weight - Collections.sort(lastDataPartitions); - - Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new HashMap<>(); - // Enumerate all SeriesPartitionSlots in descending order of their TimePartitionSlot - for (DataPartitionEntry entry : lastDataPartitions) { - TSeriesPartitionSlot seriesPartitionSlot = entry.getSeriesPartitionSlot(); - TConsensusGroupId allocatedRegionGroupId = entry.getDataRegionGroup(); - if (allocatedRegionGroupId != null - // Inherit DataRegionGroup if it has been allocated in the future - && (entry.getTimePartitionSlot().getStartTime() - > currentTimePartition.get().getStartTime() - // Inherit DataRegionGroup when the slotNum of oldRegionGroupId is less than average - || counter.get(allocatedRegionGroupId) < mu)) { - newAllotTable.put(seriesPartitionSlot, allocatedRegionGroupId); - counter.put(allocatedRegionGroupId, counter.get(allocatedRegionGroupId) + 1); - continue; - } - - // Otherwise, choose the regionGroup with the least slotNum to keep load balance - TConsensusGroupId newRegionGroupId = counter.getKeyWithMinValue(); - newAllotTable.put(seriesPartitionSlot, newRegionGroupId); - counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1); - } - - dataAllotMap.clear(); - dataAllotMap.putAll(newAllotTable); - } finally { - dataAllotTableLock.writeLock().unlock(); - } - } - - /** - * Update the current time partition and remove the useless time partitions. - * - * @param regionGroupNum the number of regionGroups - * @return whether the current time partition is updated - */ - public boolean updateCurrentTimePartition(int regionGroupNum) { - int threshold = timePartitionThreshold(regionGroupNum); - dataAllotTableLock.writeLock().lock(); - try { - AtomicLong newStartTime = new AtomicLong(Long.MIN_VALUE); - dataPartitionCounter.forEach( - (timePartition, counter) -> { - // Select the maximum TimePartition whose slotNum is greater than the following equation - // Ensure that the remaining slots can be still distributed to new regionGroups - if (counter.get() >= threshold && timePartition.getStartTime() > newStartTime.get()) { - newStartTime.set(timePartition.getStartTime()); - } - }); - - if (newStartTime.get() > currentTimePartition.get().getStartTime()) { - currentTimePartition.set(new TTimePartitionSlot(newStartTime.get())); - List<TTimePartitionSlot> removeTimePartitionSlots = - dataPartitionCounter.keySet().stream() - .filter(timePartition -> timePartition.getStartTime() < newStartTime.get()) - .collect(Collectors.toList()); - removeTimePartitionSlots.forEach(dataPartitionCounter::remove); - return true; - } - } finally { - dataAllotTableLock.writeLock().unlock(); - } - return false; - } - - public void addTimePartitionCount(Map<TTimePartitionSlot, Integer> timePartitionCountMap) { - dataAllotTableLock.writeLock().lock(); - try { - timePartitionCountMap.forEach( - (timePartition, count) -> - dataPartitionCounter - .computeIfAbsent(timePartition, empty -> new AtomicInteger(0)) - .addAndGet(count)); - } finally { - dataAllotTableLock.writeLock().unlock(); - } - } - - public TTimePartitionSlot getCurrentTimePartition() { - return currentTimePartition.get(); - } - - public TConsensusGroupId getRegionGroupId(TSeriesPartitionSlot seriesPartitionSlot) { - dataAllotTableLock.readLock().lock(); - try { - return dataAllotMap.get(seriesPartitionSlot); - } finally { - dataAllotTableLock.readLock().unlock(); - } - } - - /** Only use this interface when init PartitionBalancer. */ - public void setCurrentTimePartition(long startTime) { - currentTimePartition.set(new TTimePartitionSlot(startTime)); - } - - /** Only use this interface when init PartitionBalancer. */ - public void setDataAllotMap(Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap) { - this.dataAllotMap.putAll(dataAllotMap); - } - - public static int timePartitionThreshold(int regionGroupNum) { - return (int) (SERIES_SLOT_NUM * (1.0 - 2.0 / regionGroupNum)); - } - - public void acquireReadLock() { - dataAllotTableLock.readLock().lock(); - } - - public void releaseReadLock() { - dataAllotTableLock.readLock().unlock(); - } - - public void acquireWriteLock() { - dataAllotTableLock.writeLock().lock(); - } - - public void releaseWriteLock() { - dataAllotTableLock.writeLock().unlock(); - } -} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java new file mode 100644 index 00000000000..410d1edd593 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.commons.structure.BalanceTreeMap; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +public class DataPartitionPolicyTable { + + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum(); + + private final ReentrantLock dataAllotTableLock; + + // Map<SeriesPartitionSlot, RegionGroupId> + // The optimal allocation of SeriesSlots to RegionGroups + private final Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap; + + // Map<RegionGroupId, SeriesPartitionSlot Count> + // The number of SeriesSlots allocated to each RegionGroup in dataAllotMap + private final BalanceTreeMap<TConsensusGroupId, Integer> seriesPartitionSlotCounter; + + public DataPartitionPolicyTable() { + this.dataAllotTableLock = new ReentrantLock(); + this.dataAllotMap = new HashMap<>(); + this.seriesPartitionSlotCounter = new BalanceTreeMap<>(); + } + + /** + * Get or activate the specified SeriesPartitionSlot in dataAllotMap. + * + * @param seriesPartitionSlot The specified SeriesPartitionSlot + * @return The RegionGroupId of the specified SeriesPartitionSlot, activate when its empty yet + */ + public TConsensusGroupId getRegionGroupIdOrActivateIfNecessary( + TSeriesPartitionSlot seriesPartitionSlot) { + if (dataAllotMap.containsKey(seriesPartitionSlot)) { + return dataAllotMap.get(seriesPartitionSlot); + } + + TConsensusGroupId regionGroupId = seriesPartitionSlotCounter.getKeyWithMinValue(); + dataAllotMap.put(seriesPartitionSlot, regionGroupId); + seriesPartitionSlotCounter.put( + regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1); + return regionGroupId; + } + + /** + * Re-balance the allocation of SeriesSlots to RegionGroups. + * + * @param dataRegionGroups All DataRegionGroups currently in the Database + */ + public void reBalanceDataPartitionPolicy(List<TConsensusGroupId> dataRegionGroups) { + dataAllotTableLock.lock(); + try { + dataRegionGroups.forEach( + dataRegionGroup -> { + if (!seriesPartitionSlotCounter.containsKey(dataRegionGroup)) { + seriesPartitionSlotCounter.put(dataRegionGroup, 0); + } + }); + + // Enumerate all SeriesSlots randomly + List<TSeriesPartitionSlot> seriesPartitionSlots = new ArrayList<>(); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + seriesPartitionSlots.add(new TSeriesPartitionSlot(i)); + } + Collections.shuffle(seriesPartitionSlots); + + int mu = SERIES_SLOT_NUM / dataRegionGroups.size(); + for (TSeriesPartitionSlot seriesPartitionSlot : seriesPartitionSlots) { + if (!dataAllotMap.containsKey(seriesPartitionSlot)) { + continue; + } + + TConsensusGroupId regionGroupId = dataAllotMap.get(seriesPartitionSlot); + int seriesPartitionSlotCount = seriesPartitionSlotCounter.get(regionGroupId); + if (seriesPartitionSlotCount > mu) { + // Remove from dataAllotMap if the number of SeriesSlots is greater than mu + dataAllotMap.remove(seriesPartitionSlot); + seriesPartitionSlotCounter.put(regionGroupId, seriesPartitionSlotCount - 1); + } + } + } finally { + dataAllotTableLock.unlock(); + } + } + + /** Only use this interface when init PartitionBalancer. */ + public void setDataAllotMap(Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotMap) { + dataAllotTableLock.lock(); + try { + int mu = SERIES_SLOT_NUM / seriesPartitionSlotCounter.size(); + dataAllotMap.forEach( + (seriesPartitionSlot, regionGroupId) -> { + if (seriesPartitionSlotCounter.get(regionGroupId) < mu) { + // Put into dataAllotMap only when the number of SeriesSlots + // allocated to the RegionGroup is less than mu + this.dataAllotMap.put(seriesPartitionSlot, regionGroupId); + seriesPartitionSlotCounter.put( + regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1); + } + // Otherwise, clear this SeriesPartitionSlot and wait for re-activating + }); + } finally { + dataAllotTableLock.unlock(); + } + } + + public void acquireLock() { + dataAllotTableLock.lock(); + } + + public void releaseLock() { + dataAllotTableLock.unlock(); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 65429f70ec4..ac0df2916a9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -32,7 +32,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; @@ -398,9 +397,6 @@ public class PartitionManager { resp.setStatus(status); return resp; } - - // Statistical allocation result and re-balance the DataPartitionPolicy if necessary - getLoadManager().reBalancePartitionPolicyIfNecessary(assignedDataPartition); } resp = (DataPartitionResp) getDataPartition(req); @@ -618,7 +614,7 @@ public class PartitionManager { * @param database DatabaseName * @param seriesPartitionSlot Corresponding SeriesPartitionSlot * @param timePartitionSlot Corresponding TimePartitionSlot - * @return The specific DataPartition's predecessor if exists, null otherwise + * @return The specific DataPartition's successor if exists, null otherwise */ public TConsensusGroupId getSuccessorDataPartition( String database, @@ -628,6 +624,23 @@ public class PartitionManager { database, seriesPartitionSlot, timePartitionSlot); } + /** + * Only leader use this interface. Checks whether the specified DataPartition has a predecessor + * and returns if it does. + * + * @param database DatabaseName + * @param seriesPartitionSlot Corresponding SeriesPartitionSlot + * @param timePartitionSlot Corresponding TimePartitionSlot + * @return The specific DataPartition's predecessor if exists, null otherwise + */ + public TConsensusGroupId getPredecessorDataPartition( + String database, + TSeriesPartitionSlot seriesPartitionSlot, + TTimePartitionSlot timePartitionSlot) { + return partitionInfo.getPredecessorDataPartition( + database, seriesPartitionSlot, timePartitionSlot); + } + /** * Get the DataNodes who contain the specified Database's Schema or Data. * @@ -1284,53 +1297,6 @@ public class PartitionManager { partitionInfo.getDataRegionIds(databases, dataRegionIds); } - /** - * Get the max TimePartitionSlot of the specified Database. - * - * @param database The specified Database - * @return The max TimePartitionSlot, null if the Database doesn't exist or there are no - * DataPartitions yet - */ - public TTimePartitionSlot getMaxTimePartitionSlot(String database) { - return partitionInfo.getMaxTimePartitionSlot(database); - } - - /** - * Get the min TimePartitionSlot of the specified Database. - * - * @param database The specified Database - * @return The last DataPartition, null if the Database doesn't exist or there are no - * DataPartitions in the specified SeriesPartitionSlot - */ - public TTimePartitionSlot getMinTimePartitionSlot(String database) { - return partitionInfo.getMinTimePartitionSlot(database); - } - - /** - * Get the DataPartition with max TimePartition of the specified Database and the - * SeriesPartitionSlot. - * - * @param database The specified Database - * @param seriesPartitionSlot The specified SeriesPartitionSlot - * @return The last DataPartitionEntry, null if the Database doesn't exist or there are no - * DataPartitions yet - */ - public DataPartitionEntry getLastDataPartitionEntry( - String database, TSeriesPartitionSlot seriesPartitionSlot) { - return partitionInfo.getLastDataPartitionEntry(database, seriesPartitionSlot); - } - - /** - * Count SeriesSlot in the specified TimePartitionSlot of the Database. - * - * @param database The specified Database - * @param timePartitionSlot The specified TimePartitionSlot - * @return The count of SeriesSlot - */ - public int countSeriesSlot(String database, TTimePartitionSlot timePartitionSlot) { - return partitionInfo.countSeriesSlot(database, timePartitionSlot); - } - /** * Get the last DataAllotTable of the specified Database. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index a27168d8f8f..a629569470a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -25,7 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; @@ -99,19 +98,6 @@ public class DatabasePartitionTable { replicaSet.getRegionId(), new RegionGroup(System.currentTimeMillis(), replicaSet))); } - /** - * Delete RegionGroups' cache. - * - * @param replicaSets List<TRegionReplicaSet> - */ - public void deleteRegionGroups(List<TRegionReplicaSet> replicaSets) { - replicaSets.forEach(replicaSet -> regionGroupMap.remove(replicaSet.getRegionId())); - } - - public Set<TConsensusGroupId> getAllConsensusGroupId() { - return regionGroupMap.keySet(); - } - /** @return Deep copy of all Regions' RegionReplicaSet within one StorageGroup */ public List<TRegionReplicaSet> getAllReplicaSets() { List<TRegionReplicaSet> result = new ArrayList<>(); @@ -270,13 +256,25 @@ public class DatabasePartitionTable { * * @param seriesPartitionSlot Corresponding SeriesPartitionSlot * @param timePartitionSlot Corresponding TimePartitionSlot - * @return The specific DataPartition's predecessor if exists, null otherwise + * @return The specific DataPartition's successor if exists, null otherwise */ public TConsensusGroupId getSuccessorDataPartition( TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) { return dataPartitionTable.getSuccessorDataPartition(seriesPartitionSlot, timePartitionSlot); } + /** + * Checks whether the specified DataPartition has a predecessor and returns if it does. + * + * @param seriesPartitionSlot Corresponding SeriesPartitionSlot + * @param timePartitionSlot Corresponding TimePartitionSlot + * @return The specific DataPartition's predecessor if exists, null otherwise + */ + public TConsensusGroupId getPredecessorDataPartition( + TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) { + return dataPartitionTable.getPredecessorDataPartition(seriesPartitionSlot, timePartitionSlot); + } + /** * Create SchemaPartition within the specific StorageGroup. * @@ -564,45 +562,6 @@ public class DatabasePartitionTable { return dataRegionIds; } - /** - * Get the max TimePartitionSlot. - * - * @return The max TimePartitionSlot, null if there are no DataPartitions yet - */ - public TTimePartitionSlot getMaxTimePartitionSlot() { - return dataPartitionTable.getMaxTimePartitionSlot(); - } - - /** - * Get the min TimePartitionSlot. - * - * @return The min TimePartitionSlot, null if there are no DataPartitions yet - */ - public TTimePartitionSlot getMinTimePartitionSlot() { - return dataPartitionTable.getMinTimePartitionSlot(); - } - - /** - * Get the DataPartition with max TimePartition of the specified the SeriesPartitionSlot. - * - * @param seriesPartitionSlot The specified SeriesPartitionSlot - * @return The last DataPartitionEntry, null if there are no DataPartitions in the specified - * SeriesPartitionSlot - */ - public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot seriesPartitionSlot) { - return dataPartitionTable.getLastDataPartitionEntry(seriesPartitionSlot); - } - - /** - * Count SeriesSlot in the specified TimePartitionSlot. - * - * @param timePartitionSlot The specified TimePartitionSlot - * @return The count of SeriesSlot - */ - public int countSeriesSlot(TTimePartitionSlot timePartitionSlot) { - return dataPartitionTable.countSeriesSlot(timePartitionSlot); - } - /** * Get the last DataAllotTable. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index bfd509345eb..66ab605e066 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; @@ -392,7 +391,7 @@ public class PartitionInfo implements SnapshotProcessor { * @param database DatabaseName * @param seriesPartitionSlot Corresponding SeriesPartitionSlot * @param timePartitionSlot Corresponding TimePartitionSlot - * @return The specific DataPartition's predecessor if exists, null otherwise + * @return The specific DataPartition's successor if exists, null otherwise */ public TConsensusGroupId getSuccessorDataPartition( String database, @@ -407,6 +406,27 @@ public class PartitionInfo implements SnapshotProcessor { } } + /** + * Checks whether the specified DataPartition has a predecessor and returns if it does. + * + * @param database DatabaseName + * @param seriesPartitionSlot Corresponding SeriesPartitionSlot + * @param timePartitionSlot Corresponding TimePartitionSlot + * @return The specific DataPartition's predecessor if exists, null otherwise + */ + public TConsensusGroupId getPredecessorDataPartition( + String database, + TSeriesPartitionSlot seriesPartitionSlot, + TTimePartitionSlot timePartitionSlot) { + if (isDatabaseExisted(database)) { + return databasePartitionTables + .get(database) + .getPredecessorDataPartition(seriesPartitionSlot, timePartitionSlot); + } else { + return null; + } + } + /** * Check if the specified Database exists. * @@ -802,65 +822,6 @@ public class PartitionInfo implements SnapshotProcessor { return schemaPartitionSet; } - /** - * Get the max TimePartitionSlot of the specified Database. - * - * @param database The specified Database - * @return The max TimePartitionSlot, null if the Database doesn't exist or there are no - * DataPartitions yet - */ - public TTimePartitionSlot getMaxTimePartitionSlot(String database) { - if (isDatabaseExisted(database)) { - return databasePartitionTables.get(database).getMaxTimePartitionSlot(); - } - return null; - } - - /** - * Get the min TimePartitionSlot of the specified Database. - * - * @param database The specified Database - * @return The min TimePartitionSlot, null if the Database doesn't exist or there are no - * DataPartitions yet - */ - public TTimePartitionSlot getMinTimePartitionSlot(String database) { - if (isDatabaseExisted(database)) { - return databasePartitionTables.get(database).getMinTimePartitionSlot(); - } - return null; - } - - /** - * Get the DataPartition with max TimePartition of the specified Database and the - * SeriesPartitionSlot. - * - * @param database The specified Database - * @param seriesPartitionSlot The specified SeriesPartitionSlot - * @return The last DataPartitionEntry, null if the Database doesn't exist or there are no - * DataPartitions in the specified SeriesPartitionSlot - */ - public DataPartitionEntry getLastDataPartitionEntry( - String database, TSeriesPartitionSlot seriesPartitionSlot) { - if (isDatabaseExisted(database)) { - return databasePartitionTables.get(database).getLastDataPartitionEntry(seriesPartitionSlot); - } - return null; - } - - /** - * Count SeriesSlot in the specified TimePartitionSlot of the Database. - * - * @param database The specified Database - * @param timePartitionSlot The specified TimePartitionSlot - * @return The count of SeriesSlot - */ - public int countSeriesSlot(String database, TTimePartitionSlot timePartitionSlot) { - if (isDatabaseExisted(database)) { - return databasePartitionTables.get(database).countSeriesSlot(timePartitionSlot); - } - return 0; - } - /** * Get the last DataAllotTable of the specified Database. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 34211efdeb6..864f7a3c186 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -206,6 +206,7 @@ public class DeleteDatabaseProcedure LOG.info( "[DeleteDatabaseProcedure] Database: {} is deleted successfully", deleteDatabaseSchema.getName()); + env.getConfigManager().getLoadManager().clearPartitionBalancer(); return Flow.NO_MORE_STATE; } else if (getCycles() > RETRY_THRESHOLD) { setFailure( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java index 0aeb4f056af..cf0af07a26c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java @@ -206,13 +206,15 @@ public class CreateRegionGroupsProcedure break; case CREATE_REGION_GROUPS_FINISH: if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) { - // Update all corresponding DataAllotTables + // Re-balance all corresponding DataPartitionPolicyTable persistPlan .getRegionGroupMap() .keySet() .forEach( database -> - env.getConfigManager().getLoadManager().updateDataAllotTable(database)); + env.getConfigManager() + .getLoadManager() + .reBalanceDataPartitionPolicy(database)); } return Flow.NO_MORE_STATE; } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java deleted file mode 100644 index 67ff0dc0099..00000000000 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.manager.load.balancer.partition; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.partition.DataPartitionEntry; -import org.apache.iotdb.confignode.conf.ConfigNodeConfig; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -public class DataAllotTableTest { - - private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum(); - - @Test - public void testUpdateCurrentTimePartition() { - final int regionGroupNum = 5; - final int threshold = DataAllotTable.timePartitionThreshold(regionGroupNum); - final long timePartitionInterval = 1000; - DataAllotTable dataAllotTable = new DataAllotTable(); - - // Test 1: currentTimePartition is the first one - TTimePartitionSlot nextTimePartition = new TTimePartitionSlot(1000); - Map<TTimePartitionSlot, Integer> timePartitionCountMap = new HashMap<>(); - timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), threshold); - timePartitionCountMap.put( - new TTimePartitionSlot(nextTimePartition.getStartTime() + timePartitionInterval), - threshold - 100); - timePartitionCountMap.put( - new TTimePartitionSlot(nextTimePartition.getStartTime() + 2 * timePartitionInterval), - threshold - 200); - dataAllotTable.addTimePartitionCount(timePartitionCountMap); - dataAllotTable.updateCurrentTimePartition(regionGroupNum); - Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition()); - - // Test 2: currentTimePartition in the middle - timePartitionCountMap.clear(); - nextTimePartition = new TTimePartitionSlot(5000); - timePartitionCountMap.put( - new TTimePartitionSlot(nextTimePartition.getStartTime() - timePartitionInterval), - threshold - 100); - timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), threshold); - timePartitionCountMap.put( - new TTimePartitionSlot(nextTimePartition.getStartTime() + timePartitionInterval), - threshold - 100); - dataAllotTable.addTimePartitionCount(timePartitionCountMap); - dataAllotTable.updateCurrentTimePartition(regionGroupNum); - Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition()); - - // Test 3: currentTimePartition will be the maximum timePartitionSlot that greater or equal to - // threshold - int offset = 200; - Random random = new Random(); - timePartitionCountMap.clear(); - TTimePartitionSlot baseSlot = new TTimePartitionSlot(10000); - nextTimePartition = baseSlot; - timePartitionCountMap.put(nextTimePartition, threshold); - for (int i = 1; i < 100; i++) { - TTimePartitionSlot slot = - new TTimePartitionSlot(baseSlot.getStartTime() + i * timePartitionInterval); - int count = threshold + random.nextInt(offset) - offset / 2; - timePartitionCountMap.put(slot, count); - if (count >= threshold) { - nextTimePartition = slot; - } - } - dataAllotTable.addTimePartitionCount(timePartitionCountMap); - dataAllotTable.updateCurrentTimePartition(regionGroupNum); - Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition()); - } - - @Test - public void testUpdateDataAllotTable() { - DataAllotTable dataAllotTable = new DataAllotTable(); - List<TConsensusGroupId> dataRegionGroups = new ArrayList<>(); - - // Test 1: construct DataAllotTable from scratch - TConsensusGroupId group1 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 1); - dataRegionGroups.add(group1); - dataAllotTable.updateDataAllotTable(dataRegionGroups, new ArrayList<>()); - for (int i = 0; i < SERIES_SLOT_NUM; i++) { - TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); - // All SeriesPartitionSlots belong to group1 - Assert.assertEquals(group1, dataAllotTable.getRegionGroupId(seriesPartitionSlot)); - } - - // Test2: extend DataRegionGroups - Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new HashMap<>(); - dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2)); - dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3)); - dataAllotTable.updateDataAllotTable(dataRegionGroups, new ArrayList<>()); - int mu = SERIES_SLOT_NUM / 3; - Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>(); - for (int i = 0; i < SERIES_SLOT_NUM; i++) { - TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); - TConsensusGroupId groupId = dataAllotTable.getRegionGroupId(seriesPartitionSlot); - lastDataAllotTable.put(seriesPartitionSlot, groupId); - counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet(); - } - // All DataRegionGroups divide SeriesPartitionSlots evenly - for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) { - Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1); - } - - // Test 3: extend DataRegionGroups while inherit future allocate result - dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 4)); - dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 5)); - Random random = new Random(); - Set<TSeriesPartitionSlot> selectedSlots = new HashSet<>(); - List<DataPartitionEntry> lastDataPartitions = new ArrayList<>(); - Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>(); - for (int i = 0; i < 50; i++) { - // Randomly pre-allocate 50 SeriesPartitionSlots - TSeriesPartitionSlot seriesPartitionSlot = - new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM)); - while (selectedSlots.contains(seriesPartitionSlot)) { - seriesPartitionSlot = new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM)); - } - selectedSlots.add(seriesPartitionSlot); - lastDataPartitions.add( - new DataPartitionEntry( - seriesPartitionSlot, - new TTimePartitionSlot(Long.MAX_VALUE), - new TConsensusGroupId(TConsensusGroupType.DataRegion, random.nextInt(2) + 4))); - } - for (int i = 0; i < SERIES_SLOT_NUM; i++) { - // Record the other allocation result - TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); - if (!selectedSlots.contains(seriesPartitionSlot)) { - lastDataPartitions.add( - new DataPartitionEntry( - seriesPartitionSlot, - new TTimePartitionSlot(Long.MIN_VALUE), - lastDataAllotTable.get(seriesPartitionSlot))); - } - } - dataAllotTable.updateDataAllotTable(dataRegionGroups, lastDataPartitions); - mu = SERIES_SLOT_NUM / 5; - counter.clear(); - for (int i = 0; i < SERIES_SLOT_NUM; i++) { - TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); - TConsensusGroupId groupId = dataAllotTable.getRegionGroupId(seriesPartitionSlot); - counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet(); - if (groupId.getId() < 4) { - // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged - Assert.assertEquals(lastDataAllotTable.get(seriesPartitionSlot), groupId); - unchangedSlots.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet(); - } - } - // All DataRegionGroups divide SeriesPartitionSlots evenly - for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) { - Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1); - } - // All SeriesPartitionSlots that have been allocated before should be allocated to the same - // DataRegionGroup - for (int i = 0; i < 50; i++) { - Assert.assertEquals( - lastDataPartitions.get(i).getDataRegionGroup(), - dataAllotTable.getRegionGroupId(lastDataPartitions.get(i).getSeriesPartitionSlot())); - } - // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged - for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : unchangedSlots.entrySet()) { - Assert.assertEquals(mu, counterEntry.getValue().get()); - } - } -} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java new file mode 100644 index 00000000000..70ce9231c17 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTableTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class DataPartitionPolicyTableTest { + + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum(); + + @Test + public void testUpdateDataAllotTable() { + DataPartitionPolicyTable dataPartitionPolicyTable = new DataPartitionPolicyTable(); + List<TConsensusGroupId> dataRegionGroups = new ArrayList<>(); + + // Test 1: construct DataAllotTable from scratch + TConsensusGroupId group1 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 1); + dataRegionGroups.add(group1); + dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + // All SeriesPartitionSlots belong to group1 + Assert.assertEquals( + group1, + dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot)); + } + + // Test2: extend DataRegionGroups + Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new HashMap<>(); + dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2)); + dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3)); + dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups); + int mu = SERIES_SLOT_NUM / 3; + Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>(); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + TConsensusGroupId groupId = + dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot); + lastDataAllotTable.put(seriesPartitionSlot, groupId); + counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet(); + } + // All DataRegionGroups divide SeriesPartitionSlots evenly + for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) { + Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1); + } + + // Test 3: extend DataRegionGroups while inherit as much SeriesPartitionSlots as possible + dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 4)); + dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 5)); + dataPartitionPolicyTable.reBalanceDataPartitionPolicy(dataRegionGroups); + Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>(); + mu = SERIES_SLOT_NUM / 5; + counter.clear(); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + TConsensusGroupId groupId = + dataPartitionPolicyTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot); + counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet(); + if (groupId.getId() < 4) { + // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged + Assert.assertEquals(lastDataAllotTable.get(seriesPartitionSlot), groupId); + unchangedSlots.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet(); + } + } + // All DataRegionGroups divide SeriesPartitionSlots evenly + for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) { + Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1); + } + // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged + for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : unchangedSlots.entrySet()) { + Assert.assertEquals(mu, counterEntry.getValue().get()); + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java index 497f4085410..c0eac239d88 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java @@ -40,9 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public class DataPartitionTable { @@ -106,7 +104,7 @@ public class DataPartitionTable { * * @param seriesPartitionSlot Corresponding SeriesPartitionSlot * @param timePartitionSlot Corresponding TimePartitionSlot - * @return The specific DataPartition's predecessor if exists, null otherwise + * @return The specific DataPartition's successor if exists, null otherwise */ public TConsensusGroupId getSuccessorDataPartition( TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) { @@ -117,6 +115,24 @@ public class DataPartitionTable { } } + /** + * Checks whether the specified DataPartition has a predecessor and returns if it does. + * + * @param seriesPartitionSlot Corresponding SeriesPartitionSlot + * @param timePartitionSlot Corresponding TimePartitionSlot + * @return The specific DataPartition's predecessor if exists, null otherwise + */ + public TConsensusGroupId getPredecessorDataPartition( + TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) { + if (dataPartitionMap.containsKey(seriesPartitionSlot)) { + return dataPartitionMap + .get(seriesPartitionSlot) + .getPredecessorDataPartition(timePartitionSlot); + } else { + return null; + } + } + /** * Create DataPartition within the specific StorageGroup * @@ -233,82 +249,6 @@ public class DataPartitionTable { .collect(Collectors.toList()); } - /** - * Get the max TimePartitionSlot of the specified Database. - * - * @return The max TimePartitionSlot, null if there are no DataPartitions yet - */ - public TTimePartitionSlot getMaxTimePartitionSlot() { - AtomicReference<TTimePartitionSlot> maxTimeSlot = - new AtomicReference<>(new TTimePartitionSlot(0)); - dataPartitionMap - .values() - .forEach( - seriesPartitionTable -> { - TTimePartitionSlot timePartitionSlot = seriesPartitionTable.getMaxTimePartitionSlot(); - if (timePartitionSlot != null - && timePartitionSlot.getStartTime() > maxTimeSlot.get().getStartTime()) { - maxTimeSlot.set(timePartitionSlot); - } - }); - return maxTimeSlot.get().getStartTime() > 0 ? maxTimeSlot.get() : null; - } - - /** - * Get the min TimePartitionSlot of the specified Database. - * - * @return The min TimePartitionSlot, null if there are no DataPartitions yet - */ - public TTimePartitionSlot getMinTimePartitionSlot() { - AtomicReference<TTimePartitionSlot> minTimeSlot = - new AtomicReference<>(new TTimePartitionSlot(Long.MAX_VALUE)); - dataPartitionMap - .values() - .forEach( - seriesPartitionTable -> { - TTimePartitionSlot timePartitionSlot = seriesPartitionTable.getMinTimePartitionSlot(); - if (timePartitionSlot != null - && timePartitionSlot.getStartTime() < minTimeSlot.get().getStartTime()) { - minTimeSlot.set(timePartitionSlot); - } - }); - return minTimeSlot.get().getStartTime() < Long.MAX_VALUE ? minTimeSlot.get() : null; - } - - /** - * Get the DataPartition with max TimePartition of the specified Database and the - * SeriesPartitionSlot. - * - * @param seriesPartitionSlot The specified SeriesPartitionSlot - * @return The last DataPartitionEntry, null if there are no DataPartitions in the specified - * SeriesPartitionSlot - */ - public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot seriesPartitionSlot) { - if (dataPartitionMap.containsKey(seriesPartitionSlot)) { - return dataPartitionMap - .get(seriesPartitionSlot) - .getLastDataPartitionEntry(seriesPartitionSlot); - } else { - return null; - } - } - - /** - * Count SeriesSlot in the specified TimePartitionSlot of the Database. - * - * @param timePartitionSlot The specified TimePartitionSlot - * @return The count of SeriesSlot - */ - public int countSeriesSlot(TTimePartitionSlot timePartitionSlot) { - AtomicInteger count = new AtomicInteger(0); - dataPartitionMap - .values() - .forEach( - seriesPartitionTable -> - count.addAndGet(seriesPartitionTable.isDataPartitionExist(timePartitionSlot))); - return count.get(); - } - /** * Get the last DataAllotTable. * @@ -322,23 +262,6 @@ public class DataPartitionTable { return result; } - /** - * Get the number of DataPartitions in each TimePartitionSlot - * - * @return Map<TimePartitionSlot, the number of DataPartitions> - */ - public Map<TTimePartitionSlot, Integer> getTimeSlotCountMap() { - Map<TTimePartitionSlot, Integer> result = new ConcurrentHashMap<>(); - dataPartitionMap.forEach( - (seriesPartitionSlot, seriesPartitionTable) -> - seriesPartitionTable - .getTimeSlotCountMap() - .forEach( - (timePartitionSlot, count) -> - result.merge(timePartitionSlot, count, Integer::sum))); - return result; - } - public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java index 58f2ba52b9c..1b87e42ad11 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java @@ -34,10 +34,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Objects; import java.util.TreeMap; import java.util.Vector; @@ -136,6 +134,17 @@ public class SeriesPartitionTable { return successorSlot == null ? null : seriesPartitionMap.get(successorSlot).get(0); } + /** + * Check and return the specified DataPartition's predecessor. + * + * @param timePartitionSlot Corresponding TimePartitionSlot + * @return The specified DataPartition's predecessor if exists, null otherwise + */ + public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot timePartitionSlot) { + TTimePartitionSlot predecessorSlot = seriesPartitionMap.lowerKey(timePartitionSlot); + return predecessorSlot == null ? null : seriesPartitionMap.get(predecessorSlot).get(0); + } + /** * Query a timePartition's corresponding dataRegionIds. * @@ -216,61 +225,6 @@ public class SeriesPartitionTable { return result; } - /** - * Get the max TimePartitionSlot of the specified Database. - * - * @return The max TimePartitionSlot, null if there are no DataPartitions yet - */ - public TTimePartitionSlot getMaxTimePartitionSlot() { - try { - return seriesPartitionMap.lastKey(); - } catch (NoSuchElementException e) { - return null; - } - } - - /** - * Get the min TimePartitionSlot of the specified Database. - * - * @return The min TimePartitionSlot, null if there are no DataPartitions yet - */ - public TTimePartitionSlot getMinTimePartitionSlot() { - try { - return seriesPartitionMap.firstKey(); - } catch (NoSuchElementException e) { - return null; - } - } - - /** - * Get the DataPartition with max TimePartition of the specified Database and the - * SeriesPartitionSlot. - * - * @param seriesPartitionSlot The specified SeriesPartitionSlot - * @return The last DataPartitionEntry, null if there are no DataPartitions - */ - public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot seriesPartitionSlot) { - Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry = - seriesPartitionMap.lastEntry(); - if (lastEntry == null) { - return null; - } - return new DataPartitionEntry( - seriesPartitionSlot, - lastEntry.getKey(), - lastEntry.getValue().get(lastEntry.getValue().size() - 1)); - } - - /** - * Check whether the specified DataPartition exists. - * - * @param timePartitionSlot Corresponding TimePartitionSlot - * @return 1 if exists, 0 otherwise - */ - public int isDataPartitionExist(TTimePartitionSlot timePartitionSlot) { - return seriesPartitionMap.containsKey(timePartitionSlot) ? 1 : 0; - } - /** * Get the last DataPartition's ConsensusGroupId. * @@ -285,18 +239,6 @@ public class SeriesPartitionTable { return lastEntry.getValue().get(lastEntry.getValue().size() - 1); } - /** - * Get the number of DataPartitions in each TimePartitionSlot. - * - * @return Map<TimePartitionSlot, the number of DataPartitions> - */ - public Map<TTimePartitionSlot, Integer> getTimeSlotCountMap() { - Map<TTimePartitionSlot, Integer> result = new HashMap<>(); - seriesPartitionMap.forEach( - (timePartitionSlot, consensusGroupIds) -> result.put(timePartitionSlot, 1)); - return result; - } - public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java index 1ea29c9a80b..09a49898635 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java @@ -75,6 +75,10 @@ public class BalanceTreeMap<K, V extends Comparable<V>> { return keyValueMap.containsKey(key); } + public int size() { + return keyValueMap.size(); + } + @TestOnly public void remove(K key) { V value = keyValueMap.getOrDefault(key, null);
