Repository: helix Updated Branches: refs/heads/helix-0.6.x 5278e175d -> 8d409fc7e
[HELIX-631] Fix AutoRebalanceStrategy replica not assigned In our current AutoRebalanceStrategy, Helix uses a greedy algorithm to assign replicas. Because two replicas of the same partition must not be assigned to the same node, and each node's capacity is calculated by distributing replicas evenly, some replicas may be left unassigned. With this fix, Helix will try to force-assign the orphaned replicas to the node with the minimum overload. This may cause an imbalanced assignment. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8d409fc7 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8d409fc7 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8d409fc7 Branch: refs/heads/helix-0.6.x Commit: 8d409fc7e466b583fe09774ef2d2c1ad850d6c56 Parents: 5278e17 Author: Junkai Xue <[email protected]> Authored: Tue May 9 11:38:36 2017 -0700 Committer: Junkai Xue <[email protected]> Committed: Wed May 10 11:22:01 2017 -0700 ---------------------------------------------------------------------- .../strategy/AutoRebalanceStrategy.java | 38 ++++++++++ ...utoRebalanceStrategyImbalanceAssignment.java | 77 ++++++++++++++++++++ 2 files changed, 115 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/8d409fc7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java index 8b6a234..6d17d77 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java @@ -147,6 
+147,10 @@ public class AutoRebalanceStrategy implements RebalanceStrategy { moveExcessReplicas(); + if (_orphaned.size() > 0) { + forceToAssignOrphans(); + } + prepareResult(znRecord); return znRecord; } @@ -365,6 +369,40 @@ public class AutoRebalanceStrategy implements RebalanceStrategy { } } + /** + * Force-assigns the replicas left in {@code _orphaned} after the greedy assignment. For each + * orphan, the live node with the smallest overload (currentlyAssigned - capacity) is chosen among + * the nodes that are below {@code _maximumPerNode} and accept the replica via + * {@code canAddIfCapacity}; ties go to the earliest node in {@code _liveNodesList}. An orphan + * with no eligible node stays unassigned. This may leave the assignment imbalanced. + */ + private void forceToAssignOrphans() { + for (Replica replica : _orphaned) { + // Smallest overload seen so far; refreshed on every pick so the scan returns the true minimum. + int minOverload = Integer.MAX_VALUE; + Node nodeToAssign = null; + for (Node receiver : _liveNodesList) { + if (receiver.currentlyAssigned >= _maximumPerNode + || !receiver.canAddIfCapacity(replica)) { + continue; + } + // NOTE(review): assumes capacity is the node's target replica count, so this is how far + // the node already is past its target; confirm against Node. + int overload = receiver.currentlyAssigned - receiver.capacity; + if (nodeToAssign == null || overload < minOverload) { + minOverload = overload; + nodeToAssign = receiver; + } + } + + if (nodeToAssign != null) { + nodeToAssign.currentlyAssigned++; + nodeToAssign.nonPreferred.add(replica); + nodeToAssign.newReplicas.add(replica); + } + } + } + /** * Adjust preference lists to reduce the number of same replicas on an instance. This will * separately normalize two sets of preference lists, and then append the results of the second http://git-wip-us.apache.org/repos/asf/helix/blob/8d409fc7/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategyImbalanceAssignment.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategyImbalanceAssignment.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategyImbalanceAssignment.java new file mode 100644 index 0000000..49ff753 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategyImbalanceAssignment.java @@ -0,0 +1,77 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestAutoRebalanceStrategyImbalanceAssignment { + private static final String resourceName = "ImbalanceResource"; + + @Test + public void testImbalanceAssignments() { + final int nReplicas = 5; + final int nPartitions = 20; + final int nNode = 10; + + // Try every combination of partition count (1-20), replica count (1-5) and node count (1-10), skipping cases with fewer nodes than replicas, since two replicas of one partition may not share a node. + for (int i = nPartitions; i > 0; i--) { + for (int j = nReplicas; j > 0; j--) { + for (int k = nNode; k > 0; k--) { + if (k >= j) { + testAssignment(i, j, k); + } + } + } + } + } + + private void testAssignment(int nPartitions, int nReplicas, int nNode) { // One run: nNode instances named localhost_<i>, nPartitions partitions, nReplicas ONLINE replicas requested per partition. + final List<String> instanceNames = new ArrayList<>(); + for (int i = 0; i < nNode; i++) { + instanceNames.add("localhost_" + i); + } + List<String> partitions = new ArrayList<>(nPartitions); + for (int i = 0; i < nPartitions; i++) { + partitions.add(Integer.toString(i)); + } + + LinkedHashMap<String, Integer> states = 
new LinkedHashMap<>(2); + states.put("OFFLINE", 0); + states.put("ONLINE", nReplicas); + + AutoRebalanceStrategy strategy = new AutoRebalanceStrategy(resourceName, partitions, states); + ZNRecord record = strategy.computePartitionAssignment(instanceNames, instanceNames, + new HashMap<String, Map<String, String>>(0), new ClusterDataCache()); + + for (Map<String, String> stateMapping : record.getMapFields().values()) { // Every partition in the result must carry exactly nReplicas instances, i.e. no replica left unassigned. NOTE(review): a partition missing from the map fields entirely would not be caught here. + Assert.assertEquals(stateMapping.size(), nReplicas); + } + } +}
