Improve MultiRoundCRUSH: recalculate node weights in the MultiRoundCRUSH algorithm so that the partition distribution is more balanced.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f4642429 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f4642429 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f4642429 Branch: refs/heads/master Commit: f46424299c448bdacad3f7911f0c9eafdbbef2c4 Parents: 8bbe057 Author: Junkai Xue <j...@linkedin.com> Authored: Thu Sep 14 15:43:23 2017 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Mon Nov 6 17:08:18 2017 -0800 ---------------------------------------------------------------------- .../MultiRoundCrushRebalanceStrategy.java | 117 ++++++++++--------- 1 file changed, 61 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/f4642429/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java index 4e1f935..7506523 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java @@ -19,17 +19,6 @@ package org.apache.helix.controller.rebalancer.strategy; * under the License. 
*/ -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import org.apache.helix.HelixException; -import org.apache.helix.ZNRecord; -import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm; -import org.apache.helix.util.JenkinsHash; -import org.apache.helix.controller.rebalancer.topology.Node; -import org.apache.helix.controller.rebalancer.topology.Topology; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.model.InstanceConfig; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -39,6 +28,18 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm; +import org.apache.helix.controller.rebalancer.topology.Node; +import org.apache.helix.controller.rebalancer.topology.Topology; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.util.JenkinsHash; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; + /** * Multi-round CRUSH partition mapping strategy. * This gives more even partition distribution in case of small number of partitions, @@ -51,7 +52,7 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { private int _replicas; private LinkedHashMap<String, Integer> _stateCountMap; - private final int MAX_ITERNATION = 5; + private final int MAX_ITERNATION = 3; @Override public void init(String resourceName, final List<String> partitions, @@ -190,10 +191,10 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { int totalPartition = partitions.size(); // node to all its assigned partitions. 
- Map<Node, List<String>> nodePartitionsMap = new HashMap<Node, List<String>>(); + Map<Node, List<String>> nodePartitionsMap = new HashMap<>(); - List<String> partitionsToAssign = new ArrayList<String>(partitions); - Map<Node, List<String>> toRemovedMap = new HashMap<Node, List<String>>(); + List<String> partitionsToAssign = new ArrayList<>(partitions); + Map<Node, List<String>> toRemovedMap = new HashMap<>(); int iteration = 0; Node root = zone; @@ -214,51 +215,13 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { } nodePartitionsMap.get(n).add(p); } + root = recalculateWeight(zone, totalWeight, totalPartition, nodePartitionsMap, partitions, + toRemovedMap); } - - Map<String, Integer> newNodeWeight = new HashMap<String, Integer>(); - Set<String> completedNodes = new HashSet<String>(); - for (Node node : Topology.getAllLeafNodes(zone)) { - if (node.isFailed()) { - completedNodes.add(node.getName()); - continue; - } - long weight = node.getWeight(); - double ratio = ((double) weight) / (double) totalWeight; - int target = (int) Math.floor(ratio * totalPartition); - - List<String> assignedPatitions = nodePartitionsMap.get(node); - int numPartitions = 0; - if (assignedPatitions != null) { - numPartitions = assignedPatitions.size(); - } - if (numPartitions > target + 1) { - int remove = numPartitions - target - 1; - Collections.sort(partitions); - List<String> toRemoved = new ArrayList<String>(assignedPatitions.subList(0, remove)); - toRemovedMap.put(node, toRemoved); - } - - int missing = target - numPartitions; - if (missing > 0) { - newNodeWeight.put(node.getName(), missing * 10); - } else { - completedNodes.add(node.getName()); - } - } - - if (newNodeWeight.isEmpty()) { - // already converged - break; - } else { - // iterate more - root = _clusterTopo.clone(zone, newNodeWeight, completedNodes); - } - partitionsToAssign.clear(); } - Map<String, Node> partitionMap = new HashMap<String, Node>(); + Map<String, Node> partitionMap = new 
HashMap<>(); for (Map.Entry<Node, List<String>> e : nodePartitionsMap.entrySet()) { Node n = e.getKey(); List<String> assigned = e.getValue(); @@ -270,6 +233,48 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy { return partitionMap; } + private Node recalculateWeight(Node zone, long totalWeight, int totalPartition, + Map<Node, List<String>> nodePartitionsMap, List<String> partitions, + Map<Node, List<String>> toRemovedMap) { + Map<String, Integer> newNodeWeight = new HashMap<>(); + Set<String> completedNodes = new HashSet<>(); + for (Node node : Topology.getAllLeafNodes(zone)) { + if (node.isFailed()) { + completedNodes.add(node.getName()); + continue; + } + long weight = node.getWeight(); + double ratio = ((double) weight) / (double) totalWeight; + int target = (int) Math.floor(ratio * totalPartition); + + List<String> assignedPatitions = nodePartitionsMap.get(node); + int numPartitions = 0; + if (assignedPatitions != null) { + numPartitions = assignedPatitions.size(); + } + if (numPartitions > target + 1) { + int remove = numPartitions - target - 1; + Collections.sort(partitions); + List<String> toRemoved = new ArrayList<>(assignedPatitions.subList(0, remove)); + toRemovedMap.put(node, toRemoved); + } + + int missing = target - numPartitions; + if (missing > 0) { + newNodeWeight.put(node.getName(), missing * 10); + } else { + completedNodes.add(node.getName()); + } + } + + if (!newNodeWeight.isEmpty()) { + // iterate more + return _clusterTopo.clone(zone, newNodeWeight, completedNodes); + } + + // If the newNodeWeight map is empty, it will use old root tree to calculate it. + return zone; + } /** * Number of retries for finding an appropriate instance for a replica. */