Improve MultiRoundCRUSH

Recalculate the node weights in the MultiRoundCRUSH algorithm so that the
partition distribution is more balanced.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f4642429
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f4642429
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f4642429

Branch: refs/heads/master
Commit: f46424299c448bdacad3f7911f0c9eafdbbef2c4
Parents: 8bbe057
Author: Junkai Xue <j...@linkedin.com>
Authored: Thu Sep 14 15:43:23 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Mon Nov 6 17:08:18 2017 -0800

----------------------------------------------------------------------
 .../MultiRoundCrushRebalanceStrategy.java       | 117 ++++++++++---------
 1 file changed, 61 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f4642429/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
index 4e1f935..7506523 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
@@ -19,17 +19,6 @@ package org.apache.helix.controller.rebalancer.strategy;
  * under the License.
  */
 
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
-import 
org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
-import org.apache.helix.util.JenkinsHash;
-import org.apache.helix.controller.rebalancer.topology.Node;
-import org.apache.helix.controller.rebalancer.topology.Topology;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.model.InstanceConfig;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,6 +28,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import 
org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.util.JenkinsHash;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
 /**
  * Multi-round CRUSH partition mapping strategy.
  * This gives more even partition distribution in case of small number of 
partitions,
@@ -51,7 +52,7 @@ public class MultiRoundCrushRebalanceStrategy implements 
RebalanceStrategy {
   private int _replicas;
   private LinkedHashMap<String, Integer> _stateCountMap;
 
-  private final int MAX_ITERNATION = 5;
+  private final int MAX_ITERNATION = 3;
 
   @Override
   public void init(String resourceName, final List<String> partitions,
@@ -190,10 +191,10 @@ public class MultiRoundCrushRebalanceStrategy implements 
RebalanceStrategy {
     int totalPartition = partitions.size();
 
     // node to all its assigned partitions.
-    Map<Node, List<String>> nodePartitionsMap = new HashMap<Node, 
List<String>>();
+    Map<Node, List<String>> nodePartitionsMap = new HashMap<>();
 
-    List<String> partitionsToAssign = new ArrayList<String>(partitions);
-    Map<Node, List<String>> toRemovedMap = new HashMap<Node, List<String>>();
+    List<String> partitionsToAssign = new ArrayList<>(partitions);
+    Map<Node, List<String>> toRemovedMap = new HashMap<>();
 
     int iteration = 0;
     Node root = zone;
@@ -214,51 +215,13 @@ public class MultiRoundCrushRebalanceStrategy implements 
RebalanceStrategy {
           }
           nodePartitionsMap.get(n).add(p);
         }
+        root = recalculateWeight(zone, totalWeight, totalPartition, 
nodePartitionsMap, partitions,
+            toRemovedMap);
       }
-
-      Map<String, Integer> newNodeWeight = new HashMap<String, Integer>();
-      Set<String> completedNodes = new HashSet<String>();
-      for (Node node : Topology.getAllLeafNodes(zone)) {
-        if (node.isFailed()) {
-          completedNodes.add(node.getName());
-          continue;
-        }
-        long weight = node.getWeight();
-        double ratio = ((double) weight) / (double) totalWeight;
-        int target = (int) Math.floor(ratio * totalPartition);
-
-        List<String> assignedPatitions = nodePartitionsMap.get(node);
-        int numPartitions = 0;
-        if (assignedPatitions != null) {
-          numPartitions = assignedPatitions.size();
-        }
-        if (numPartitions > target + 1) {
-          int remove = numPartitions - target - 1;
-          Collections.sort(partitions);
-          List<String> toRemoved = new 
ArrayList<String>(assignedPatitions.subList(0, remove));
-          toRemovedMap.put(node, toRemoved);
-        }
-
-        int missing = target - numPartitions;
-        if (missing > 0) {
-          newNodeWeight.put(node.getName(), missing * 10);
-        } else {
-          completedNodes.add(node.getName());
-        }
-      }
-
-      if (newNodeWeight.isEmpty()) {
-        // already converged
-        break;
-      } else {
-        // iterate more
-        root = _clusterTopo.clone(zone, newNodeWeight, completedNodes);
-      }
-
       partitionsToAssign.clear();
     }
 
-    Map<String, Node> partitionMap = new HashMap<String, Node>();
+    Map<String, Node> partitionMap = new HashMap<>();
     for (Map.Entry<Node, List<String>> e : nodePartitionsMap.entrySet()) {
       Node n = e.getKey();
       List<String> assigned = e.getValue();
@@ -270,6 +233,48 @@ public class MultiRoundCrushRebalanceStrategy implements 
RebalanceStrategy {
     return partitionMap;
   }
 
+  private Node recalculateWeight(Node zone, long totalWeight, int 
totalPartition,
+      Map<Node, List<String>> nodePartitionsMap, List<String> partitions,
+      Map<Node, List<String>> toRemovedMap) {
+    Map<String, Integer> newNodeWeight = new HashMap<>();
+    Set<String> completedNodes = new HashSet<>();
+    for (Node node : Topology.getAllLeafNodes(zone)) {
+      if (node.isFailed()) {
+        completedNodes.add(node.getName());
+        continue;
+      }
+      long weight = node.getWeight();
+      double ratio = ((double) weight) / (double) totalWeight;
+      int target = (int) Math.floor(ratio * totalPartition);
+
+      List<String> assignedPatitions = nodePartitionsMap.get(node);
+      int numPartitions = 0;
+      if (assignedPatitions != null) {
+        numPartitions = assignedPatitions.size();
+      }
+      if (numPartitions > target + 1) {
+        int remove = numPartitions - target - 1;
+        Collections.sort(partitions);
+        List<String> toRemoved = new ArrayList<>(assignedPatitions.subList(0, 
remove));
+        toRemovedMap.put(node, toRemoved);
+      }
+
+      int missing = target - numPartitions;
+      if (missing > 0) {
+        newNodeWeight.put(node.getName(), missing * 10);
+      } else {
+        completedNodes.add(node.getName());
+      }
+    }
+
+    if (!newNodeWeight.isEmpty()) {
+      // iterate more
+      return _clusterTopo.clone(zone, newNodeWeight, completedNodes);
+    }
+
+    // If the newNodeWeight map is empty, it will use old root tree to 
calculate it.
+    return zone;
+  }
   /**
    * Number of retries for finding an appropriate instance for a replica.
    */

Reply via email to