Repository: helix Updated Branches: refs/heads/master e96ea8e20 -> dc9f129b6
HELIX-543 RB-27808 Avoid moving partitions unnecessarily when auto-rebalancing Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/dc9f129b Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/dc9f129b Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/dc9f129b Branch: refs/heads/master Commit: dc9f129b67f8cacdf0cd22288f166b56fc5654a0 Parents: e96ea8e Author: Kishore Gopalakrishna <[email protected]> Authored: Wed Nov 12 00:23:03 2014 -0800 Committer: Kishore Gopalakrishna <[email protected]> Committed: Wed Nov 12 00:23:03 2014 -0800 ---------------------------------------------------------------------- helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy | 18 ++++ .../strategy/AutoRebalanceStrategy.java | 61 +++++++++-- .../strategy/TestAutoRebalanceStrategy.java | 62 +++++++++++ .../SinglePartitionLeaderStandByTest.java | 108 +++++++++++++++++++ 4 files changed, 242 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy ---------------------------------------------------------------------- diff --git a/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy b/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy index f59be07..ef1f57e 100644 --- a/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy +++ b/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy @@ -1,4 +1,22 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> <ivy-module version="2.0" xmlns:m="http://ant.apache.org/ivy/maven"> <info organisation="org.apache.helix" module="helix-agent" http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java index 09b66c1..6e0e226 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java @@ -33,6 +33,9 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimaps; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.api.State; @@ -141,8 +144,12 @@ public class AutoRebalanceStrategy { final Map<PartitionId, Map<ParticipantId, State>> currentMapping, final List<ParticipantId> allNodes) { Comparator<ParticipantId> nodeComparator = new NodeComparator(); + Comparator<ParticipantId> currentStateNodeComparator = + new CurrentStateNodeComparator(currentMapping); + List<ParticipantId> sortedLiveNodes = new ArrayList<ParticipantId>(liveNodes); - Collections.sort(sortedLiveNodes, nodeComparator); + 
Collections.sort(sortedLiveNodes, currentStateNodeComparator); + List<ParticipantId> sortedAllNodes = new ArrayList<ParticipantId>(allNodes); Collections.sort(sortedAllNodes, nodeComparator); List<String> sortedNodeNames = @@ -481,7 +488,7 @@ public class AutoRebalanceStrategy { } /** - * Safe check for the number of replicas of a given id assiged to a node + * Safe check for the number of replicas of a given id assigned to a node * @param state the state to assign * @param node the node to check * @param nodeReplicaCounts a map of node to replica id and counts @@ -496,11 +503,11 @@ public class AutoRebalanceStrategy { return 0; } Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node); - if (!replicaCounts.containsKey(state)) { + if (!replicaCounts.containsKey(state.toString())) { replicaCounts.put(state.toString(), 0); return 0; } - return replicaCounts.get(state); + return replicaCounts.get(state.toString()); } /** @@ -609,7 +616,7 @@ public class AutoRebalanceStrategy { /** * Given a predefined set of all possible nodes, compute an assignment of replicas to * nodes that evenly assigns all replicas to nodes. - * @param allNodes Identifiers to all nodes, live and non-live + * @param nodeNames Identifiers to all nodes, live and non-live * @return Preferred assignment of replicas */ private Map<Replica, Node> computePreferredPlacement(final List<String> nodeNames) { @@ -633,8 +640,7 @@ public class AutoRebalanceStrategy { /** * Counts the total number of replicas given a state-count mapping - * @param states - * @return + * @return The number */ private int countStateReplicas() { int total = 0; @@ -844,4 +850,45 @@ public class AutoRebalanceStrategy { return o1.toString().compareTo(o2.toString()); } } + + /** + * Sorter for live nodes that sorts firstly according to the number of partitions currently + * registered against a node (more partitions means sort earlier), then by node name. 
+ * This avoids moving partitions needlessly when the capacity assignment + * would otherwise reduce the capacity of nodes that sort later. + */ + private static class CurrentStateNodeComparator implements Comparator<ParticipantId> { + + /** + * The number of partitions currently mapped to each participant. + */ + private final Map<ParticipantId, Integer> partitionCounts; + + /** + * Builds the per-participant partition counts. + * @param currentMapping The current mapping of partitions to participants. + */ + public CurrentStateNodeComparator(Map<PartitionId, Map<ParticipantId, State>> currentMapping) { + partitionCounts = new HashMap<ParticipantId, Integer>(); + for (Entry<PartitionId, Map<ParticipantId, State>> entry : currentMapping.entrySet()) { + for (ParticipantId participantId : entry.getValue().keySet()) { + Integer existing = partitionCounts.get(participantId); + partitionCounts.put(participantId, existing != null ? existing + 1 : 1); + } + } + } + + @Override + public int compare(ParticipantId o1, ParticipantId o2) { + Integer c1 = partitionCounts.get(o1); + if (c1 == null) { + c1 = 0; + } + Integer c2 = partitionCounts.get(o2); + if (c2 == null) { + c2 = 0; + } + return c1 < c2 ? 1 : (c1 > c2 ? 
-1 : o1.toString().compareTo(o2.toString())); + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java index 1322b40..25c550d 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java +++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java @@ -788,4 +788,66 @@ public class TestAutoRebalanceStrategy { } Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed"); } + + /** + * Tests the following scenario: there is only a single partition for a resource. Two nodes up, + * partition should + * be assigned to one of them. Take down that node, partition should move. Bring back up that + * node, partition should not move unnecessarily. 
+ */ + @Test + public void testWontMoveSinglePartitionUnnecessarily() { + final ResourceId RESOURCE = ResourceId.from("resource"); + final PartitionId partition = PartitionId.from("resource_0"); + final StateModelDefinition STATE_MODEL = + new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()); + LinkedHashMap<State, Integer> stateCount = Maps.newLinkedHashMap(); + stateCount.put(State.from("ONLINE"), 1); + final ParticipantId[] NODES = { + ParticipantId.from("n0"), ParticipantId.from("n1") + }; + + ReplicaPlacementScheme scheme = new AutoRebalanceStrategy.DefaultPlacementScheme(); + // initial state, one node, no mapping + List<ParticipantId> allNodes = Lists.newArrayList(NODES); + List<ParticipantId> liveNodes = Lists.newArrayList(NODES); + Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap(); + currentMapping.put(partition, new HashMap<ParticipantId, State>()); + + // Both nodes there + List<PartitionId> partitions = Lists.newArrayList(partition); + Map<State, String> upperBounds = Maps.newHashMap(); + for (State state : STATE_MODEL.getTypedStatesPriorityList()) { + upperBounds.put(state, STATE_MODEL.getNumParticipantsPerState(state)); + } + + ZNRecord znRecord = + new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme) + .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes); + Map<String, List<String>> preferenceLists = znRecord.getListFields(); + List<String> preferenceList = preferenceLists.get(partition.toString()); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); + String state = znRecord.getMapField(partition.toString()).get(preferenceList.get(0)); + Assert.assertEquals(state, "ONLINE", "Invalid state for " + partition); + ParticipantId preferredNode = ParticipantId.from(preferenceList.get(0)); + ParticipantId otherNode = 
preferredNode.equals(NODES[0]) ? NODES[1] : NODES[0]; + // ok, see what happens if we've got the partition on the other node (e.g. due to the preferred + // node being down). + currentMapping.get(partition).put(otherNode, State.from(state)); + + znRecord = + new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme) + .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes); + + preferenceLists = znRecord.getListFields(); + preferenceList = preferenceLists.get(partition.toString()); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); + state = znRecord.getMapField(partition.toString()).get(preferenceList.get(0)); + Assert.assertEquals(state, "ONLINE", "Invalid state for " + partition); + ParticipantId finalPreferredNode = ParticipantId.from(preferenceList.get(0)); + // finally, make sure we haven't moved it. + Assert.assertEquals(finalPreferredNode, otherNode); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java new file mode 100644 index 0000000..ec8de0a --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java @@ -0,0 +1,108 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Arrays; +import java.util.Date; + +import org.apache.helix.HelixConstants; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.manager.zk.MockController; +import org.apache.helix.manager.zk.MockParticipant; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.testutil.TestUtil; +import org.apache.helix.testutil.ZkTestBase; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * This is a simple integration test. 
We will use this until we have a framework + * which helps us write integration tests easily + */ + +public class SinglePartitionLeaderStandByTest extends ZkTestBase { + @Test + public void test() throws Exception { + String clusterName = TestUtil.getTestName(); + int n = 4; + + System.out.println("START " + clusterName +" at " + new Date(System.currentTimeMillis())); + + // Thread.currentThread().join(); + TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 2, // partitions per resource + n, // number of nodes + 0, // replicas + "LeaderStandby", + RebalanceMode.FULL_AUTO, + false); // don't rebalance + + // rebalance ideal-state to use ANY_LIVEINSTANCE for preference list + ZKHelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, _baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + PropertyKey key = keyBuilder.idealStates("TestDB0"); + IdealState idealState = accessor.getProperty(key); + idealState.setReplicas(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString()); + idealState.getRecord().setListField("TestDB0_0", + Arrays.asList(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())); + accessor.setProperty(key, idealState); + + MockController controller = + new MockController(_zkaddr, clusterName, "controller_0"); + controller.syncStart(); + + // start participants + MockParticipant[] participants = new MockParticipant[n]; + for (int i = 0; i < n; i++) { + String instanceName = "localhost_" + (12918 + i); + + participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName); + participants[i].syncStart(); + } + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, + clusterName)); + + Assert.assertTrue(result); + // stop the first participant + participants[0].syncStop(); + Thread.sleep(10000); + for (int i = 0; i < 1; i++) { 
+ String instanceName = "localhost_" + (12918 + i); + participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName); + participants[i].syncStart(); + } + // clean up + controller.syncStop(); + for (int i = 0; i < n; i++) { + participants[i].syncStop(); + } + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } +}
