This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch add-per-db-awareness-to-region-allocator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7e4de77f6ce06a290b6f0f3a1ab3f56871a56dd3 Author: Yongzao <[email protected]> AuthorDate: Tue May 19 09:46:49 2026 +0800 Balance per-database replicas in PGP and Greedy region allocators PartiteGraphPlacementRegionGroupAllocator ignored the databaseAllocatedRegionGroups argument, so PGP balanced only the global per-DataNode region count. On clusters with multiple databases, this let one DataNode hold many replicas of one database but only a few of another. For example, with 20 DataNodes and 2 databases of 60 RegionGroups each, DN16 could end up with 15 tod_sod0 replicas but only 3 usr_sod0 replicas. This in turn prevented downstream leader balancing from reaching an even distribution. Make PGP and the Greedy fallback aware of the per-database load: - PGP now reads databaseAllocatedRegionGroups, tracks databaseRegionCounter[], and compares candidate sets by the triple (regionSum, databaseRegionSum, edgeSum). The pre-sort inside each sub-graph now orders DataNodes by (regionCount, databaseRegionCount, freeDiskSpace, random), so the fixed alpha slots also honour per-database balance. - GreedyRegionGroupAllocator adds databaseRegionCount to DataNodeEntry and sorts by (regionCount, databaseRegionCount, freeDiskSpace, random); buildWeightList now consumes databaseAllocatedRegionGroups. The resulting priority order (global balance > per-database balance > scatter) matches the requested ordering. PGP's partite-graph structure still provides the high-scatter property by construction, so demoting edgeSum to the tertiary key does not regress scatter width. Tests: - New PartiteGraphPlacementRegionGroupAllocatorTest covers multi-database scenarios with replication factors 2, 3 and 5, including the reported 20-DN/2-database regression. In every scenario, each DataNode now holds exactly the expected per-database replica count. - GreedyRegionGroupAllocatorTest gains a new per-database balance test. - New IoTDBPerDatabaseRegionGroupAllocationIT exercises the PGR, GCR, and GREEDY policies end-to-end on a real cluster. 
- CommonConfig (+ Mpp/Shared/Remote impls) gains setRegionGroupAllocatePolicy so ITs can switch between allocators. --- .../it/env/cluster/config/MppCommonConfig.java | 6 + .../env/cluster/config/MppSharedCommonConfig.java | 7 + .../it/env/remote/config/RemoteCommonConfig.java | 5 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 2 + .../IoTDBPerDatabaseRegionGroupAllocationIT.java | 192 +++++++++++++++++ .../region/GreedyRegionGroupAllocator.java | 43 +++- .../PartiteGraphPlacementRegionGroupAllocator.java | 159 +++++++++++--- .../region/GreedyRegionGroupAllocatorTest.java | 76 +++++++ ...titeGraphPlacementRegionGroupAllocatorTest.java | 231 +++++++++++++++++++++ 9 files changed, 684 insertions(+), 37 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 1578e8a3fbc..4852b9d116e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -203,6 +203,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } + @Override + public CommonConfig setRegionGroupAllocatePolicy(String regionGroupAllocatePolicy) { + setProperty("region_group_allocate_policy", regionGroupAllocatePolicy); + return this; + } + @Override public CommonConfig setSchemaRegionGroupExtensionPolicy(String schemaRegionGroupExtensionPolicy) { setProperty("schema_region_group_extension_policy", schemaRegionGroupExtensionPolicy); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index c7d8becb373..582c9a049e4 100644 --- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -202,6 +202,13 @@ public class MppSharedCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setRegionGroupAllocatePolicy(String regionGroupAllocatePolicy) { + cnConfig.setRegionGroupAllocatePolicy(regionGroupAllocatePolicy); + dnConfig.setRegionGroupAllocatePolicy(regionGroupAllocatePolicy); + return this; + } + @Override public CommonConfig setSchemaRegionGroupExtensionPolicy(String schemaRegionGroupExtensionPolicy) { cnConfig.setSchemaRegionGroupExtensionPolicy(schemaRegionGroupExtensionPolicy); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 4e0b0d2d727..48c157e957b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -145,6 +145,11 @@ public class RemoteCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setRegionGroupAllocatePolicy(String regionGroupAllocatePolicy) { + return this; + } + @Override public CommonConfig setSchemaRegionGroupExtensionPolicy(String schemaRegionGroupExtensionPolicy) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index fb4c4f56ba5..dc21234e2ba 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -72,6 +72,8 @@ public interface CommonConfig { CommonConfig setIoTConsensusV2Mode(String 
iotConsensusV2Mode); + CommonConfig setRegionGroupAllocatePolicy(String regionGroupAllocatePolicy); + CommonConfig setSchemaRegionGroupExtensionPolicy(String schemaRegionGroupExtensionPolicy); CommonConfig setDefaultSchemaRegionGroupNumPerDatabase(int schemaRegionGroupPerDatabase); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBPerDatabaseRegionGroupAllocationIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBPerDatabaseRegionGroupAllocationIT.java new file mode 100644 index 00000000000..17e32dd7e02 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBPerDatabaseRegionGroupAllocationIT.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.it.load; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; +import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** + * Verifies that DataRegion allocators distribute each database's replicas evenly across DataNodes. + * Regression for the case where {@code PartiteGraphPlacementRegionGroupAllocator} ignored {@code + * databaseAllocatedRegionGroups} and ended up with one DataNode holding many replicas of one + * database and few of another. 
+ */ +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBPerDatabaseRegionGroupAllocationIT { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBPerDatabaseRegionGroupAllocationIT.class); + + private static final int TEST_DATA_NODE_NUM = 4; + private static final int TEST_REPLICATION_FACTOR = 2; + private static final int TEST_DATABASE_NUM = 2; + private static final int TEST_DATA_REGION_GROUP_NUM_PER_DATABASE = 4; + private static final String DATABASE_PREFIX = "root.db"; + private static final long TEST_TIME_PARTITION_INTERVAL = 604_800_000L; + + private void initCluster(String regionGroupAllocatePolicy) { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setRegionGroupAllocatePolicy(regionGroupAllocatePolicy) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setDataReplicationFactor(TEST_REPLICATION_FACTOR) + .setDataRegionGroupExtensionPolicy("CUSTOM") + .setDefaultDataRegionGroupNumPerDatabase(TEST_DATA_REGION_GROUP_NUM_PER_DATABASE) + // Avoid auto leader balancing rearranging anything during the test + .setEnableAutoLeaderBalanceForIoTConsensus(false); + EnvFactory.getEnv().initClusterEnvironment(1, TEST_DATA_NODE_NUM); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testPgrPolicyPerDbReplicaBalance() throws Exception { + initCluster("PGR"); + runPerDbBalanceCheck("PGR"); + } + + @Test + public void testGcrPolicyPerDbReplicaBalance() throws Exception { + initCluster("GCR"); + runPerDbBalanceCheck("GCR"); + } + + @Test + public void testGreedyPolicyPerDbReplicaBalance() throws Exception { + initCluster("GREEDY"); + runPerDbBalanceCheck("GREEDY"); + } + + private void runPerDbBalanceCheck(String policy) throws Exception { + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + // Create databases and trigger DataRegion 
materialization + for (int i = 0; i < TEST_DATABASE_NUM; i++) { + String currentDatabase = DATABASE_PREFIX + i; + TSStatus status = client.setDatabase(new TDatabaseSchema(currentDatabase)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = + ConfigNodeTestUtils.constructPartitionSlotsMap( + currentDatabase, + 0, + TEST_DATA_REGION_GROUP_NUM_PER_DATABASE, + 0, + 1, + TEST_TIME_PARTITION_INTERVAL); + TDataPartitionTableResp dataPartitionTableResp = + client.getOrCreateDataPartitionTable(new TDataPartitionReq(partitionSlotsMap)); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + } + + // Collect DataRegion replicas grouped by database and DataNode + TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq()); + List<TRegionInfo> dataRegionInfoList = + showRegionResp.getRegionInfoList().stream() + .filter(r -> !r.database.startsWith(SystemConstant.SYSTEM_DATABASE)) + .filter(r -> !r.database.startsWith(SystemConstant.AUDIT_DATABASE)) + // Only consider DataRegion since SchemaRegion is allocated by a separate path + .filter(r -> r.getConsensusGroupId().getType() == TConsensusGroupType.DataRegion) + .collect(Collectors.toList()); + + Map<String, Map<Integer, Integer>> perDbReplicaCount = new TreeMap<>(); + Map<Integer, Integer> globalReplicaCount = new TreeMap<>(); + for (TRegionInfo info : dataRegionInfoList) { + perDbReplicaCount + .computeIfAbsent(info.getDatabase(), k -> new TreeMap<>()) + .merge(info.getDataNodeId(), 1, Integer::sum); + globalReplicaCount.merge(info.getDataNodeId(), 1, Integer::sum); + } + + int expectedPerDbPerDn = + TEST_DATA_REGION_GROUP_NUM_PER_DATABASE * TEST_REPLICATION_FACTOR / TEST_DATA_NODE_NUM; + int expectedGlobalPerDn = + TEST_DATABASE_NUM + * TEST_DATA_REGION_GROUP_NUM_PER_DATABASE + * TEST_REPLICATION_FACTOR + / TEST_DATA_NODE_NUM; + + for 
(int i = 0; i < TEST_DATABASE_NUM; i++) { + String currentDatabase = DATABASE_PREFIX + i; + Map<Integer, Integer> dnReplicaCount = perDbReplicaCount.get(currentDatabase); + Assert.assertNotNull("No DataRegion replicas found for " + currentDatabase, dnReplicaCount); + LOGGER.info("[{}] db {} replicas per DN: {}", policy, currentDatabase, dnReplicaCount); + Assert.assertEquals( + "policy=" + policy + " db=" + currentDatabase + " should cover all DataNodes", + TEST_DATA_NODE_NUM, + dnReplicaCount.size()); + for (Map.Entry<Integer, Integer> entry : dnReplicaCount.entrySet()) { + Assert.assertEquals( + "policy=" + policy + " db=" + currentDatabase + " dn=" + entry.getKey(), + expectedPerDbPerDn, + (int) entry.getValue()); + } + } + + LOGGER.info("[{}] global replicas per DN: {}", policy, globalReplicaCount); + Assert.assertEquals( + "policy=" + policy + " global allocation should cover all DataNodes", + TEST_DATA_NODE_NUM, + globalReplicaCount.size()); + for (Map.Entry<Integer, Integer> entry : globalReplicaCount.entrySet()) { + Assert.assertEquals( + "policy=" + policy + " global dn=" + entry.getKey(), + expectedGlobalPerDn, + (int) entry.getValue()); + } + } + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java index 33859233d90..4e940795418 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java @@ -45,12 +45,15 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator { public int dataNodeId; public int regionCount; + public int databaseRegionCount; public double freeDiskSpace; public int randomWeight; - public 
DataNodeEntry(int dataNodeId, int regionCount, double freeDiskSpace) { + public DataNodeEntry( + int dataNodeId, int regionCount, int databaseRegionCount, double freeDiskSpace) { this.dataNodeId = dataNodeId; this.regionCount = regionCount; + this.databaseRegionCount = databaseRegionCount; this.freeDiskSpace = freeDiskSpace; this.randomWeight = RANDOM.nextInt(); } @@ -58,12 +61,15 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator { @Override public int compareTo(DataNodeEntry other) { if (this.regionCount != other.regionCount) { - return this.regionCount - other.regionCount; - } else if (this.freeDiskSpace != other.freeDiskSpace) { - return (int) (other.freeDiskSpace - this.freeDiskSpace); - } else { - return this.randomWeight - other.randomWeight; + return Integer.compare(this.regionCount, other.regionCount); } + if (this.databaseRegionCount != other.databaseRegionCount) { + return Integer.compare(this.databaseRegionCount, other.databaseRegionCount); + } + if (this.freeDiskSpace != other.freeDiskSpace) { + return Double.compare(other.freeDiskSpace, this.freeDiskSpace); + } + return Integer.compare(this.randomWeight, other.randomWeight); } } @@ -75,9 +81,13 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator { List<TRegionReplicaSet> databaseAllocatedRegionGroups, int replicationFactor, TConsensusGroupId consensusGroupId) { - // Build weightList order by number of regions allocated asc + // Build weightList ordered by (regionCount asc, databaseRegionCount asc, freeDiskSpace desc) List<TDataNodeLocation> weightList = - buildWeightList(availableDataNodeMap, freeDiskSpaceMap, allocatedRegionGroups); + buildWeightList( + availableDataNodeMap, + freeDiskSpaceMap, + allocatedRegionGroups, + databaseAllocatedRegionGroups); return new TRegionReplicaSet( consensusGroupId, weightList.stream().limit(replicationFactor).collect(Collectors.toList())); @@ -99,7 +109,8 @@ public class GreedyRegionGroupAllocator implements 
IRegionGroupAllocator { private List<TDataNodeLocation> buildWeightList( Map<Integer, TDataNodeConfiguration> availableDataNodeMap, Map<Integer, Double> freeDiskSpaceMap, - List<TRegionReplicaSet> allocatedRegionGroups) { + List<TRegionReplicaSet> allocatedRegionGroups, + List<TRegionReplicaSet> databaseAllocatedRegionGroups) { // Map<DataNodeId, Region count> Map<Integer, Integer> regionCounter = new HashMap<>(availableDataNodeMap.size()); @@ -111,6 +122,19 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator { dataNodeLocation -> regionCounter.merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum))); + // Map<DataNodeId, Region count within the same Database> + Map<Integer, Integer> databaseRegionCounter = new HashMap<>(availableDataNodeMap.size()); + if (databaseAllocatedRegionGroups != null) { + databaseAllocatedRegionGroups.forEach( + regionReplicaSet -> + regionReplicaSet + .getDataNodeLocations() + .forEach( + dataNodeLocation -> + databaseRegionCounter.merge( + dataNodeLocation.getDataNodeId(), 1, Integer::sum))); + } + /* Construct priority map */ List<DataNodeEntry> entryList = new ArrayList<>(); availableDataNodeMap.forEach( @@ -119,6 +143,7 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator { new DataNodeEntry( datanodeId, regionCounter.getOrDefault(datanodeId, 0), + databaseRegionCounter.getOrDefault(datanodeId, 0), freeDiskSpaceMap.getOrDefault(datanodeId, 0d)))); // Sort weightList diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java index f7be6554776..d036c6eda0a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java +++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java @@ -26,10 +26,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.i18n.ManagerMessages; -import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator.DataNodeEntry; - -import org.apache.tsfile.utils.Pair; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -40,6 +38,7 @@ import java.util.stream.Collectors; public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAllocator { + private static final SecureRandom RANDOM = new SecureRandom(); private static final GreedyRegionGroupAllocator GREEDY_ALLOCATOR = new GreedyRegionGroupAllocator(); @@ -48,15 +47,17 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl private int regionPerDataNode; private int dataNodeNum; - // The number of allocated Regions in each DataNode + // The number of allocated Regions in each DataNode (fake-id indexed) private int[] regionCounter; - // The number of edges in current cluster + // The number of allocated Regions in each DataNode within the same Database (fake-id indexed) + private int[] databaseRegionCounter; + // Whether there exists a region with both i and j as replicas (fake-id indexed, binary) private int[][] combinationCounter; private Map<Integer, Integer> fakeToRealIdMap; private int alphaDataNodeNum; - // Pair<combinationSum, RegionSum> - Pair<Integer, Integer> bestValue; + // The best valuation found so far + private Value bestValue; private int[] bestAlphaNodes; @Override @@ -71,13 +72,17 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl 
consensusGroupId.getType().equals(TConsensusGroupType.DataRegion) ? ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode() : ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode(); - prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups); + prepare( + replicationFactor, + availableDataNodeMap, + allocatedRegionGroups, + databaseAllocatedRegionGroups); // Select alpha nodes set for (int i = 0; i < subGraphCount; i++) { subGraphSearch(i, freeDiskSpaceMap); } - if (bestValue.left == Integer.MAX_VALUE) { + if (bestValue.regionSum == Integer.MAX_VALUE) { // Use greedy allocator as alternative if no alpha nodes set is found return GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution( availableDataNodeMap, @@ -132,7 +137,8 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl private void prepare( int replicationFactor, Map<Integer, TDataNodeConfiguration> availableDataNodeMap, - List<TRegionReplicaSet> allocatedRegionGroups) { + List<TRegionReplicaSet> allocatedRegionGroups, + List<TRegionReplicaSet> databaseAllocatedRegionGroups) { this.subGraphCount = replicationFactor / 2 + (replicationFactor % 2 == 0 ? 
0 : 1); this.replicationFactor = replicationFactor; @@ -150,9 +156,11 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl realToFakeIdMap.put(dataNodeIdList.get(i), i); } - // Compute regionCounter and combinationCounter + // Compute regionCounter, databaseRegionCounter and combinationCounter this.regionCounter = new int[dataNodeNum]; Arrays.fill(regionCounter, 0); + this.databaseRegionCounter = new int[dataNodeNum]; + Arrays.fill(databaseRegionCounter, 0); this.combinationCounter = new int[dataNodeNum][dataNodeNum]; for (int i = 0; i < dataNodeNum; i++) { Arrays.fill(combinationCounter[i], 0); @@ -160,42 +168,64 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) { List<TDataNodeLocation> dataNodeLocations = regionReplicaSet.getDataNodeLocations(); for (int i = 0; i < dataNodeLocations.size(); i++) { - int fakeIId = realToFakeIdMap.get(dataNodeLocations.get(i).getDataNodeId()); + Integer fakeIId = realToFakeIdMap.get(dataNodeLocations.get(i).getDataNodeId()); + if (fakeIId == null) { + // Skip nodes that are no longer available + continue; + } regionCounter[fakeIId]++; for (int j = i + 1; j < dataNodeLocations.size(); j++) { - int fakeJId = realToFakeIdMap.get(dataNodeLocations.get(j).getDataNodeId()); + Integer fakeJId = realToFakeIdMap.get(dataNodeLocations.get(j).getDataNodeId()); + if (fakeJId == null) { + continue; + } combinationCounter[fakeIId][fakeJId] = 1; combinationCounter[fakeJId][fakeIId] = 1; } } } + if (databaseAllocatedRegionGroups != null) { + for (TRegionReplicaSet regionReplicaSet : databaseAllocatedRegionGroups) { + for (TDataNodeLocation location : regionReplicaSet.getDataNodeLocations()) { + Integer fakeId = realToFakeIdMap.get(location.getDataNodeId()); + if (fakeId != null) { + databaseRegionCounter[fakeId]++; + } + } + } + } // Reset the optimal result this.alphaDataNodeNum = replicationFactor / 2 + 1; - 
this.bestValue = new Pair<>(Integer.MAX_VALUE, Integer.MAX_VALUE); + this.bestValue = Value.worst(); this.bestAlphaNodes = new int[alphaDataNodeNum]; } - private Pair<Integer, Integer> valuation(int[] nodes) { - int edgeSum = 0; + private Value valuation(int[] nodes) { int regionSum = 0; + int databaseRegionSum = 0; + int edgeSum = 0; for (int iota : nodes) { + regionSum += regionCounter[iota]; + databaseRegionSum += databaseRegionCounter[iota]; for (int kappa : nodes) { edgeSum += combinationCounter[iota][kappa]; } - regionSum += regionCounter[iota]; } - return new Pair<>(edgeSum, regionSum); + return new Value(regionSum, databaseRegionSum, edgeSum); } private void subGraphSearch(int firstIndex, Map<Integer, Double> freeDiskSpaceMap) { - List<DataNodeEntry> entryList = new ArrayList<>(); + List<PgpDataNodeEntry> entryList = new ArrayList<>(); for (int index = firstIndex; index < dataNodeNum; index += subGraphCount) { // Prune: skip filled DataNodes if (regionCounter[index] < regionPerDataNode) { entryList.add( - new DataNodeEntry( - index, regionCounter[index], freeDiskSpaceMap.get(fakeToRealIdMap.get(index)))); + new PgpDataNodeEntry( + index, + regionCounter[index], + databaseRegionCounter[index], + freeDiskSpaceMap.get(fakeToRealIdMap.get(index)))); } } if (entryList.size() < alphaDataNodeNum) { @@ -209,9 +239,8 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl } for (int i = alphaDataNodeNum - 1; i < entryList.size(); i++) { alphaNodes[alphaDataNodeNum - 1] = entryList.get(i).dataNodeId; - Pair<Integer, Integer> currentValue = valuation(alphaNodes); - if (currentValue.left < bestValue.left - || (currentValue.left.equals(bestValue.left) && currentValue.right < bestValue.right)) { + Value currentValue = valuation(alphaNodes); + if (currentValue.compareTo(bestValue) < 0) { bestValue = currentValue; System.arraycopy(alphaNodes, 0, bestAlphaNodes, 0, alphaDataNodeNum); } @@ -228,16 +257,15 @@ public class 
PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl continue; } int selectedDataNode = -1; - Pair<Integer, Integer> tmpValue = new Pair<>(Integer.MAX_VALUE, Integer.MAX_VALUE); + Value tmpValue = Value.worst(); for (int i = partiteIndex; i < dataNodeNum; i += subGraphCount) { if (regionCounter[i] >= regionPerDataNode) { // Pruning: skip filled DataNodes continue; } tmpNodes[alphaDataNodeNum] = i; - Pair<Integer, Integer> currentValue = valuation(tmpNodes); - if (currentValue.left < tmpValue.left - || (currentValue.left.equals(tmpValue.left) && currentValue.right < tmpValue.right)) { + Value currentValue = valuation(tmpNodes); + if (currentValue.compareTo(tmpValue) < 0) { tmpValue = currentValue; selectedDataNode = i; } @@ -249,4 +277,79 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl } return betaNodes; } + + /** + * Valuation for a candidate alpha (or alpha ∪ beta-candidate) set. Smaller is better. Comparison + * priority: + * + * <ol> + * <li>{@code regionSum} — global per-DN region count (balance the whole cluster) + * <li>{@code databaseRegionSum} — per-(database, DN) region count (balance each database) + * <li>{@code edgeSum} — 2-Region combination scatter (refines PGP's scatter property) + * </ol> + */ + private static class Value implements Comparable<Value> { + + private final int regionSum; + private final int databaseRegionSum; + private final int edgeSum; + + private Value(int regionSum, int databaseRegionSum, int edgeSum) { + this.regionSum = regionSum; + this.databaseRegionSum = databaseRegionSum; + this.edgeSum = edgeSum; + } + + private static Value worst() { + return new Value(Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE); + } + + @Override + public int compareTo(Value other) { + if (regionSum != other.regionSum) { + return Integer.compare(regionSum, other.regionSum); + } + if (databaseRegionSum != other.databaseRegionSum) { + return Integer.compare(databaseRegionSum, 
other.databaseRegionSum); + } + return Integer.compare(edgeSum, other.edgeSum); + } + } + + /** + * Pre-sort entry for selecting alpha nodes inside a sub-graph. Sort priority matches {@link + * Value}: regionCount first, databaseRegionCount second, then freeDiskSpace (descending) and a + * random tie-breaker. + */ + private static class PgpDataNodeEntry implements Comparable<PgpDataNodeEntry> { + + private final int dataNodeId; + private final int regionCount; + private final int databaseRegionCount; + private final double freeDiskSpace; + private final int randomWeight; + + private PgpDataNodeEntry( + int dataNodeId, int regionCount, int databaseRegionCount, double freeDiskSpace) { + this.dataNodeId = dataNodeId; + this.regionCount = regionCount; + this.databaseRegionCount = databaseRegionCount; + this.freeDiskSpace = freeDiskSpace; + this.randomWeight = RANDOM.nextInt(); + } + + @Override + public int compareTo(PgpDataNodeEntry other) { + if (this.regionCount != other.regionCount) { + return Integer.compare(this.regionCount, other.regionCount); + } + if (this.databaseRegionCount != other.databaseRegionCount) { + return Integer.compare(this.databaseRegionCount, other.databaseRegionCount); + } + if (this.freeDiskSpace != other.freeDiskSpace) { + return Double.compare(other.freeDiskSpace, this.freeDiskSpace); + } + return Integer.compare(this.randomWeight, other.randomWeight); + } + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java index b0dd6769f2e..55111efa09f 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java @@ -29,11 
+29,13 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -42,6 +44,80 @@ public class GreedyRegionGroupAllocatorTest {
   private static final GreedyRegionGroupAllocator ALLOCATOR = new GreedyRegionGroupAllocator();
   private static final int TEST_REPLICATION_FACTOR = 3;
 
+  /**
+   * Multi-database regression: each database's replicas should be evenly distributed across all
+   * DataNodes. With 2 databases × 30 RGs × rf 3 on 10 DNs, per-(db, DN) replica count must be
+   * exactly 9 (60 RGs × 3 / 10 / 2 = 9).
+   */
+  @Test
+  public void testPerDatabaseBalance() {
+    int dataNodeNum = 10;
+    int databaseNum = 2;
+    int regionGroupsPerDatabase = 30;
+    int rf = 3;
+
+    Map<Integer, TDataNodeConfiguration> availableDataNodeMap = new ConcurrentHashMap<>();
+    Map<Integer, Double> freeSpaceMap = new ConcurrentHashMap<>();
+    Random random = new Random(0);
+    for (int i = 1; i <= dataNodeNum; i++) {
+      availableDataNodeMap.put(
+          i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+      freeSpaceMap.put(i, random.nextDouble());
+    }
+
+    List<TRegionReplicaSet> allocatedRegionGroups = new ArrayList<>();
+    Map<Integer, List<TRegionReplicaSet>> databaseAllocatedRegionGroups = new TreeMap<>();
+    for (int db = 0; db < databaseNum; db++) {
+      databaseAllocatedRegionGroups.put(db, new ArrayList<>());
+    }
+
+    int regionId = 0;
+    for (int round = 0; round < regionGroupsPerDatabase; round++) {
+      for (int db = 0; db < databaseNum; db++) {
+        TRegionReplicaSet newRegionGroup =
+            ALLOCATOR.generateOptimalRegionReplicasDistribution(
+                availableDataNodeMap,
+                freeSpaceMap,
+                allocatedRegionGroups,
+                databaseAllocatedRegionGroups.get(db),
+                rf,
+                new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId++));
+        allocatedRegionGroups.add(newRegionGroup);
+        databaseAllocatedRegionGroups.get(db).add(newRegionGroup);
+      }
+    }
+
+    int expectedPerDb = regionGroupsPerDatabase * rf / dataNodeNum;
+    for (int db = 0; db < databaseNum; db++) {
+      Map<Integer, Integer> perDnReplicaCount = new HashMap<>();
+      for (TRegionReplicaSet rg : databaseAllocatedRegionGroups.get(db)) {
+        for (TDataNodeLocation loc : rg.getDataNodeLocations()) {
+          perDnReplicaCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+        }
+      }
+      for (int dnId = 1; dnId <= dataNodeNum; dnId++) {
+        Assert.assertEquals(
+            "db " + db + " dn " + dnId + " replica count",
+            expectedPerDb,
+            perDnReplicaCount.getOrDefault(dnId, 0).intValue());
+      }
+    }
+
+    Map<Integer, Integer> globalCount = new HashMap<>();
+    for (TRegionReplicaSet rg : allocatedRegionGroups) {
+      for (TDataNodeLocation loc : rg.getDataNodeLocations()) {
+        globalCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+      }
+    }
+    int expectedGlobal = databaseNum * regionGroupsPerDatabase * rf / dataNodeNum;
+    for (int dnId = 1; dnId <= dataNodeNum; dnId++) {
+      Assert.assertEquals(
+          "dn " + dnId + " global replica count",
+          expectedGlobal,
+          globalCount.getOrDefault(dnId, 0).intValue());
+    }
+  }
+
   @Test
   public void testEvenDistribution() {
     /* Construct input data */
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocatorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocatorTest.java
new file mode 100644
index 00000000000..df086659d7f
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocatorTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class PartiteGraphPlacementRegionGroupAllocatorTest {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PartiteGraphPlacementRegionGroupAllocatorTest.class);
+
+  private static final PartiteGraphPlacementRegionGroupAllocator ALLOCATOR =
+      new PartiteGraphPlacementRegionGroupAllocator();
+
+  private static final int TEST_DATA_NODE_NUM = 20;
+  private static final Map<Integer, TDataNodeConfiguration> AVAILABLE_DATA_NODE_MAP =
+      new HashMap<>();
+  private static final Map<Integer, Double> FREE_SPACE_MAP = new HashMap<>();
+
+  @BeforeClass
+  public static void setUp() {
+    Random random = new Random(0);
+    for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+      AVAILABLE_DATA_NODE_MAP.put(
+          i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+      FREE_SPACE_MAP.put(i, random.nextDouble());
+    }
+  }
+
+  /**
+   * Regression for the user's reported scenario: 20 DataNodes, 2 databases, 60 DataRegions per
+   * database, replication factor 3.
+   *
+   * <p>With the legacy PGP (per-db blind) implementation, the per-(database, DataNode) replica
+   * count could swing as far as {2, ..., 15}: some DataNodes ended up holding 15 replicas of one
+   * database while only 3 of the other. The per-db-aware implementation must distribute each
+   * database's replicas evenly across all DataNodes.
+   */
+  @Test
+  public void testTwoDatabasesPerDbBalance() {
+    // 60 RGs per db × rf 3 = 180 replicas per db → 9 per DN
+    runMultiDatabaseTest(2, 3, 60);
+  }
+
+  /** Multi-database (4 dbs) interleaved allocation with replication factor 3. */
+  @Test
+  public void testMultiDatabasePerDbBalance() {
+    // 40 RGs per db × rf 3 = 120 replicas per db → 6 per DN (integer avg)
+    runMultiDatabaseTest(4, 3, 40);
+  }
+
+  /** Multi-database (3 dbs) interleaved allocation with replication factor 2. */
+  @Test
+  public void testTwoFactorMultiDatabaseBalance() {
+    // 40 RGs per db × rf 2 = 80 replicas per db → 4 per DN (integer avg)
+    runMultiDatabaseTest(3, 2, 40);
+  }
+
+  /** Multi-database (3 dbs) interleaved allocation with replication factor 5. */
+  @Test
+  public void testFiveFactorMultiDatabaseBalance() {
+    // 20 RGs per db × rf 5 = 100 replicas per db → 5 per DN (integer avg)
+    runMultiDatabaseTest(3, 5, 20);
+  }
+
+  /**
+   * Allocates {@code regionGroupsPerDatabase} RegionGroups for each of {@code databaseNum}
+   * databases, interleaving the databases round by round the same way IoTDB does in practice, and
+   * asserts that:
+   *
+   * <ul>
+   *   <li>every RegionGroup is placed on {@code replicationFactor} distinct DataNodes;
+   *   <li>for each database, the per-DataNode replica counts differ by at most 1;
+   *   <li>the global per-DataNode replica counts differ by at most 1.
+   * </ul>
+   */
+  private static void runMultiDatabaseTest(
+      int databaseNum, int replicationFactor, int regionGroupsPerDatabase) {
+    List<TRegionReplicaSet> allocatedRegionGroups = new ArrayList<>();
+    Map<Integer, List<TRegionReplicaSet>> databaseAllocatedRegionGroups = new TreeMap<>();
+    for (int db = 0; db < databaseNum; db++) {
+      databaseAllocatedRegionGroups.put(db, new ArrayList<>());
+    }
+
+    int regionId = 0;
+    for (int round = 0; round < regionGroupsPerDatabase; round++) {
+      for (int db = 0; db < databaseNum; db++) {
+        TRegionReplicaSet newRegionGroup =
+            ALLOCATOR.generateOptimalRegionReplicasDistribution(
+                AVAILABLE_DATA_NODE_MAP,
+                FREE_SPACE_MAP,
+                allocatedRegionGroups,
+                databaseAllocatedRegionGroups.get(db),
+                replicationFactor,
+                new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId++));
+        assertPlacedOnDistinctDataNodes(newRegionGroup, replicationFactor);
+        allocatedRegionGroups.add(newRegionGroup);
+        databaseAllocatedRegionGroups.get(db).add(newRegionGroup);
+      }
+    }
+
+    String scenario = "rf=" + replicationFactor + ", dbs=" + databaseNum;
+    for (int db = 0; db < databaseNum; db++) {
+      assertSpreadAtMostOne(
+          scenario + ", db " + db,
+          countReplicasPerDataNode(databaseAllocatedRegionGroups.get(db)));
+    }
+    assertSpreadAtMostOne(scenario + ", global", countReplicasPerDataNode(allocatedRegionGroups));
+  }
+
+  /**
+   * Asserts that {@code regionGroup} is placed on exactly {@code replicationFactor} distinct
+   * DataNodes, i.e. no DataNode holds two replicas of the same RegionGroup.
+   */
+  private static void assertPlacedOnDistinctDataNodes(
+      TRegionReplicaSet regionGroup, int replicationFactor) {
+    Set<Integer> dataNodeIds = new HashSet<>();
+    for (TDataNodeLocation loc : regionGroup.getDataNodeLocations()) {
+      dataNodeIds.add(loc.getDataNodeId());
+    }
+    Assert.assertEquals(
+        "distinct DataNodes of " + regionGroup.getRegionId(),
+        replicationFactor,
+        dataNodeIds.size());
+  }
+
+  /**
+   * Asserts that the per-DataNode replica counts in {@code counter} (DataNodes absent from the map
+   * count as 0) differ by at most 1 across DataNodes {@code 1..TEST_DATA_NODE_NUM}.
+   */
+  private static void assertSpreadAtMostOne(String label, Map<Integer, Integer> counter) {
+    int minCount = Integer.MAX_VALUE;
+    int maxCount = Integer.MIN_VALUE;
+    for (int dnId = 1; dnId <= TEST_DATA_NODE_NUM; dnId++) {
+      int c = counter.getOrDefault(dnId, 0);
+      minCount = Math.min(minCount, c);
+      maxCount = Math.max(maxCount, c);
+    }
+    LOGGER.info("{}: per-DN replica min={}, max={}", label, minCount, maxCount);
+    Assert.assertTrue(
+        label + ": replica spread too wide: max=" + maxCount + ", min=" + minCount,
+        maxCount - minCount <= 1);
+  }
+
+  /** Counts, for each DataNode id, how many replicas of {@code regionGroups} it holds. */
+  private static Map<Integer, Integer> countReplicasPerDataNode(
+      List<TRegionReplicaSet> regionGroups) {
+    Map<Integer, Integer> counter = new HashMap<>();
+    for (TRegionReplicaSet rg : regionGroups) {
+      for (TDataNodeLocation loc : rg.getDataNodeLocations()) {
+        counter.merge(loc.getDataNodeId(), 1, Integer::sum);
+      }
+    }
+    return counter;
+  }
+}
