This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit be3dd97333fa36ddbd826e0326b33837e51abdf6
Author: Hunter Lee <[email protected]>
AuthorDate: Thu Mar 28 12:24:44 2019 -0700

    Fix N -> N + 1 extra bootstrap
    
    When rebalancing, Helix previously did the following before dropping replicas
    that are not in the preference list:
        1. Transition a replica on a node that had just come online from OFFLINE to SLAVE.
        2. Drop a replica on a node that had just finished bootstrapping.
    
    This happened because the rebalancer tried to make all current states
    exactly match the states in the best possible mapping, and Helix only started
    dropping replicas once that condition was met. With this fix, Helix only
    requires the replicas in the preference list to match their states in the
    best possible mapping before dropping. As a result, the rebalancer no longer
    has to wait for, and bootstrap, extra replicas that are not in the preference list.
---
 .../rebalancer/DelayedAutoRebalancer.java          |  15 ++-
 .../rebalancer/TestZeroReplicaAvoidance.java       |  37 +++++-
 .../helix/integration/task/TestJobFailure.java     |   1 +
 .../TestDelayedAutoRebalancer.MasterSlave.json     | 137 +++++++++++++++++++++
 4 files changed, 179 insertions(+), 11 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 1d00d60..65b3f84 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -529,7 +529,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
     // we should drop all partitions from previous assigned instances.
     if 
