This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2865e8a4233 [IOTDB-6308] CFD algorithm for multi-database (#12184)
2865e8a4233 is described below
commit 2865e8a4233246f0570d8a19be88f824c88c4645
Author: Yongzao <[email protected]>
AuthorDate: Sun Mar 31 10:27:52 2024 +0800
[IOTDB-6308] CFD algorithm for multi-database (#12184)
---
...oTDBMultiDBRegionGroupLeaderDistributionIT.java | 154 +++++++++++++++
...a => IoTDBRegionGroupLeaderDistributionIT.java} | 123 ++++++------
.../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +-
.../confignode/conf/ConfigNodeDescriptor.java | 5 +-
.../confignode/conf/ConfigNodeStartupCheck.java | 2 +-
.../manager/load/balancer/RouteBalancer.java | 3 +-
.../router/leader/GreedyLeaderBalancer.java | 129 ++-----------
.../balancer/router/leader/ILeaderBalancer.java | 6 +-
.../router/leader/MinCostFlowLeaderBalancer.java | 211 +++++++++++++--------
.../persistence/partition/PartitionInfo.java | 7 +-
...alancerTest.java => CFDLeaderBalancerTest.java} | 66 ++++---
.../router/leader/GreedyLeaderBalancerTest.java | 6 +-
.../leader/LeaderBalancerComparisonTest.java | 4 +-
.../resources/conf/iotdb-common.properties | 9 -
14 files changed, 422 insertions(+), 305 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBMultiDBRegionGroupLeaderDistributionIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBMultiDBRegionGroupLeaderDistributionIT.java
new file mode 100644
index 00000000000..381c2a6fff5
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBMultiDBRegionGroupLeaderDistributionIT.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.load;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Integration test for RegionGroup-leader distribution across multiple Databases.
+ *
+ * <p>Creates {@link #TEST_DATABASE_NUM} Databases, each configured with a minimum of {@link
+ * #TEST_MIN_DATA_REGION_GROUP_NUM} DataRegionGroups, then polls {@code showRegion} until the
+ * leaders are balanced both cluster-wide and within every single Database.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBMultiDBRegionGroupLeaderDistributionIT {
+
+  private static final int TEST_DATA_NODE_NUM = 3;
+  private static final int TEST_REPLICATION_FACTOR = 2;
+  private static final String TEST_DATA_REGION_CONSENSUS_PROTOCOL_CLASS =
+      ConsensusFactory.IOT_CONSENSUS;
+
+  private static final String DATABASE = "root.db";
+  private static final int TEST_DATABASE_NUM = 2;
+  private static final int TEST_MIN_DATA_REGION_GROUP_NUM = 4;
+  private static final long TEST_TIME_PARTITION_INTERVAL = 604800000;
+
+  @Before
+  public void setUp() {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setEnableAutoLeaderBalanceForIoTConsensus(true)
+        .setDataReplicationFactor(TEST_REPLICATION_FACTOR)
+        .setDataRegionConsensusProtocolClass(TEST_DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
+    EnvFactory.getEnv().initClusterEnvironment(1, TEST_DATA_NODE_NUM);
+  }
+
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  /**
+   * Creates the Databases and their DataRegionGroups, then retries up to {@code retryNum} times
+   * (sleeping 1s between attempts) until the leader counts per DataNode differ by at most 1, both
+   * cluster-wide and inside each Database.
+   */
+  @Test
+  public void testMultiDatabaseLeaderDistribution()
+      throws ClientManagerException, IOException, InterruptedException, TException {
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      for (int i = 0; i < TEST_DATABASE_NUM; i++) {
+        String curDb = DATABASE + i;
+        TSStatus status =
+            client.setDatabase(
+                new TDatabaseSchema(curDb)
+                    .setMinDataRegionGroupNum(TEST_MIN_DATA_REGION_GROUP_NUM));
+        Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+        // Insert DataPartitions to create DataRegionGroups
+        Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+            ConfigNodeTestUtils.constructPartitionSlotsMap(
+                curDb, 0, 10, 0, 10, TEST_TIME_PARTITION_INTERVAL);
+        TDataPartitionTableResp dataPartitionTableResp =
+            client.getOrCreateDataPartitionTable(new TDataPartitionReq(partitionSlotsMap));
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            dataPartitionTableResp.getStatus().getCode());
+      }
+
+      final int retryNum = 50;
+      for (int retry = 0; retry < retryNum; retry++) {
+        // Count RegionGroup-leaders per DataNode, cluster-wide and per Database
+        Map<Integer, Integer> dataNodeLeaderCounter = new TreeMap<>();
+        Map<String, Map<Integer, Integer>> databaseLeaderCounter = new TreeMap<>();
+        TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+        showRegionResp
+            .getRegionInfoList()
+            .forEach(
+                regionInfo -> {
+                  if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+                    dataNodeLeaderCounter.merge(regionInfo.getDataNodeId(), 1, Integer::sum);
+                    databaseLeaderCounter
+                        .computeIfAbsent(regionInfo.getDatabase(), k -> new TreeMap<>())
+                        .merge(regionInfo.getDataNodeId(), 1, Integer::sum);
+                  }
+                });
+        AtomicBoolean pass = new AtomicBoolean(true);
+        // Every DataNode should lead some RegionGroups, and the cluster-wide leader counts
+        // should differ by at most 1
+        if (dataNodeLeaderCounter.size() != TEST_DATA_NODE_NUM) {
+          pass.set(false);
+        }
+        if (leaderCountRange(dataNodeLeaderCounter) > 1) {
+          pass.set(false);
+        }
+        // Within each Database, the leader counts per DataNode should also differ by at most 1
+        if (TEST_DATABASE_NUM != databaseLeaderCounter.size()) {
+          pass.set(false);
+        }
+        databaseLeaderCounter.forEach(
+            (database, leaderCounter) -> {
+              if (leaderCountRange(leaderCounter) > 1) {
+                pass.set(false);
+              }
+            });
+        if (pass.get()) {
+          return;
+        }
+        TimeUnit.SECONDS.sleep(1);
+      }
+      Assert.fail("The leader distribution is not balanced after " + retryNum + " retries.");
+    }
+  }
+
+  /**
+   * Returns the difference between the largest and the smallest RegionGroup-leader count among the
+   * {@link #TEST_DATA_NODE_NUM} DataNodes.
+   *
+   * <p>A DataNode that leads no RegionGroup never appears in {@code leaderCounter}. When fewer than
+   * {@link #TEST_DATA_NODE_NUM} DataNodes are present, the minimum is therefore taken as 0.
+   * Otherwise a DataNode without any leader could hide an imbalance: counts {2, 2} with the third
+   * DataNode at 0 would look perfectly balanced.
+   *
+   * @param leaderCounter DataNodeId -> number of RegionGroup-leaders on that DataNode
+   * @return max count minus min count, counting absent DataNodes as 0
+   */
+  private static int leaderCountRange(Map<Integer, Integer> leaderCounter) {
+    int max = leaderCounter.values().stream().mapToInt(Integer::intValue).max().orElse(0);
+    int min =
+        leaderCounter.size() < TEST_DATA_NODE_NUM
+            ? 0
+            : leaderCounter.values().stream().mapToInt(Integer::intValue).min().orElse(0);
+    return max - min;
+  }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderDistributionIT.java
similarity index 74%
rename from
integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderDistributionIT.java
index d7b8ca0e6db..83dc1e2c58f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderDistributionIT.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.it.load;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -48,21 +49,22 @@ import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
-public class IoTDBClusterRegionLeaderBalancingIT {
- private static final String testSchemaRegionConsensusProtocolClass =
+public class IoTDBRegionGroupLeaderDistributionIT {
+ private static final String TEST_SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS =
ConsensusFactory.RATIS_CONSENSUS;
- private static final String testDataRegionConsensusProtocolClass =
ConsensusFactory.IOT_CONSENSUS;
- private static final int testReplicationFactor = 3;
+ private static final String TEST_DATA_REGION_CONSENSUS_PROTOCOL_CLASS =
+ ConsensusFactory.IOT_CONSENSUS;
+ private static final int TEST_REPLICATION_FACTOR = 3;
- private static final String sg = "root.sg";
- private final int testDataNodeNum = 3;
+ private static final String DATABASE = "root.db";
+ private static final int TEST_DATA_NODE_NUM = 3;
@Before
public void setUp() {
@@ -71,11 +73,11 @@ public class IoTDBClusterRegionLeaderBalancingIT {
.getCommonConfig()
.setEnableAutoLeaderBalanceForRatisConsensus(true)
.setEnableAutoLeaderBalanceForIoTConsensus(true)
-
.setSchemaRegionConsensusProtocolClass(testSchemaRegionConsensusProtocolClass)
-
.setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass)
- .setSchemaReplicationFactor(testReplicationFactor)
- .setDataReplicationFactor(testReplicationFactor);
- EnvFactory.getEnv().initClusterEnvironment(1, 3);
+
.setSchemaRegionConsensusProtocolClass(TEST_SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS)
+
.setDataRegionConsensusProtocolClass(TEST_DATA_REGION_CONSENSUS_PROTOCOL_CLASS)
+ .setSchemaReplicationFactor(TEST_REPLICATION_FACTOR)
+ .setDataReplicationFactor(TEST_REPLICATION_FACTOR);
+ EnvFactory.getEnv().initClusterEnvironment(1, TEST_DATA_NODE_NUM);
}
@After
@@ -84,28 +86,26 @@ public class IoTDBClusterRegionLeaderBalancingIT {
}
@Test
- public void testGreedyLeaderDistribution() throws Exception {
-
+ public void testBasicLeaderDistribution() throws Exception {
TSStatus status;
- final int storageGroupNum = 3;
+ final int databaseNum = 3;
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-
- // Set StorageGroups
- for (int i = 0; i < storageGroupNum; i++) {
- status = client.setDatabase(new TDatabaseSchema(sg + i));
+ // Set Databases
+ for (int i = 0; i < databaseNum; i++) {
+ status = client.setDatabase(new TDatabaseSchema(DATABASE + i));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
}
- // Create a DataRegionGroup for each StorageGroup through
getOrCreateDataPartition
- for (int i = 0; i < storageGroupNum; i++) {
+ // Create a DataRegionGroup for each Database through
getOrCreateDataPartition
+ for (int i = 0; i < databaseNum; i++) {
Map<TSeriesPartitionSlot, TTimeSlotList> seriesSlotMap = new
HashMap<>();
seriesSlotMap.put(
new TSeriesPartitionSlot(1),
new TTimeSlotList()
.setTimePartitionSlots(Collections.singletonList(new
TTimePartitionSlot(100))));
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> sgSlotsMap = new
HashMap<>();
- sgSlotsMap.put(sg + i, seriesSlotMap);
+ sgSlotsMap.put(DATABASE + i, seriesSlotMap);
TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(new
TDataPartitionReq(sgSlotsMap));
Assert.assertEquals(
@@ -113,47 +113,43 @@ public class IoTDBClusterRegionLeaderBalancingIT {
dataPartitionTableResp.getStatus().getCode());
}
- // Check the number of Region-leader in each DataNode.
- Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+ // Check the number of RegionGroup-leader in each DataNode.
+ Map<Integer, Integer> leaderCounter = new TreeMap<>();
TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
showRegionResp
.getRegionInfoList()
.forEach(
regionInfo -> {
if
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
- leaderCounter
- .computeIfAbsent(regionInfo.getDataNodeId(), empty ->
new AtomicInteger(0))
- .getAndIncrement();
+ leaderCounter.merge(regionInfo.getDataNodeId(), 1,
Integer::sum);
}
});
// The number of Region-leader in each DataNode should be exactly 1
- Assert.assertEquals(testDataNodeNum, leaderCounter.size());
- leaderCounter.values().forEach(leaderCount -> Assert.assertEquals(1,
leaderCount.get()));
+ Assert.assertEquals(TEST_DATA_NODE_NUM, leaderCounter.size());
+ leaderCounter.forEach((dataNodeId, leaderCount) ->
Assert.assertEquals(1, (int) leaderCount));
}
}
@Test
- public void testMCFLeaderDistributionWithUnknownStatus() throws Exception {
+ public void testCFDWithUnknownStatus() throws Exception {
final int retryNum = 50;
-
TSStatus status;
- final int storageGroupNum = 6;
+ final int databaseNum = 6;
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-
- for (int i = 0; i < storageGroupNum; i++) {
- // Set StorageGroups
- status = client.setDatabase(new TDatabaseSchema(sg + i));
+ for (int i = 0; i < databaseNum; i++) {
+ // Set Databases
+ status = client.setDatabase(new TDatabaseSchema(DATABASE + i));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- // Create a DataRegionGroup for each StorageGroup
+ // Create a DataRegionGroup for each Database
Map<TSeriesPartitionSlot, TTimeSlotList> seriesSlotMap = new
HashMap<>();
seriesSlotMap.put(
new TSeriesPartitionSlot(1),
new TTimeSlotList()
.setTimePartitionSlots(Collections.singletonList(new
TTimePartitionSlot(100))));
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> sgSlotsMap = new
HashMap<>();
- sgSlotsMap.put(sg + i, seriesSlotMap);
+ sgSlotsMap.put(DATABASE + i, seriesSlotMap);
TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(new
TDataPartitionReq(sgSlotsMap));
Assert.assertEquals(
@@ -162,7 +158,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
}
// Check leader distribution
- Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+ Map<Integer, Integer> leaderCounter = new TreeMap<>();
TShowRegionResp showRegionResp;
boolean isDistributionBalanced = false;
for (int retry = 0; retry < retryNum; retry++) {
@@ -173,18 +169,17 @@ public class IoTDBClusterRegionLeaderBalancingIT {
.forEach(
regionInfo -> {
if
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
- leaderCounter
- .computeIfAbsent(regionInfo.getDataNodeId(), empty ->
new AtomicInteger(0))
- .getAndIncrement();
+ leaderCounter.merge(regionInfo.getDataNodeId(), 1,
Integer::sum);
}
});
// All DataNodes have Region-leader
- isDistributionBalanced = leaderCounter.size() == testDataNodeNum;
+ isDistributionBalanced = leaderCounter.size() == TEST_DATA_NODE_NUM;
// Each DataNode has exactly 2 Region-leader
- for (AtomicInteger leaderCount : leaderCounter.values()) {
- if (leaderCount.get() != 2) {
+ for (Integer leaderCount : leaderCounter.values()) {
+ if (leaderCount != databaseNum / TEST_DATA_NODE_NUM) {
isDistributionBalanced = false;
+ break;
}
}
@@ -213,18 +208,17 @@ public class IoTDBClusterRegionLeaderBalancingIT {
.forEach(
regionInfo -> {
if
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
- leaderCounter
- .computeIfAbsent(regionInfo.getDataNodeId(), empty ->
new AtomicInteger(0))
- .getAndIncrement();
+ leaderCounter.merge(regionInfo.getDataNodeId(), 1,
Integer::sum);
}
});
// Only Running DataNodes have Region-leader
- isDistributionBalanced = leaderCounter.size() == testDataNodeNum - 1;
+ isDistributionBalanced = leaderCounter.size() == TEST_DATA_NODE_NUM -
1;
// Each Running DataNode has exactly 3 Region-leader
- for (AtomicInteger leaderCount : leaderCounter.values()) {
- if (leaderCount.get() != 3) {
+ for (Integer leaderCount : leaderCounter.values()) {
+ if (leaderCount != databaseNum / (TEST_DATA_NODE_NUM - 1)) {
isDistributionBalanced = false;
+ break;
}
}
@@ -239,27 +233,27 @@ public class IoTDBClusterRegionLeaderBalancingIT {
}
@Test
- public void testMCFLeaderDistributionWithReadOnlyStatus() throws Exception {
+ public void testCFDWithReadOnlyStatus() throws Exception {
final int retryNum = 50;
TSStatus status;
- final int storageGroupNum = 3;
+ final int databaseNum = 3;
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- for (int i = 0; i < storageGroupNum; i++) {
- // Set StorageGroups
- status = client.setDatabase(new TDatabaseSchema(sg + i));
+ for (int i = 0; i < databaseNum; i++) {
+ // Set Databases
+ status = client.setDatabase(new TDatabaseSchema(DATABASE + i));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- // Create a DataRegionGroup for each StorageGroup
+ // Create a DataRegionGroup for each Database
Map<TSeriesPartitionSlot, TTimeSlotList> seriesSlotMap = new
HashMap<>();
seriesSlotMap.put(
new TSeriesPartitionSlot(1),
new TTimeSlotList()
.setTimePartitionSlots(Collections.singletonList(new
TTimePartitionSlot(100))));
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> sgSlotsMap = new
HashMap<>();
- sgSlotsMap.put(sg + i, seriesSlotMap);
+ sgSlotsMap.put(DATABASE + i, seriesSlotMap);
TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(new
TDataPartitionReq(sgSlotsMap));
Assert.assertEquals(
@@ -268,7 +262,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
}
// Check leader distribution
- Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+ Map<Integer, Integer> leaderCounter = new ConcurrentHashMap<>();
TShowRegionResp showRegionResp;
boolean isDistributionBalanced = false;
for (int retry = 0; retry < retryNum; retry++) {
@@ -279,18 +273,17 @@ public class IoTDBClusterRegionLeaderBalancingIT {
.forEach(
regionInfo -> {
if
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
- leaderCounter
- .computeIfAbsent(regionInfo.getDataNodeId(), empty ->
new AtomicInteger(0))
- .getAndIncrement();
+ leaderCounter.merge(regionInfo.getDataNodeId(), 1,
Integer::sum);
}
});
// All DataNodes have Region-leader
- isDistributionBalanced = leaderCounter.size() == testDataNodeNum;
+ isDistributionBalanced = leaderCounter.size() == TEST_DATA_NODE_NUM;
// Each DataNode has exactly 1 Region-leader
- for (AtomicInteger leaderCount : leaderCounter.values()) {
- if (leaderCount.get() != 1) {
+ for (Integer leaderCount : leaderCounter.values()) {
+ if (leaderCount != databaseNum / TEST_DATA_NODE_NUM) {
isDistributionBalanced = false;
+ break;
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 678b7c868a5..b20f1e87d2f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -187,7 +187,7 @@ public class ConfigNodeConfig {
private long unknownDataNodeDetectInterval = heartbeatIntervalInMs;
/** The policy of cluster RegionGroups' leader distribution. */
- private String leaderDistributionPolicy =
ILeaderBalancer.MIN_COST_FLOW_POLICY;
+ private String leaderDistributionPolicy = ILeaderBalancer.CFD_POLICY;
/** Whether to enable auto leader balance for Ratis consensus protocol. */
private boolean enableAutoLeaderBalanceForRatisConsensus = true;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index ed595dca092..e59f8d22343 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -353,13 +353,12 @@ public class ConfigNodeDescriptor {
.getProperty("leader_distribution_policy",
conf.getLeaderDistributionPolicy())
.trim();
if (ILeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
- ||
ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(leaderDistributionPolicy)) {
+ || ILeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)) {
conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
} else {
throw new IOException(
String.format(
- "Unknown leader_distribution_policy: %s, "
- + "please set to \"GREEDY\" or \"MIN_COST_FLOW\"",
+ "Unknown leader_distribution_policy: %s, " + "please set to
\"GREEDY\" or \"CFD\"",
leaderDistributionPolicy));
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 000e08d348e..1d8bf58259e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -145,7 +145,7 @@ public class ConfigNodeStartupCheck extends StartupChecks {
// The leader distribution policy is limited
if
(!ILeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
- &&
!ILeaderBalancer.MIN_COST_FLOW_POLICY.equals(CONF.getLeaderDistributionPolicy()))
{
+ &&
!ILeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
throw new ConfigurationException(
"leader_distribution_policy",
CONF.getRoutePriorityPolicy(),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index d5b88260505..67b1c342b29 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -107,7 +107,7 @@ public class RouteBalancer {
case ILeaderBalancer.GREEDY_POLICY:
this.leaderBalancer = new GreedyLeaderBalancer();
break;
- case ILeaderBalancer.MIN_COST_FLOW_POLICY:
+ case ILeaderBalancer.CFD_POLICY:
default:
this.leaderBalancer = new MinCostFlowLeaderBalancer();
break;
@@ -158,6 +158,7 @@ public class RouteBalancer {
Map<TConsensusGroupId, Integer> currentLeaderMap =
getLoadManager().getRegionLeaderMap();
Map<TConsensusGroupId, Integer> optimalLeaderMap =
leaderBalancer.generateOptimalLeaderDistribution(
+ getPartitionManager().getAllRegionGroupIdMap(regionGroupType),
getPartitionManager().getAllReplicaSetsMap(regionGroupType),
currentLeaderMap,
getNodeManager()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 0d3f813a689..4a28fb6ca31 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -16,21 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
/** Leader distribution balancer that uses greedy algorithm */
public class GreedyLeaderBalancer implements ILeaderBalancer {
@@ -47,6 +46,7 @@ public class GreedyLeaderBalancer implements ILeaderBalancer {
@Override
public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet) {
@@ -74,113 +74,26 @@ public class GreedyLeaderBalancer implements
ILeaderBalancer {
}
private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
- /* Count the number of leaders that each DataNode have */
- // Map<DataNodeId, leader count>
- Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+ Map<Integer, Integer> leaderCounter = new TreeMap<>();
regionReplicaSetMap.forEach(
- (regionGroupId, regionReplicaSet) ->
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation ->
- leaderCounter.putIfAbsent(
- dataNodeLocation.getDataNodeId(), new
AtomicInteger(0))));
- regionLeaderMap.forEach(
- (regionGroupId, leaderId) ->
leaderCounter.get(leaderId).getAndIncrement());
-
- /* Ensure all RegionGroups' leader are not inside disabled DataNodes */
- for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet()) {
- int leaderId = regionLeaderMap.get(regionGroupId);
- if (disabledDataNodeSet.contains(leaderId)) {
- int newLeaderId = -1;
- int newLeaderWeight = Integer.MAX_VALUE;
- for (TDataNodeLocation candidate :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int candidateId = candidate.getDataNodeId();
- int candidateWeight = leaderCounter.get(candidateId).get();
- // Select the available DataNode with the fewest leaders
- if (!disabledDataNodeSet.contains(candidateId) && candidateWeight <
newLeaderWeight) {
- newLeaderId = candidateId;
- newLeaderWeight = candidateWeight;
+ (regionGroupId, regionGroup) -> {
+ int minCount = Integer.MAX_VALUE,
+ leaderId = regionLeaderMap.getOrDefault(regionGroupId, -1);
+ for (TDataNodeLocation dataNodeLocation :
regionGroup.getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ continue;
+ }
+ // Select the DataNode with the minimal leader count as the new
leader
+ int count = leaderCounter.getOrDefault(dataNodeId, 0);
+ if (count < minCount) {
+ minCount = count;
+ leaderId = dataNodeId;
+ }
}
- }
-
- if (newLeaderId != -1) {
- leaderCounter.get(leaderId).getAndDecrement();
- leaderCounter.get(newLeaderId).getAndIncrement();
- regionLeaderMap.replace(regionGroupId, newLeaderId);
- }
- }
- }
-
- /* Double keyword sort */
- List<WeightEntry> weightList = new ArrayList<>();
- for (TConsensusGroupId regionGroupId : regionReplicaSetMap.keySet()) {
- int leaderId = regionLeaderMap.get(regionGroupId);
- int leaderWeight =
leaderCounter.get(regionLeaderMap.get(regionGroupId)).get();
-
- int followerWeight = Integer.MAX_VALUE;
- for (TDataNodeLocation follower :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int followerId = follower.getDataNodeId();
- if (followerId != leaderId) {
- followerWeight = Math.min(followerWeight,
leaderCounter.get(followerId).get());
- }
- }
-
- weightList.add(new WeightEntry(regionGroupId, leaderWeight,
followerWeight));
- }
- weightList.sort(WeightEntry.COMPARATOR);
-
- /* Greedy distribution */
- for (WeightEntry weightEntry : weightList) {
- TConsensusGroupId regionGroupId = weightEntry.regionGroupId;
- int leaderId = regionLeaderMap.get(regionGroupId);
- int leaderWeight =
leaderCounter.get(regionLeaderMap.get(regionGroupId)).get();
-
- int newLeaderId = -1;
- int newLeaderWeight = Integer.MAX_VALUE;
- for (TDataNodeLocation candidate :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int candidateId = candidate.getDataNodeId();
- int candidateWeight = leaderCounter.get(candidateId).get();
- if (!disabledDataNodeSet.contains(candidateId)
- && candidateId != leaderId
- && candidateWeight < newLeaderWeight) {
- newLeaderId = candidateId;
- newLeaderWeight = candidateWeight;
- }
- }
-
- // Redistribution takes effect only when leaderWeight - newLeaderWeight
> 1.
- // i.e. Redistribution can reduce the range of the number of leaders
that each DataNode owns.
- if (leaderWeight - newLeaderWeight > 1) {
- leaderCounter.get(leaderId).getAndDecrement();
- leaderCounter.get(newLeaderId).getAndIncrement();
- regionLeaderMap.replace(regionGroupId, newLeaderId);
- }
- }
-
+ regionLeaderMap.put(regionGroupId, leaderId);
+ leaderCounter.merge(leaderId, 1, Integer::sum);
+ });
return new ConcurrentHashMap<>(regionLeaderMap);
}
-
- private static class WeightEntry {
-
- private final TConsensusGroupId regionGroupId;
- // The number of leaders owned by DataNode where the RegionGroup's leader
resides
- private final int firstKey;
- // The minimum number of leaders owned by DataNode where the
RegionGroup's followers reside
- private final int secondKey;
-
- private WeightEntry(TConsensusGroupId regionGroupId, int firstKey, int
secondKey) {
- this.regionGroupId = regionGroupId;
- this.firstKey = firstKey;
- this.secondKey = secondKey;
- }
-
- // Compare the first key by descending order and the second key by
ascending order.
- private static final Comparator<WeightEntry> COMPARATOR =
- (o1, o2) ->
- o1.firstKey == o2.firstKey ? o1.secondKey - o2.secondKey :
o2.firstKey - o1.firstKey;
- }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
index a69ccc9491d..65c45aaebab 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
@@ -16,22 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
public interface ILeaderBalancer {
String GREEDY_POLICY = "GREEDY";
- String MIN_COST_FLOW_POLICY = "MIN_COST_FLOW";
+ String CFD_POLICY = "CFD";
/**
* Generate an optimal leader distribution.
*
+ * @param databaseRegionGroupMap RegionGroup held by each Database
* @param regionReplicaSetMap All RegionGroups the cluster currently have
* @param regionLeaderMap The current leader of each RegionGroup
* @param disabledDataNodeSet The DataNodes that currently unable to
work(can't place
@@ -39,6 +42,7 @@ public interface ILeaderBalancer {
* @return Map<TConsensusGroupId, Integer>, The optimal leader distribution
*/
Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index 9b775d71cd9..48aa8cc5547 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -25,15 +26,14 @@ import org.apache.iotdb.commons.utils.TestOnly;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
/** Leader distribution balancer that uses minimum cost flow algorithm */
public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
@@ -41,8 +41,9 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private static final int INFINITY = Integer.MAX_VALUE;
/** Input parameters */
- private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+ private final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
+ private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
private final Map<TConsensusGroupId, Integer> regionLeaderMap;
private final Set<Integer> disabledDataNodeSet;
@@ -55,10 +56,12 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private int maxNode = T_NODE + 1;
// Map<RegionGroupId, rNode>
private final Map<TConsensusGroupId, Integer> rNodeMap;
- // Map<DataNodeId, dNode>
- private final Map<Integer, Integer> dNodeMap;
- // Map<dNode, DataNodeId>
- private final Map<Integer, Integer> dNodeReflect;
+ // Map<Database, Map<DataNodeId, sDNode>>
+ private final Map<String, Map<Integer, Integer>> sDNodeMap;
+ // Map<Database, Map<sDNode, DataNodeId>>
+ private final Map<String, Map<Integer, Integer>> sDNodeReflect;
+ // Map<DataNodeId, tDNode>
+ private final Map<Integer, Integer> tDNodeMap;
/** Graph edges */
// Maximum index of graph edges
@@ -75,22 +78,25 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private int minimumCost = 0;
public MinCostFlowLeaderBalancer() {
- this.regionReplicaSetMap = new HashMap<>();
- this.regionLeaderMap = new HashMap<>();
- this.disabledDataNodeSet = new HashSet<>();
- this.rNodeMap = new HashMap<>();
- this.dNodeMap = new HashMap<>();
- this.dNodeReflect = new HashMap<>();
+ this.databaseRegionGroupMap = new TreeMap<>();
+ this.regionReplicaSetMap = new TreeMap<>();
+ this.regionLeaderMap = new TreeMap<>();
+ this.disabledDataNodeSet = new TreeSet<>();
+ this.rNodeMap = new TreeMap<>();
+ this.sDNodeMap = new TreeMap<>();
+ this.sDNodeReflect = new TreeMap<>();
+ this.tDNodeMap = new TreeMap<>();
this.minCostFlowEdges = new ArrayList<>();
}
@Override
public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet) {
- initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+ initialize(databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
Map<TConsensusGroupId, Integer> result;
constructMCFGraph();
@@ -102,21 +108,26 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
}
private void initialize(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet) {
+ this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
this.regionReplicaSetMap.putAll(regionReplicaSetMap);
this.regionLeaderMap.putAll(regionLeaderMap);
this.disabledDataNodeSet.addAll(disabledDataNodeSet);
}
private void clear() {
+ this.databaseRegionGroupMap.clear();
this.regionReplicaSetMap.clear();
this.regionLeaderMap.clear();
this.disabledDataNodeSet.clear();
+
this.rNodeMap.clear();
- this.dNodeMap.clear();
- this.dNodeReflect.clear();
+ this.sDNodeMap.clear();
+ this.sDNodeReflect.clear();
+ this.tDNodeMap.clear();
this.minCostFlowEdges.clear();
this.nodeHeadEdge = null;
@@ -133,13 +144,30 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
this.minimumCost = 0;
/* Indicate nodes in mcf */
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
- rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++);
- for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
- if (!dNodeMap.containsKey(dataNodeLocation.getDataNodeId())) {
- dNodeMap.put(dataNodeLocation.getDataNodeId(), maxNode);
- dNodeReflect.put(maxNode, dataNodeLocation.getDataNodeId());
- maxNode += 1;
+ for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+ databaseRegionGroupMap.entrySet()) {
+ String database = databaseEntry.getKey();
+ sDNodeMap.put(database, new TreeMap<>());
+ sDNodeReflect.put(database, new TreeMap<>());
+ List<TConsensusGroupId> regionGroupIds = databaseEntry.getValue();
+ for (TConsensusGroupId regionGroupId : regionGroupIds) {
+ rNodeMap.put(regionGroupId, maxNode++);
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
+ sDNodeMap.get(database).put(dataNodeId, maxNode);
+ sDNodeReflect.get(database).put(maxNode, dataNodeId);
+ maxNode += 1;
+ }
+ if (!tDNodeMap.containsKey(dataNodeId)) {
+ tDNodeMap.put(dataNodeId, maxNode);
+ maxNode += 1;
+ }
}
}
}
@@ -153,57 +181,74 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
/* Construct edges: sNode -> rNodes */
for (int rNode : rNodeMap.values()) {
- // Cost: 0
+ // Capacity: 1, Cost: 0, each RegionGroup should elect exactly 1 leader
addAdjacentEdges(S_NODE, rNode, 1, 0);
}
- /* Construct edges: rNodes -> dNodes */
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
- int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
- for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
- int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
- // Cost: 1 if the dNode is corresponded to the current leader of the
rNode,
- // 0 otherwise.
- // Therefore, the RegionGroup will keep the leader as constant as
possible.
- int cost =
- regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
- == dataNodeLocation.getDataNodeId()
- ? 0
- : 1;
- addAdjacentEdges(rNode, dNode, 1, cost);
+ /* Construct edges: rNodes -> sdNodes */
+ for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+ databaseRegionGroupMap.entrySet()) {
+ String database = databaseEntry.getKey();
+ for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+ int rNode = rNodeMap.get(regionGroupId);
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ // Capacity: 1, Cost: 1 if sDNode is the current leader of the
rNode, 0 otherwise.
+ // Therefore, the RegionGroup will keep the leader as constant as
possible.
+ int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) ==
dataNodeId ? 0 : 1;
+ addAdjacentEdges(rNode, sDNode, 1, cost);
+ }
}
}
- /* Construct edges: dNodes -> tNode */
- // Count the possible maximum number of leader in each DataNode
- Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
- regionReplicaSetMap
- .values()
- .forEach(
- regionReplicaSet ->
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation ->
- maxLeaderCounter
- .computeIfAbsent(
- dataNodeLocation.getDataNodeId(), empty ->
new AtomicInteger(0))
- .getAndIncrement()));
-
- for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
- int dataNodeId = dNodeEntry.getKey();
- int dNode = dNodeEntry.getValue();
-
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
+ /* Construct edges: sDNodes -> tDNodes */
+ for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+ databaseRegionGroupMap.entrySet()) {
+ String database = databaseEntry.getKey();
+ // Map<DataNodeId, leader number>
+ Map<Integer, Integer> leaderCounter = new TreeMap<>();
+ for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
+ // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
+ // Thus, the leader distribution will be as balance as possible
within each Database
+ // based on the Jensen's-Inequality.
+ addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
+ }
}
+ }
- int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
- for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
+ /* Construct edges: tDNodes -> tNode */
+ // Map<DataNodeId, possible maximum leader>
+ // Count the possible maximum number of leader in each DataNode
+ Map<Integer, Integer> maxLeaderCounter = new TreeMap<>();
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+ for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum);
// Cost: x^2 for the x-th edge at the current dNode.
- // Thus, the leader distribution will be as balance as possible.
- addAdjacentEdges(dNode, T_NODE, 1, extraEdge * extraEdge);
+ // Thus, the leader distribution will be as balance as possible within
the cluster
+ // Based on the Jensen's-Inequality.
+ addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
}
}
}
@@ -310,22 +355,24 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private Map<TConsensusGroupId, Integer> collectLeaderDistribution() {
Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
- rNodeMap.forEach(
- (regionGroupId, rNode) -> {
- boolean matchLeader = false;
- for (int currentEdge = nodeHeadEdge[rNode];
- currentEdge >= 0;
- currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
- MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
- if (edge.destNode != S_NODE && edge.capacity == 0) {
- matchLeader = true;
- result.put(regionGroupId, dNodeReflect.get(edge.destNode));
- }
- }
- if (!matchLeader) {
- result.put(regionGroupId,
regionLeaderMap.getOrDefault(regionGroupId, -1));
- }
- });
+ databaseRegionGroupMap.forEach(
+ (database, regionGroupIds) ->
+ regionGroupIds.forEach(
+ regionGroupId -> {
+ boolean matchLeader = false;
+ for (int currentEdge =
nodeHeadEdge[rNodeMap.get(regionGroupId)];
+ currentEdge >= 0;
+ currentEdge =
minCostFlowEdges.get(currentEdge).nextEdge) {
+ MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
+ if (edge.destNode != S_NODE && edge.capacity == 0) {
+ matchLeader = true;
+ result.put(regionGroupId,
sDNodeReflect.get(database).get(edge.destNode));
+ }
+ }
+ if (!matchLeader) {
+ result.put(regionGroupId,
regionLeaderMap.getOrDefault(regionGroupId, -1));
+ }
+ }));
return result;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 7480d194064..0fd094bd869 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -688,8 +688,11 @@ public class PartitionInfo implements SnapshotProcessor {
databasePartitionTables
.values()
.forEach(
- databasePartitionTable ->
- result.addAll(databasePartitionTable.getAllReplicaSets(type)));
+ databasePartitionTable -> {
+ if (databasePartitionTable.isNotPreDeleted()) {
+ result.addAll(databasePartitionTable.getAllReplicaSets(type));
+ }
+ });
return result;
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
similarity index 78%
rename from
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
rename to
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
index f676956780e..716fa55a80f 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -28,20 +29,23 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-public class MinCostFlowLeaderBalancerTest {
+public class CFDLeaderBalancerTest {
private static final MinCostFlowLeaderBalancer BALANCER = new
MinCostFlowLeaderBalancer();
+ private static final String DATABASE = "root.database";
+
/** This test shows a simple case that greedy algorithm might fail */
@Test
public void optimalLeaderDistributionTest() {
@@ -50,12 +54,14 @@ public class MinCostFlowLeaderBalancerTest {
for (int i = 0; i < 3; i++) {
regionGroupIds.add(new TConsensusGroupId(TConsensusGroupType.DataRegion,
i));
}
-
List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
for (int i = 0; i < 4; i++) {
dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(i));
}
-
+ // DataNode-0: [0, 1, 2], DataNode-1: [0, 1]
+ // DataNode-2: [0, 2] , DataNode-3: [1, 2]
+ // The result will be unbalanced if select DataNode-2 as leader for
RegionGroup-0
+ // and select DataNode-3 as leader for RegionGroup-1
List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
regionReplicaSets.add(
new TRegionReplicaSet(
@@ -74,11 +80,13 @@ public class MinCostFlowLeaderBalancerTest {
dataNodeLocations.get(0), dataNodeLocations.get(2),
dataNodeLocations.get(3))));
// Prepare input parameters
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
+ databaseRegionGroupMap.put(DATABASE, regionGroupIds);
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
regionReplicaSets.forEach(
regionReplicaSet ->
regionReplicaSetMap.put(regionReplicaSet.getRegionId(),
regionReplicaSet));
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+ Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionReplicaSets.forEach(
regionReplicaSet ->
regionLeaderMap.put(regionReplicaSet.getRegionId(), 0));
Set<Integer> disabledDataNodeSet = new HashSet<>();
@@ -87,15 +95,16 @@ public class MinCostFlowLeaderBalancerTest {
// Do balancing
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+ databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
// All RegionGroup got a leader
Assert.assertEquals(3, leaderDistribution.size());
- // Each DataNode occurs exactly once
+ // Each DataNode has exactly one leader
Assert.assertEquals(3, new HashSet<>(leaderDistribution.values()).size());
// MaxFlow is 3
Assert.assertEquals(3, BALANCER.getMaximumFlow());
- // MinimumCost is 3(switch leader cost) + 3(load cost, 1 for each DataNode)
- Assert.assertEquals(3 + 3, BALANCER.getMinimumCost());
+ // MinimumCost is 3(switch leader cost) + 3(load cost, rNode -> sDNode)
+ // + 3(load cost, sDNode -> tDNode)
+ Assert.assertEquals(3 + 3 + 3, BALANCER.getMinimumCost());
}
/** The leader will remain the same if all DataNodes are disabled */
@@ -110,9 +119,11 @@ public class MinCostFlowLeaderBalancerTest {
new TDataNodeLocation().setDataNodeId(2)));
// Prepare input parameters
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
+ databaseRegionGroupMap.put(DATABASE,
Collections.singletonList(regionReplicaSet.getRegionId()));
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+ Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
regionLeaderMap.put(regionReplicaSet.getRegionId(), 1);
Set<Integer> disabledDataNodeSet = new HashSet<>();
disabledDataNodeSet.add(0);
@@ -122,7 +133,7 @@ public class MinCostFlowLeaderBalancerTest {
// Do balancing
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+ databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
Assert.assertEquals(1, leaderDistribution.size());
Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size());
// Leader remains the same
@@ -148,13 +159,15 @@ public class MinCostFlowLeaderBalancerTest {
// The loadCost for each DataNode are the same
int x = regionGroupNum / dataNodeNum;
- // i.e. formula of 1^2 + 2^2 + 3^2 + ...
- int loadCost = x * (x + 1) * (2 * x + 1) / 6;
+ // i.e. formula of (1^2 + 2^2 + 3^2 + ...) * 2
+ int loadCost = x * (x + 1) * (2 * x + 1) / 3;
int dataNodeId = 0;
Random random = new Random();
- Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
HashMap<>();
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new
TreeMap<>();
+ databaseRegionGroupMap.put(DATABASE, new ArrayList<>());
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new
TreeMap<>();
+ Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
for (int i = 0; i < regionGroupNum; i++) {
TConsensusGroupId regionGroupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
int leaderId = (dataNodeId + random.nextInt(replicationFactor)) %
dataNodeNum;
@@ -166,6 +179,7 @@ public class MinCostFlowLeaderBalancerTest {
dataNodeId = (dataNodeId + 1) % dataNodeNum;
}
+ databaseRegionGroupMap.get(DATABASE).add(regionGroupId);
regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
regionLeaderMap.put(regionGroupId, leaderId);
}
@@ -173,24 +187,18 @@ public class MinCostFlowLeaderBalancerTest {
// Do balancing
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- regionReplicaSetMap, regionLeaderMap, new HashSet<>());
+ databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, new
HashSet<>());
// All RegionGroup got a leader
Assert.assertEquals(regionGroupNum, leaderDistribution.size());
- Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
- leaderDistribution
- .values()
- .forEach(
- leaderId ->
- leaderCounter
- .computeIfAbsent(leaderId, empty -> new AtomicInteger(0))
- .getAndIncrement());
+ Map<Integer, Integer> leaderCounter = new ConcurrentHashMap<>();
+ leaderDistribution.values().forEach(leaderId ->
leaderCounter.merge(leaderId, 1, Integer::sum));
// Every DataNode has leader
Assert.assertEquals(dataNodeNum, leaderCounter.size());
// Every DataNode has exactly regionGroupNum / dataNodeNum leaders
- leaderCounter
- .values()
- .forEach(leaderNum -> Assert.assertEquals(regionGroupNum /
dataNodeNum, leaderNum.get()));
+ for (int i = 0; i < dataNodeNum; i++) {
+ Assert.assertEquals(regionGroupNum / dataNodeNum,
leaderCounter.get(i).intValue());
+ }
// MaxFlow is regionGroupNum
Assert.assertEquals(regionGroupNum, BALANCER.getMaximumFlow());
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
index b676a62ecfb..a443fe74c02 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -32,6 +33,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -76,7 +78,7 @@ public class GreedyLeaderBalancerTest {
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+ new TreeMap<>(), regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
leaderDistribution.forEach(
(regionGroupId, leaderId) ->
@@ -129,7 +131,7 @@ public class GreedyLeaderBalancerTest {
Map<TConsensusGroupId, Integer> leaderDistribution =
BALANCER.generateOptimalLeaderDistribution(
- regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
+ new TreeMap<>(), regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
leaderDistribution.forEach(
(regionGroupId, leaderId) ->
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
index 9c7bda4287f..2a1c7093906 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -38,6 +39,7 @@ import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -264,7 +266,7 @@ public class LeaderBalancerComparisonTest {
for (int rounds = 0; rounds < 1000; rounds++) {
Map<TConsensusGroupId, Integer> currentDistribution =
leaderBalancer.generateOptimalLeaderDistribution(
- regionReplicaSetMap, lastDistribution, disabledDataNodeSet);
+ new TreeMap<>(), regionReplicaSetMap, lastDistribution,
disabledDataNodeSet);
if (currentDistribution.equals(lastDistribution)) {
// The leader distribution is stable
result.rounds = rounds;
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 7dadb6a4d2f..cd1920cc643 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -126,15 +126,6 @@ data_replication_factor=1
# Datatype: Double
# data_region_per_data_node=5.0
-
-# The policy of cluster RegionGroups' leader distribution.
-# E.g. we should balance cluster RegionGroups' leader distribution when some
DataNodes are shutdown or re-connected.
-# These policies are currently supported:
-# 1. GREEDY(Distribute leader through a simple greedy algorithm, might cause
unbalance)
-# 2. MIN_COST_FLOW(Default, distribute leader through min cost flow algorithm)
-# Datatype: String
-# leader_distribution_policy=MIN_COST_FLOW
-
# Whether to enable auto leader balance for Ratis consensus protocol.
# The ConfigNode-leader will balance the leader of Ratis-RegionGroups by
leader_distribution_policy if set true.
# Notice: Default is false because the Ratis is unstable for this function.