This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch add_hash_leader_distribution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d7f393beed0b967845405f4d8ad60e09e42676b5 Author: Tian Jiang <[email protected]> AuthorDate: Mon Oct 27 09:58:08 2025 +0800 Add HashLeaderBalancer --- .gitignore | 6 +++ .../confignode/conf/ConfigNodeDescriptor.java | 3 +- .../confignode/conf/ConfigNodeStartupCheck.java | 3 +- .../manager/load/balancer/RouteBalancer.java | 4 ++ .../router/leader/AbstractLeaderBalancer.java | 1 + .../balancer/router/leader/HashLeaderBalancer.java | 60 ++++++++++++++++++++++ 6 files changed, 75 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 111f5922f1a..71e0fc85edb 100644 --- a/.gitignore +++ b/.gitignore @@ -123,3 +123,9 @@ iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/ # Develocity .mvn/.gradle-enterprise/ .mvn/.develocity/ +.run/CN1.run.xml +.run/CN-ide.run.xml +.run/DN1.run.xml +.run/DN2.run.xml +.run/DN3.run.xml +.run/DN-ide.run.xml diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 1209e669055..614bae210df 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -358,7 +358,8 @@ public class ConfigNodeDescriptor { .getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy()) .trim(); if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) - || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)) { + || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy) + || AbstractLeaderBalancer.HASH_POLICY.equals(leaderDistributionPolicy)) { conf.setLeaderDistributionPolicy(leaderDistributionPolicy); } else { throw new IOException( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java index 3400b5316fc..ffc429e9c89 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java @@ -159,7 +159,8 @@ public class ConfigNodeStartupCheck extends StartupChecks { // The leader distribution policy is limited if (!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy()) - && !AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) { + && !AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy()) + && !AbstractLeaderBalancer.HASH_POLICY.equals(CONF.getLeaderDistributionPolicy())) { throw new ConfigurationException( "leader_distribution_policy", CONF.getRoutePriorityPolicy(), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index e2f1c6758cc..706fb15d559 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer; +import org.apache.iotdb.confignode.manager.load.balancer.router.leader.HashLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer; import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer; @@ -134,6 +135,9 @@ public class RouteBalancer implements IClusterStatusSubscriber { case AbstractLeaderBalancer.GREEDY_POLICY: this.leaderBalancer = new GreedyLeaderBalancer(); break; + case AbstractLeaderBalancer.HASH_POLICY: + this.leaderBalancer = new HashLeaderBalancer(); + break; case AbstractLeaderBalancer.CFD_POLICY: default: this.leaderBalancer = new MinCostFlowLeaderBalancer(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java index e545d8b4ff1..e8295f990c3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java @@ -40,6 +40,7 @@ public abstract class AbstractLeaderBalancer { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLeaderBalancer.class); public static final String GREEDY_POLICY = "GREEDY"; public static final String CFD_POLICY = "CFD"; + public static final String HASH_POLICY = "HASH"; // Set<RegionGroupId> protected final Set<TConsensusGroupId> regionGroupIntersection; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java new file mode 100644 index 00000000000..82b39d85ea9 --- /dev/null +++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
+
+/**
+ * Leader balancer that derives each region group's leader from the hash of its group id.
+ *
+ * <p>The candidate node ids of a group are sorted, the hash of the group id selects a starting
+ * position in that sorted list, and the first node at or after that position (wrapping around)
+ * whose status is {@link NodeStatus#Running} becomes the leader.
+ */
+public class HashLeaderBalancer extends AbstractLeaderBalancer {
+
+  /**
+   * Computes a leader for every region group in {@code regionLocationMap}.
+   *
+   * <p>Only {@code regionLocationMap} and {@code dataNodeStatisticsMap} are consulted; the other
+   * arguments are ignored by this policy.
+   *
+   * @param databaseRegionGroupMap not used by this policy
+   * @param regionLocationMap region group id to the ids of the nodes it is located on
+   * @param regionLeaderMap not used by this policy
+   * @param dataNodeStatisticsMap node id to its statistics; a node without an entry is treated
+   *     like a node that is not running
+   * @param regionStatisticsMap not used by this policy
+   * @return region group id to the chosen leader node id; groups without any node are omitted
+   */
+  @Override
+  public Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
+      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
+      Map<TConsensusGroupId, Set<Integer>> regionLocationMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+      Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap) {
+    Map<TConsensusGroupId, Integer> result = new HashMap<>();
+    regionLocationMap.forEach(
+        (gid, nodeSet) -> {
+          if (nodeSet == null || nodeSet.isEmpty()) {
+            // Nothing to elect; the modulo below would also divide by zero.
+            return;
+          }
+          // Sort so the hash-selected position does not depend on the set's iteration order.
+          List<Integer> nodeList = new ArrayList<>(nodeSet);
+          nodeList.sort(null);
+          // Reduce before taking the absolute value: Math.abs(Integer.MIN_VALUE) is negative
+          // and would produce a negative index.
+          int startNodeIndex = Math.abs(gid.hashCode() % nodeList.size());
+          // Fallback when no candidate is running: keep the hash-selected node.
+          int finalNodeId = nodeList.get(startNodeIndex);
+          for (int i = 0; i < nodeList.size(); i++) {
+            int currentNodeIndex = (startNodeIndex + i) % nodeList.size();
+            int currentNodeId = nodeList.get(currentNodeIndex);
+            NodeStatistics nodeStatistics = dataNodeStatisticsMap.get(currentNodeId);
+            if (nodeStatistics != null && nodeStatistics.getStatus() == NodeStatus.Running) {
+              finalNodeId = currentNodeId;
+              break;
+            }
+          }
+          result.put(gid, finalNodeId);
+        });
+    return result;
+  }
+}
