This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit be3dd97333fa36ddbd826e0326b33837e51abdf6 Author: Hunter Lee <[email protected]> AuthorDate: Thu Mar 28 12:24:44 2019 -0700 Fix N -> N + 1 extra bootstrap When rebalancing, Helix does the following before dropping replicas that are not in the preference list: 1. Send a replica on a node that just came online from OFFLINE to SLAVE 2. Drop a replica on a node that just bootstrapped This happened because the rebalancer was trying to make all current states exactly match the states in the best possible mapping. Only once that condition was met did Helix start dropping replicas. This fix makes Helix require only that the replicas in the preference list match the states in the best possible mapping, so the rebalancer does not have to wait for and bootstrap extra replicas that are not in the preference list. --- .../rebalancer/DelayedAutoRebalancer.java | 15 ++- .../rebalancer/TestZeroReplicaAvoidance.java | 37 +++++- .../helix/integration/task/TestJobFailure.java | 1 + .../TestDelayedAutoRebalancer.MasterSlave.json | 137 +++++++++++++++++++++ 4 files changed, 179 insertions(+), 11 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index 1d00d60..65b3f84 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -529,7 +529,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController // we should drop all partitions from previous assigned instances. 
if (!currentMapWithPreferenceList.values().contains(HelixDefinedState.ERROR.name()) && bestPossibleStateMap.size() > numReplicas && readyToDrop(currentStateMap, - bestPossibleStateMap, numReplicas, combinedPreferenceList)) { + bestPossibleStateMap, preferenceList, combinedPreferenceList)) { for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) { String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1); bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name()); @@ -551,13 +551,12 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController } private boolean readyToDrop(Map<String, String> currentStateMap, - Map<String, String> bestPossibleMap, int numReplicas, List<String> combinedPreferenceList) { - if (currentStateMap.size() != bestPossibleMap.size()) { - return false; - } - - for (int i = 0; i < numReplicas; i++) { - String instance = combinedPreferenceList.get(i); + Map<String, String> bestPossibleMap, List<String> preferenceList, + List<String> combinedPreferenceList) { + Set<String> preferenceWithActiveState = new HashSet<>(preferenceList); + preferenceWithActiveState.retainAll(combinedPreferenceList); + + for (String instance : preferenceWithActiveState) { if (!currentStateMap.containsKey(instance) || !currentStateMap.get(instance) .equals(bestPossibleMap.get(instance))) { return false; diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java index 05141e1..fe5014c 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java @@ -5,15 +5,20 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import 
java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; +import java.util.UUID; +import org.apache.helix.ZNRecord; import org.apache.helix.controller.stages.BaseStageTest; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; +import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.StateModelDefinition; import org.codehaus.jackson.map.ObjectMapper; @@ -45,7 +50,7 @@ public class TestZeroReplicaAvoidance extends BaseStageTest { @Test(dataProvider = "zeroReplicaInput") public void testZeroReplicaAvoidanceDuringRebalance(StateModelDefinition stateModelDef, - List<String> instancePreferenceList, Map<String, String> currentStateMap, + List<String> instancePreferenceList, Map<String, String> currentStateMap, Map<String, List<Message>> pendingMessages, Map<String, String> expectedBestPossibleMap) { System.out.println("START TestDelayedAutoRebalancer at " + new Date(System.currentTimeMillis())); @@ -66,6 +71,18 @@ public class TestZeroReplicaAvoidance extends BaseStageTest { currentStateOutput .setCurrentState("test", partition, instance, currentStateMap.get(instance)); } + Set<String> allInstances = new HashSet<>(instancePreferenceList); + allInstances.addAll(currentStateMap.keySet()); + if (pendingMessages != null) { + for (String instance : allInstances) { + List<Message> messages = pendingMessages.get(instance); + if (messages != null) { + for (Message message : messages) { + currentStateOutput.setPendingMessage("test", partition, instance, message); + } + } + } + } Map<String, String> bestPossibleMap = rebalancer .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList, currentStateOutput, Collections.<String>emptySet(), is, @@ -93,6 +110,7 @@ public class 
TestZeroReplicaAvoidance extends BaseStageTest { private final String INPUT = "inputs"; private final String CURRENT_STATE = "currentStates"; + private final String PENDING_MESSAGES = "pendingMessages"; private final String BEST_POSSIBLE_STATE = "bestPossibleStates"; private final String PREFERENCE_LIST = "preferenceList"; private final String STATE_MODEL = "statemodel"; @@ -115,8 +133,21 @@ public class TestZeroReplicaAvoidance extends BaseStageTest { Map<String, String> bestPossibleStates = (Map<String, String>) inMap.get(BEST_POSSIBLE_STATE); List<String> preferenceList = (List<String>) inMap.get(PREFERENCE_LIST); - - ret.add(new Object[] { stateModelDef, preferenceList, currentStates, bestPossibleStates }); + Map<String, String> pendingStates = (Map<String, String>) inMap.get(PENDING_MESSAGES); + Map<String, List<Message>> pendingMessages = null; + if (pendingStates != null) { + Random r = new Random(); + pendingMessages = new HashMap<>(); + for (String instance : pendingStates.keySet()) { + pendingMessages.put(instance, new ArrayList<Message>()); + Message m = new Message(new ZNRecord(UUID.randomUUID().toString())); + m.setFromState(pendingStates.get(instance).split(":")[0]); + m.setToState(pendingStates.get(instance).split(":")[1]); + pendingMessages.get(instance).add(m); + } + } + + ret.add(new Object[] { stateModelDef, preferenceList, currentStates, pendingMessages, bestPossibleStates }); } } catch (IOException e) { e.printStackTrace(); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java index 5309eb9..28d7f76 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java @@ -96,6 +96,7 @@ public final class TestJobFailure extends TaskSynchronizedTestBase { TaskState.valueOf(expectedJobEndingStates)); 
_driver.pollForWorkflowState(WORKFLOW_NAME, TaskState.valueOf(expectedWorkflowEndingStates)); + Thread.sleep(2000); JobContext jobContext = _driver.getJobContext(TaskUtil.getNamespacedJobName(WORKFLOW_NAME, JOB_NAME)); for (int pId : jobContext.getPartitionSet()) { Map<String, String> targetPartitionConfig = targetPartitionConfigs.get(jobContext.getTargetForPartition(pId)); diff --git a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json index aa2c98d..22bfbe3 100644 --- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json +++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json @@ -115,6 +115,143 @@ "localhost_3": "SLAVE", "localhost_4": "SLAVE" } + }, + { + "preferenceList": [ + "localhost_3", + "localhost_4", + "localhost_5" + ], + "currentStates": { + "localhost_6": "OFFLINE", + "localhost_3": "MASTER", + "localhost_0": "SLAVE", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "SLAVE", + "localhost_1": "SLAVE", + "localhost_2": "DROPPED", + "localhost_3": "MASTER", + "localhost_6": "DROPPED" + } + }, + { + "preferenceList": [ + "localhost_3", + "localhost_4", + "localhost_5" + ], + "currentStates": { + "localhost_6": "OFFLINE", + "localhost_3": "SLAVE", + "localhost_0": "SLAVE", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "SLAVE", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE", + "localhost_3": "MASTER" + } + }, + { + "preferenceList": [ + "localhost_3", + "localhost_4", + "localhost_5" + ], + "currentStates": { + "localhost_6": "OFFLINE", + "localhost_3": "OFFLINE", + "localhost_0": "SLAVE", + "localhost_1": "MASTER", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "SLAVE", + "localhost_1": "MASTER", + "localhost_2": "SLAVE", + "localhost_3": "SLAVE" + } + }, + { + "preferenceList": [ + 
"localhost_3", + "localhost_4", + "localhost_5" + ], + "currentStates": { + "localhost_6": "OFFLINE", + "localhost_3": "OFFLINE", + "localhost_1": "MASTER" + }, + "bestPossibleStates": { + "localhost_1": "MASTER", + "localhost_4": "SLAVE", + "localhost_3": "SLAVE" + } + }, + { + "preferenceList": [ + "localhost_3", + "localhost_4", + "localhost_5" + ], + "currentStates": { + "localhost_0": "SLAVE", + "localhost_1": "MASTER", + "localhost_2": "OFFLINE" + }, + "bestPossibleStates": { + "localhost_1": "MASTER", + "localhost_0": "SLAVE", + "localhost_3": "SLAVE", + "localhost_2": "SLAVE" + }, + "pendingMessages": { + "localhost_3": "OFFLINE:SLAVE" + } + }, + { + "preferenceList": [ + "localhost_3", + "localhost_4", + "localhost_5" + ], + "currentStates": { + "localhost_0": "SLAVE", + "localhost_1": "MASTER", + "localhost_2": "OFFLINE", + "localhost_3": "SLAVE" + }, + "bestPossibleStates": { + "localhost_1": "SLAVE", + "localhost_0": "SLAVE", + "localhost_3": "MASTER", + "localhost_2": "SLAVE" + } + }, + { + "preferenceList": [ + "localhost_3", + "localhost_4", + "localhost_5" + ], + "currentStates": { + "localhost_0": "SLAVE", + "localhost_1": "SLAVE", + "localhost_2": "OFFLINE", + "localhost_3": "MASTER" + }, + "bestPossibleStates": { + "localhost_1": "SLAVE", + "localhost_0": "SLAVE", + "localhost_3": "MASTER", + "localhost_2": "DROPPED" + } } ] }
