This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 71611a25430 [To dev/1.3] Add HashLedaerBalancer (#16654)
71611a25430 is described below
commit 71611a25430b7ffc34d982ce911147d279ccf6af
Author: Jiang Tian <[email protected]>
AuthorDate: Mon Oct 27 12:16:52 2025 +0800
[To dev/1.3] Add HashLedaerBalancer (#16654)
* Add HashLedaerBalancer
* spotless
---
.gitignore | 6 +++
.../confignode/conf/ConfigNodeDescriptor.java | 3 +-
.../confignode/conf/ConfigNodeStartupCheck.java | 3 +-
.../manager/load/balancer/RouteBalancer.java | 4 ++
.../router/leader/AbstractLeaderBalancer.java | 1 +
.../balancer/router/leader/HashLeaderBalancer.java | 61 ++++++++++++++++++++++
6 files changed, 76 insertions(+), 2 deletions(-)
diff --git a/.gitignore b/.gitignore
index 111f5922f1a..71e0fc85edb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -123,3 +123,9 @@
iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/
# Develocity
.mvn/.gradle-enterprise/
.mvn/.develocity/
+.run/CN1.run.xml
+.run/CN-ide.run.xml
+.run/DN1.run.xml
+.run/DN2.run.xml
+.run/DN3.run.xml
+.run/DN-ide.run.xml
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 1209e669055..614bae210df 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -358,7 +358,8 @@ public class ConfigNodeDescriptor {
.getProperty("leader_distribution_policy",
conf.getLeaderDistributionPolicy())
.trim();
if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
- || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy))
{
+ || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)
+ ||
AbstractLeaderBalancer.HASH_POLICY.equals(leaderDistributionPolicy)) {
conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
} else {
throw new IOException(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 3400b5316fc..ffc429e9c89 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -159,7 +159,8 @@ public class ConfigNodeStartupCheck extends StartupChecks {
// The leader distribution policy is limited
if
(!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
- &&
!AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
+ &&
!AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())
+ &&
!AbstractLeaderBalancer.HASH_POLICY.equals(CONF.getLeaderDistributionPolicy()))
{
throw new ConfigurationException(
"leader_distribution_policy",
CONF.getRoutePriorityPolicy(),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index e2f1c6758cc..706fb15d559 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
+import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.HashLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
@@ -134,6 +135,9 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
case AbstractLeaderBalancer.GREEDY_POLICY:
this.leaderBalancer = new GreedyLeaderBalancer();
break;
+ case AbstractLeaderBalancer.HASH_POLICY:
+ this.leaderBalancer = new HashLeaderBalancer();
+ break;
case AbstractLeaderBalancer.CFD_POLICY:
default:
this.leaderBalancer = new MinCostFlowLeaderBalancer();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
index e545d8b4ff1..e8295f990c3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
@@ -40,6 +40,7 @@ public abstract class AbstractLeaderBalancer {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractLeaderBalancer.class);
public static final String GREEDY_POLICY = "GREEDY";
public static final String CFD_POLICY = "CFD";
+ public static final String HASH_POLICY = "HASH";
// Set<RegionGroupId>
protected final Set<TConsensusGroupId> regionGroupIntersection;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
new file mode 100644
index 00000000000..aff023e82da
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class HashLeaderBalancer extends AbstractLeaderBalancer {
+ @Override
+ public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+ Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
+ Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
+ Map<TConsensusGroupId, Integer> regionLeaderMap,
+ Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+ Map<TConsensusGroupId, Map<Integer, RegionStatistics>>
regionStatisticsMap) {
+ Map<TConsensusGroupId, Integer> result = new HashMap<>();
+ regionLocationMap.forEach(
+ (gid, nodeSet) -> {
+ List<Integer> nodeList = new ArrayList<>(nodeSet);
+ nodeList.sort(null);
+ int startNodeIndex = Math.abs(gid.hashCode()) % nodeList.size();
+ int finalNodeId = nodeList.get(startNodeIndex);
+ for (int i = 0; i < nodeList.size(); i++) {
+ int currentNodeIndex = (startNodeIndex + i) % nodeList.size();
+ int currentNodeId = nodeList.get(currentNodeIndex);
+ NodeStatistics nodeStatistics =
dataNodeStatisticsMap.get(currentNodeId);
+ if (nodeStatistics != null && nodeStatistics.getStatus() ==
NodeStatus.Running) {
+ finalNodeId = currentNodeId;
+ break;
+ }
+ }
+ result.put(gid, finalNodeId);
+ });
+ return result;
+ }
+}