http://git-wip-us.apache.org/repos/asf/helix/blob/9e8ec454/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java deleted file mode 100644 index 818d603..0000000 --- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java +++ /dev/null @@ -1,853 +0,0 @@ -package org.apache.helix.controller.strategy; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.helix.HelixDefinedState; -import org.apache.helix.Mocks.MockAccessor; -import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.ZNRecord; -import org.apache.helix.controller.rebalancer.AutoRebalancer; -import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; -import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.tools.StateModelConfigGenerator; -import org.apache.log4j.Logger; -import org.testng.Assert; -import org.testng.annotations.Test; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public class TestAutoRebalanceStrategy { - private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class); - - /** - * Sanity test for a basic Master-Slave model - */ - @Test - public void simpleMasterSlaveTest() { - final int NUM_ITERATIONS = 10; - final int NUM_PARTITIONS = 10; - final int NUM_LIVE_NODES = 12; - final int NUM_TOTAL_NODES = 20; - final int MAX_PER_NODE = 5; - - final String[] STATE_NAMES = { - "MASTER", "SLAVE" - }; - final int[] STATE_COUNTS = { - 1, 2 - }; - - runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES, - MAX_PER_NODE, STATE_NAMES, STATE_COUNTS); - } - - /** - * Run a test for an arbitrary state model. - * @param name Name of the test state model - * @param numIterations Number of rebalance tasks to run - * @param numPartitions Number of partitions for the resource - * @param numLiveNodes Number of live nodes in the cluster - * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to - * numLiveNodes - * @param maxPerNode Maximum number of replicas a node can serve - * @param stateNames States ordered by preference - * @param stateCounts Number of replicas that should be in each state - */ - private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes, - int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) { - List<String> partitions = new ArrayList<String>(); - for (int i = 0; i < numPartitions; i++) { - partitions.add("p_" + i); - } - - List<String> liveNodes = new ArrayList<String>(); - List<String> allNodes = new ArrayList<String>(); - for (int i = 0; i < numTotalNodes; i++) { - allNodes.add("n_" + i); - if (i < numLiveNodes) { - liveNodes.add("n_" + i); - } - } - - Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>(); - - LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(); - for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) { - states.put(stateNames[i], stateCounts[i]); - } - - StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states); - - new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode, - stateModelDef).runRepeatedly(numIterations); - } - - /** - * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions - * into account when computing mappings, so this is acceptable. - * @param modelName name to give the model - * @param initialState initial state for all nodes - * @param states ordered map of state to count - * @return incomplete StateModelDefinition for rebalancing - */ - private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState, - LinkedHashMap<String, Integer> states) { - StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName); - builder.initialState(initialState); - int i = states.size(); - for (String state : states.keySet()) { - builder.addState(state, i); - builder.upperBound(state, states.get(state)); - i--; - } - return builder.build(); - } - - class AutoRebalanceTester { - private static final double P_KILL = 0.45; - private static final double P_ADD = 0.1; - private static final double P_RESURRECT = 0.45; - private static final String RESOURCE_NAME = "resource"; - - private List<String> _partitions; - private LinkedHashMap<String, Integer> _states; - private List<String> _liveNodes; - private Set<String> _liveSet; - private Set<String> _removedSet; - private Set<String> _nonLiveSet; - private Map<String, Map<String, String>> _currentMapping; - private List<String> _allNodes; - private int _maxPerNode; - private StateModelDefinition _stateModelDef; - private Random _random; - - public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states, - List<String> liveNodes, Map<String, Map<String, String>> currentMapping, - List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) { - _partitions = partitions; - _states = states; - _liveNodes = liveNodes; - _liveSet = new TreeSet<String>(); - for (String node : _liveNodes) { - _liveSet.add(node); - } - _removedSet = new TreeSet<String>(); - _nonLiveSet = new TreeSet<String>(); - _currentMapping = currentMapping; - _allNodes = allNodes; - for (String node : allNodes) { - if (!_liveSet.contains(node)) { - _nonLiveSet.add(node); - } - } - _maxPerNode = maxPerNode; - _stateModelDef = stateModelDef; - _random = new Random(); - } - - /** - * Repeatedly randomly select a task to run and report the result - * @param numIterations - * Number of random tasks to run in sequence - */ - public void runRepeatedly(int numIterations) { - logger.info("~~~~ Initial State ~~~~~"); - RebalanceStrategy strategy = - new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode); - ZNRecord initialResult = - strategy.computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); - _currentMapping = getMapping(initialResult.getListFields()); - logger.info(_currentMapping); - getRunResult(_currentMapping, initialResult.getListFields()); - for (int i = 0; i < numIterations; i++) { - logger.info("~~~~ Iteration " + i + " ~~~~~"); - ZNRecord znRecord = runOnceRandomly(); - if (znRecord != null) { - final Map<String, List<String>> listResult = znRecord.getListFields(); - final Map<String, Map<String, String>> mapResult = getMapping(listResult); - logger.info(mapResult); - logger.info(listResult); - getRunResult(mapResult, listResult); - _currentMapping = mapResult; - } - } - } - - private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) { - final Map<String, Map<String, String>> mapResult = new HashMap<String, Map<String, String>>(); - ClusterDataCache cache = new ClusterDataCache(); - MockAccessor accessor = new MockAccessor(); - Builder keyBuilder = accessor.keyBuilder(); - for (String node : _liveNodes) { - LiveInstance liveInstance = new LiveInstance(node); - liveInstance.setSessionId("testSession"); - accessor.setProperty(keyBuilder.liveInstance(node), liveInstance); - } - cache.refresh(accessor); - for (String partition : _partitions) { - List<String> preferenceList = listResult.get(partition); - Map<String, String> currentStateMap = _currentMapping.get(partition); - Set<String> disabled = Collections.emptySet(); - Map<String, String> assignment = - ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef, - preferenceList, currentStateMap, disabled, true); - mapResult.put(partition, assignment); - } - return mapResult; - } - - /** - * Output various statistics and correctness check results - * @param mapFields - * The map-map assignment generated by the rebalancer - * @param listFields - * The map-list assignment generated by the rebalancer - */ - public void getRunResult(final Map<String, Map<String, String>> mapFields, - final Map<String, List<String>> listFields) { - logger.info("***** Statistics *****"); - dumpStatistics(mapFields); - verifyCorrectness(mapFields, listFields); - } - - /** - * Output statistics about the assignment - * @param mapFields - * The map-map assignment generated by the rebalancer - */ - public void dumpStatistics(final Map<String, Map<String, String>> mapFields) { - Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields); - int nodeCount = _liveNodes.size(); - logger.info("Total number of nodes: " + nodeCount); - logger.info("Nodes: " + _liveNodes); - int sumPartitions = getSum(partitionsPerNode.values()); - logger.info("Total number of partitions: " + sumPartitions); - double averagePartitions = getAverage(partitionsPerNode.values()); - logger.info("Average number of partitions per node: " + averagePartitions); - double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions); - logger.info("Standard deviation of partitions: " + stdevPartitions); - - // Statistics about each state - Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields); - for (String state : _states.keySet()) { - Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>(); - for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) { - Map<String, Integer> stateCounts = nodeStates.getValue(); - if (stateCounts.containsKey(state)) { - nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state)); - } else { - nodeStateCounts.put(nodeStates.getKey(), 0); - } - } - int sumStates = getSum(nodeStateCounts.values()); - logger.info("Total number of state " + state + ": " + sumStates); - double averageStates = getAverage(nodeStateCounts.values()); - logger.info("Average number of state " + state + " per node: " + averageStates); - double stdevStates = getStdev(nodeStateCounts.values(), averageStates); - logger.info("Standard deviation of state " + state + " per node: " + stdevStates); - } - } - - /** - * Run a set of correctness tests, reporting success or failure - * @param mapFields - * The map-map assignment generated by the rebalancer - * @param listFields - * The map-list assignment generated by the rebalancer - */ - public void verifyCorrectness(final Map<String, Map<String, String>> mapFields, - final Map<String, List<String>> listFields) { - final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields); - boolean maxConstraintMet = maxNotExceeded(partitionsPerNode); - assert maxConstraintMet : "Max per node constraint: FAIL"; - logger.info("Max per node constraint: PASS"); - - boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode); - assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL"; - logger.info("Only live nodes have partitions constraint: PASS"); - - boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields); - assert stateAssignmentPossible : "State replica constraint: FAIL"; - logger.info("State replica constraint: PASS"); - - boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields); - assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL"; - logger.info("Node uniqueness per partition constraint: PASS"); - } - - private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) { - for (String node : partitionsPerNode.keySet()) { - Integer value = partitionsPerNode.get(node); - if (value > _maxPerNode) { - logger.error("ERROR: Node " + node + " has " + value - + " partitions despite a maximum of " + _maxPerNode); - return false; - } - } - return true; - } - - private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) { - for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) { - boolean isLive = _liveSet.contains(nodeState.getKey()); - boolean isEmpty = nodeState.getValue() == 0; - if (!isLive && !isEmpty) { - logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has " - + nodeState.getValue() + " replicas!"); - return false; - } - } - return true; - } - - private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) { - for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) { - final Map<String, String> nodeMap = partitionEntry.getValue(); - final Map<String, Integer> stateCounts = new TreeMap<String, Integer>(); - for (String state : nodeMap.values()) { - if (!stateCounts.containsKey(state)) { - stateCounts.put(state, 1); - } else { - stateCounts.put(state, stateCounts.get(state) + 1); - } - } - for (String state : stateCounts.keySet()) { - if (state.equals(HelixDefinedState.DROPPED.toString())) { - continue; - } - int count = stateCounts.get(state); - int maximumCount = _states.get(state); - if (count > maximumCount) { - logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey() - + " has " + count + " replicas when " + maximumCount + " is allowed!"); - return false; - } - } - } - return true; - } - - private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) { - for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) { - Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue()); - int numUniques = nodeSet.size(); - int total = partitionEntry.getValue().size(); - if (numUniques < total) { - logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total - + " nodes, but only " + numUniques + " are unique!"); - return false; - } - } - return true; - } - - private double getAverage(final Collection<Integer> values) { - double sum = 0.0; - for (Integer value : values) { - sum += value; - } - if (values.size() != 0) { - return sum / values.size(); - } else { - return -1.0; - } - } - - private int getSum(final Collection<Integer> values) { - int sum = 0; - for (Integer value : values) { - sum += value; - } - return sum; - } - - private double getStdev(final Collection<Integer> values, double mean) { - double sum = 0.0; - for (Integer value : values) { - double deviation = mean - value; - sum += Math.pow(deviation, 2.0); - } - if (values.size() != 0) { - sum /= values.size(); - return Math.pow(sum, 0.5); - } else { - return -1.0; - } - } - - private Map<String, Integer> getPartitionBucketsForNode( - final Map<String, Map<String, String>> assignment) { - Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>(); - for (String node : _liveNodes) { - partitionsPerNode.put(node, 0); - } - for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) { - final Map<String, String> nodeMap = partitionEntry.getValue(); - for (String node : nodeMap.keySet()) { - String state = nodeMap.get(node); - if (state.equals(HelixDefinedState.DROPPED.toString())) { - continue; - } - // add 1 for every occurrence of a node - if (!partitionsPerNode.containsKey(node)) { - partitionsPerNode.put(node, 1); - } else { - partitionsPerNode.put(node, partitionsPerNode.get(node) + 1); - } - } - } - return partitionsPerNode; - } - - private Map<String, Map<String, Integer>> getStateBucketsForNode( - final Map<String, Map<String, String>> assignment) { - Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>(); - for (String n : _liveNodes) { - result.put(n, new TreeMap<String, Integer>()); - } - for (Map<String, String> nodeStateMap : assignment.values()) { - for (Entry<String, String> nodeState : nodeStateMap.entrySet()) { - if (!result.containsKey(nodeState.getKey())) { - result.put(nodeState.getKey(), new TreeMap<String, Integer>()); - } - Map<String, Integer> stateMap = result.get(nodeState.getKey()); - if (!stateMap.containsKey(nodeState.getValue())) { - stateMap.put(nodeState.getValue(), 1); - } else { - stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1); - } - } - } - return result; - } - - /** - * Randomly choose between killing, adding, or resurrecting a single node - * @return (Partition -> (Node -> State)) ZNRecord - */ - public ZNRecord runOnceRandomly() { - double choose = _random.nextDouble(); - ZNRecord result = null; - if (choose < P_KILL) { - result = removeSingleNode(null); - } else if (choose < P_KILL + P_ADD) { - result = addSingleNode(null); - } else if (choose < P_KILL + P_ADD + P_RESURRECT) { - result = resurrectSingleNode(null); - } - return result; - } - - /** - * Run rebalancer trying to add a never-live node - * @param node - * Optional String to add - * @return ZNRecord result returned by the rebalancer - */ - public ZNRecord addSingleNode(String node) { - logger.info("=================== add node ================="); - if (_nonLiveSet.size() == 0) { - logger.warn("Cannot add node because there are no nodes left to add."); - return null; - } - - // Get a random never-live node - if (node == null || !_nonLiveSet.contains(node)) { - node = getRandomSetElement(_nonLiveSet); - } - logger.info("Adding " + node); - _liveNodes.add(node); - _liveSet.add(node); - _nonLiveSet.remove(node); - - return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode). - computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); - } - - /** - * Run rebalancer trying to remove a live node - * @param node - * Optional String to remove - * @return ZNRecord result returned by the rebalancer - */ - public ZNRecord removeSingleNode(String node) { - logger.info("=================== remove node ================="); - if (_liveSet.size() == 0) { - logger.warn("Cannot remove node because there are no nodes left to remove."); - return null; - } - - // Get a random never-live node - if (node == null || !_liveSet.contains(node)) { - node = getRandomSetElement(_liveSet); - } - logger.info("Removing " + node); - _removedSet.add(node); - _liveNodes.remove(node); - _liveSet.remove(node); - - // the rebalancer expects that the current mapping doesn't contain deleted - // nodes - for (Map<String, String> nodeMap : _currentMapping.values()) { - if (nodeMap.containsKey(node)) { - nodeMap.remove(node); - } - } - - return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) - .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); - } - - /** - * Run rebalancer trying to add back a removed node - * @param node - * Optional String to resurrect - * @return ZNRecord result returned by the rebalancer - */ - public ZNRecord resurrectSingleNode(String node) { - logger.info("=================== resurrect node ================="); - if (_removedSet.size() == 0) { - logger.warn("Cannot remove node because there are no nodes left to resurrect."); - return null; - } - - // Get a random never-live node - if (node == null || !_removedSet.contains(node)) { - node = getRandomSetElement(_removedSet); - } - logger.info("Resurrecting " + node); - _removedSet.remove(node); - _liveNodes.add(node); - _liveSet.add(node); - - return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) - .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); - } - - private <T> T getRandomSetElement(Set<T> source) { - int element = _random.nextInt(source.size()); - int i = 0; - for (T node : source) { - if (i == element) { - return node; - } - i++; - } - return null; - } - } - - /** - * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference - * lists should prefer nodes in the current mapping at all times, but when all nodes are in the - * current mapping, then it should distribute states as evenly as possible. - */ - @Test - public void testOrphansNotPreferred() { - final String RESOURCE_NAME = "resource"; - final String[] PARTITIONS = { - "resource_0", "resource_1", "resource_2" - }; - final StateModelDefinition STATE_MODEL = - new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); - final int REPLICA_COUNT = 2; - final String[] NODES = { - "n0", "n1", "n2" - }; - - // initial state, one node, no mapping - List<String> allNodes = Lists.newArrayList(NODES[0]); - List<String> liveNodes = Lists.newArrayList(NODES[0]); - Map<String, Map<String, String>> currentMapping = Maps.newHashMap(); - for (String partition : PARTITIONS) { - currentMapping.put(partition, new HashMap<String, String>()); - } - - // make sure that when the first node joins, a single replica is assigned fairly - List<String> partitions = ImmutableList.copyOf(PARTITIONS); - LinkedHashMap<String, Integer> stateCount = - AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT); - ZNRecord znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); - Map<String, List<String>> preferenceLists = znRecord.getListFields(); - for (String partition : currentMapping.keySet()) { - // make sure these are all MASTER - List<String> preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); - } - - // now assign a replica to the first node in the current mapping, and add a second node - allNodes.add(NODES[1]); - liveNodes.add(NODES[1]); - stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT); - for (String partition : PARTITIONS) { - currentMapping.get(partition).put(NODES[0], "MASTER"); - } - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); - preferenceLists = znRecord.getListFields(); - for (String partition : currentMapping.keySet()) { - List<String> preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for " - + partition); - Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for " - + partition); - } - - // now set the current mapping to reflect this update and make sure that it distributes masters - for (String partition : PARTITIONS) { - currentMapping.get(partition).put(NODES[1], "SLAVE"); - } - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); - preferenceLists = znRecord.getListFields(); - Set<String> firstNodes = Sets.newHashSet(); - for (String partition : currentMapping.keySet()) { - List<String> preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - firstNodes.add(preferenceList.get(0)); - } - Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed"); - - // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the - // new node is never the most preferred - allNodes.add(NODES[2]); - liveNodes.add(NODES[2]); - stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT); - - // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one - currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE"); - currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER"); - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); - preferenceLists = znRecord.getListFields(); - boolean newNodeUsed = false; - for (String partition : currentMapping.keySet()) { - List<String> preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - if (preferenceList.contains(NODES[2])) { - newNodeUsed = true; - Assert.assertEquals(preferenceList.get(1), NODES[2], - "newly added node not at preference list tail for " + partition); - } - } - Assert.assertTrue(newNodeUsed, "not using " + NODES[2]); - - // now remap this to take the new node into account, should go back to balancing masters, slaves - // evenly across all nodes - for (String partition : PARTITIONS) { - currentMapping.get(partition).clear(); - } - currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER"); - currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE"); - currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER"); - currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE"); - currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER"); - currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE"); - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); - preferenceLists = znRecord.getListFields(); - firstNodes.clear(); - Set<String> secondNodes = Sets.newHashSet(); - for (String partition : currentMapping.keySet()) { - List<String> preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - firstNodes.add(preferenceList.get(0)); - secondNodes.add(preferenceList.get(1)); - } - Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly"); - Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly"); - - // remove a node now, but use the current mapping with everything balanced just prior - liveNodes.remove(0); - stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT); - - // remove all references of n0 from the mapping, keep everything else in a legal state - for (String partition : PARTITIONS) { - currentMapping.get(partition).clear(); - } - currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER"); - currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER"); - currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE"); - currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER"); - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); - preferenceLists = znRecord.getListFields(); - for (String partition : currentMapping.keySet()) { - List<String> preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - Map<String, String> stateMap = currentMapping.get(partition); - for (String participant : stateMap.keySet()) { - Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for " - + partition); - } - for (String participant : preferenceList) { - if (!stateMap.containsKey(participant)) { - Assert.assertNotSame(preferenceList.get(0), participant, - "newly moved replica should not be master for " + partition); - } - } - } - - // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again - for (String partition : PARTITIONS) { - currentMapping.get(partition).clear(); - } - currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER"); - currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE"); - currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE"); - currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER"); - currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE"); - currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER"); - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); - preferenceLists = znRecord.getListFields(); - firstNodes.clear(); - for (String partition : currentMapping.keySet()) { - List<String> preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - firstNodes.add(preferenceList.get(0)); - } - Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed"); - } - - - @Test public void test() { - int nPartitions = 16; - final String resourceName = "something"; - final List<String> instanceNames = - Arrays.asList("node-1", "node-2", "node-3", "node-4"); // Initialize to 4 unique strings - - final int nReplicas = 3; - - List<String> partitions = new ArrayList<String>(nPartitions); - for (int i = 0; i < nPartitions; i++) { - partitions.add(Integer.toString(i)); - } - - LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(2); - states.put("OFFLINE", 0); - states.put("ONLINE", nReplicas); - - AutoRebalanceStrategy strategy = new AutoRebalanceStrategy(resourceName, partitions, states); - ZNRecord znRecord = strategy.computePartitionAssignment(instanceNames, instanceNames, - new HashMap<String, Map<String, String>>(0), null); - - for (List p : znRecord.getListFields().values()) { - Assert.assertEquals(p.size(), nReplicas); - } - } - - /** - * Tests the following scenario: there is only a single partition for a resource. Two nodes up, - * partition should - * be assigned to one of them. Take down that node, partition should move. Bring back up that - * node, partition should not move unnecessarily. - */ - @Test - public void testWontMoveSinglePartitionUnnecessarily() { - final String RESOURCE = "resource"; - final String partition = "resource_0"; - final StateModelDefinition STATE_MODEL = - new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()); - LinkedHashMap<String, Integer> stateCount = Maps.newLinkedHashMap(); - stateCount.put("ONLINE", 1); - final String[] NODES = {"n0", "n1"}; - - // initial state, one node, no mapping - List<String> allNodes = Lists.newArrayList(NODES); - List<String> liveNodes = Lists.newArrayList(NODES); - Map<String, Map<String, String>> currentMapping = Maps.newHashMap(); - currentMapping.put(partition, new HashMap<String, String>()); - - // Both nodes there - List<String> partitions = Lists.newArrayList(partition); - Map<String, String> upperBounds = Maps.newHashMap(); - for (String state : STATE_MODEL.getStatesPriorityList()) { - upperBounds.put(state, STATE_MODEL.getNumInstancesPerState(state)); - } - - ZNRecord znRecord = - new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); - Map<String, List<String>> preferenceLists = znRecord.getListFields(); - List<String> preferenceList = preferenceLists.get(partition.toString()); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); - String state = znRecord.getMapField(partition.toString()).get(preferenceList.get(0)); - Assert.assertEquals(state, "ONLINE", "Invalid state for " + partition); - String preferredNode = preferenceList.get(0); - String otherNode = preferredNode.equals(NODES[0]) ? NODES[1] : NODES[0]; - // ok, see what happens if we've got the partition on the other node (e.g. due to the preferred - // node being down). - currentMapping.get(partition).put(otherNode, state); - - znRecord = - new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); - - preferenceLists = znRecord.getListFields(); - preferenceList = preferenceLists.get(partition.toString()); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); - state = znRecord.getMapField(partition.toString()).get(preferenceList.get(0)); - Assert.assertEquals(state, "ONLINE", "Invalid state for " + partition); - String finalPreferredNode = preferenceList.get(0); - // finally, make sure we haven't moved it. - Assert.assertEquals(finalPreferredNode, otherNode); - } -}
http://git-wip-us.apache.org/repos/asf/helix/blob/9e8ec454/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java index 4266408..25b2d4b 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java +++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java @@ -33,7 +33,7 @@ import java.util.Map; */ public class TestTopology { - private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class); + private static Logger logger = Logger.getLogger(TestTopology.class); @Test public void testCreateClusterTopology() {