(!currentMapWithPreferenceList.values().contains(HelixDefinedState.ERROR.name())
         && bestPossibleStateMap.size() > numReplicas && 
readyToDrop(currentStateMap,
-        bestPossibleStateMap, numReplicas, combinedPreferenceList)) {
+        bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
       for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
         String instanceToDrop = 
combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
         bestPossibleStateMap.put(instanceToDrop, 
HelixDefinedState.DROPPED.name());
@@ -551,13 +551,12 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
   }
 
   private boolean readyToDrop(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, int numReplicas, List<String> 
combinedPreferenceList) {
-    if (currentStateMap.size() != bestPossibleMap.size()) {
-      return false;
-    }
-    
-    for (int i = 0; i < numReplicas; i++) {
-      String instance = combinedPreferenceList.get(i);
+      Map<String, String> bestPossibleMap, List<String> preferenceList,
+      List<String> combinedPreferenceList) {
+    Set<String> preferenceWithActiveState = new HashSet<>(preferenceList);
+    preferenceWithActiveState.retainAll(combinedPreferenceList);
+
+    for (String instance : preferenceWithActiveState) {
       if (!currentStateMap.containsKey(instance) || 
!currentStateMap.get(instance)
           .equals(bestPossibleMap.get(instance))) {
         return false;
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index 05141e1..fe5014c 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -5,15 +5,20 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -45,7 +50,7 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
 
   @Test(dataProvider = "zeroReplicaInput")
   public void testZeroReplicaAvoidanceDuringRebalance(StateModelDefinition 
stateModelDef,
-      List<String> instancePreferenceList, Map<String, String> currentStateMap,
+      List<String> instancePreferenceList, Map<String, String> 
currentStateMap, Map<String, List<Message>> pendingMessages,
       Map<String, String> expectedBestPossibleMap) {
     System.out.println("START TestDelayedAutoRebalancer at " + new 
Date(System.currentTimeMillis()));
 
@@ -66,6 +71,18 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
       currentStateOutput
           .setCurrentState("test", partition, instance, 
currentStateMap.get(instance));
     }
+    Set<String> allInstances = new HashSet<>(instancePreferenceList);
+    allInstances.addAll(currentStateMap.keySet());
+    if (pendingMessages != null) {
+      for (String instance : allInstances) {
+        List<Message> messages = pendingMessages.get(instance);
+        if (messages != null) {
+          for (Message message : messages) {
+            currentStateOutput.setPendingMessage("test", partition, instance, 
message);
+          }
+        }
+      }
+    }
     Map<String, String> bestPossibleMap = rebalancer
         .computeBestPossibleStateForPartition(liveInstances, stateModelDef, 
instancePreferenceList,
             currentStateOutput, Collections.<String>emptySet(), is,
@@ -93,6 +110,7 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
 
   private final String INPUT = "inputs";
   private final String CURRENT_STATE = "currentStates";
+  private final String PENDING_MESSAGES = "pendingMessages";
   private final String BEST_POSSIBLE_STATE = "bestPossibleStates";
   private final String PREFERENCE_LIST = "preferenceList";
   private final String STATE_MODEL = "statemodel";
@@ -115,8 +133,21 @@ public class TestZeroReplicaAvoidance extends 
BaseStageTest {
         Map<String, String> bestPossibleStates =
             (Map<String, String>) inMap.get(BEST_POSSIBLE_STATE);
         List<String> preferenceList = (List<String>) 
inMap.get(PREFERENCE_LIST);
-
-        ret.add(new Object[] { stateModelDef, preferenceList, currentStates, 
bestPossibleStates });
+        Map<String, String> pendingStates = (Map<String, String>) 
inMap.get(PENDING_MESSAGES);
+        Map<String, List<Message>> pendingMessages = null;
+        if (pendingStates != null) {
+          Random r = new Random();
+          pendingMessages = new HashMap<>();
+          for (String instance : pendingStates.keySet()) {
+            pendingMessages.put(instance, new ArrayList<Message>());
+            Message m = new Message(new 
ZNRecord(UUID.randomUUID().toString()));
+            m.setFromState(pendingStates.get(instance).split(":")[0]);
+            m.setToState(pendingStates.get(instance).split(":")[1]);
+            pendingMessages.get(instance).add(m);
+          }
+        }
+
+        ret.add(new Object[] { stateModelDef, preferenceList, currentStates, 
pendingMessages, bestPossibleStates });
       }
     } catch (IOException e) {
       e.printStackTrace();
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
index 5309eb9..28d7f76 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
@@ -96,6 +96,7 @@ public final class TestJobFailure extends 
TaskSynchronizedTestBase {
         TaskState.valueOf(expectedJobEndingStates));
     _driver.pollForWorkflowState(WORKFLOW_NAME, 
TaskState.valueOf(expectedWorkflowEndingStates));
 
+    Thread.sleep(2000);
     JobContext jobContext = 
_driver.getJobContext(TaskUtil.getNamespacedJobName(WORKFLOW_NAME, JOB_NAME));
     for (int pId : jobContext.getPartitionSet()) {
       Map<String, String> targetPartitionConfig = 
targetPartitionConfigs.get(jobContext.getTargetForPartition(pId));
diff --git 
a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json 
b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
index aa2c98d..22bfbe3 100644
--- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
+++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
@@ -115,6 +115,143 @@
         "localhost_3": "SLAVE",
         "localhost_4": "SLAVE"
       }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_6": "OFFLINE",
+        "localhost_3": "MASTER",
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "SLAVE"
+      },
+      "bestPossibleStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "DROPPED",
+        "localhost_3": "MASTER",
+        "localhost_6": "DROPPED"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_6": "OFFLINE",
+        "localhost_3": "SLAVE",
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "SLAVE"
+      },
+      "bestPossibleStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "SLAVE",
+        "localhost_3": "MASTER"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_6": "OFFLINE",
+        "localhost_3": "OFFLINE",
+        "localhost_0": "SLAVE",
+        "localhost_1": "MASTER",
+        "localhost_2": "SLAVE"
+      },
+      "bestPossibleStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "MASTER",
+        "localhost_2": "SLAVE",
+        "localhost_3": "SLAVE"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_6": "OFFLINE",
+        "localhost_3": "OFFLINE",
+        "localhost_1": "MASTER"
+      },
+      "bestPossibleStates": {
+        "localhost_1": "MASTER",
+        "localhost_4": "SLAVE",
+        "localhost_3": "SLAVE"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "MASTER",
+        "localhost_2": "OFFLINE"
+      },
+      "bestPossibleStates": {
+        "localhost_1": "MASTER",
+        "localhost_0": "SLAVE",
+        "localhost_3": "SLAVE",
+        "localhost_2": "SLAVE"
+      },
+      "pendingMessages": {
+        "localhost_3": "OFFLINE:SLAVE"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "MASTER",
+        "localhost_2": "OFFLINE",
+        "localhost_3": "SLAVE"
+      },
+      "bestPossibleStates": {
+        "localhost_1": "SLAVE",
+        "localhost_0": "SLAVE",
+        "localhost_3": "MASTER",
+        "localhost_2": "SLAVE"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "OFFLINE",
+        "localhost_3": "MASTER"
+      },
+      "bestPossibleStates": {
+        "localhost_1": "SLAVE",
+        "localhost_0": "SLAVE",
+        "localhost_3": "MASTER",
+        "localhost_2": "DROPPED"
+      }
     }
   ]
 }

Reply via email to