Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x b2794e744 -> 5378afab5


[HELIX-541] Fix possible livelock in Helix controller, rb=28413


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5378afab
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5378afab
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5378afab

Branch: refs/heads/helix-0.6.x
Commit: 5378afab5646a6451665c1d8d3edb699daea5c64
Parents: b2794e7
Author: zzhang <[email protected]>
Authored: Tue Nov 25 10:59:30 2014 -0800
Committer: zzhang <[email protected]>
Committed: Tue Nov 25 10:59:30 2014 -0800

----------------------------------------------------------------------
 .../stages/CurrentStateComputationStage.java    |  26 ++--
 .../controller/stages/CurrentStateOutput.java   |  56 ++++++--
 .../stages/MessageGenerationPhase.java          |   8 +-
 .../stages/MessageSelectionStage.java           |  78 ++++-------
 .../controller/stages/TaskAssignmentStage.java  |   5 -
 .../strategy/AutoRebalanceStrategy.java         |  33 ++---
 .../org/apache/helix/manager/zk/ZKUtil.java     |   5 +-
 .../helix/task/FixedTargetTaskRebalancer.java   |   5 +-
 .../org/apache/helix/task/TaskRebalancer.java   |  15 ++-
 .../TestCurrentStateComputationStage.java       |   2 +-
 .../stages/TestMsgSelectionStage.java           |  32 ++++-
 .../integration/TestControllerLiveLock.java     | 131 +++++++++++++++++++
 12 files changed, 268 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 6a30a9d..624698d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -70,8 +70,7 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
           String partitionName = message.getPartitionName();
           Partition partition = resource.getPartition(partitionName);
           if (partition != null) {
-            currentStateOutput.setPendingState(resourceName, partition, 
instanceName,
-                message.getToState());
+            currentStateOutput.setPendingState(resourceName, partition, 
instanceName, message);
           } else {
             // log
           }
@@ -81,8 +80,7 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
             for (String partitionName : partitionNames) {
               Partition partition = resource.getPartition(partitionName);
               if (partition != null) {
-                currentStateOutput.setPendingState(resourceName, partition, 
instanceName,
-                    message.getToState());
+                currentStateOutput.setPendingState(resourceName, partition, 
instanceName, message);
               } else {
                 // log
               }
@@ -118,20 +116,12 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
         for (String partitionName : partitionStateMap.keySet()) {
           Partition partition = resource.getPartition(partitionName);
           if (partition != null) {
-            currentStateOutput.setCurrentState(resourceName,
-                                               partition,
-                                               instanceName,
-                                               
currentState.getState(partitionName));
-            currentStateOutput.setRequestedState(resourceName,
-                                                 partition,
-                                                 instanceName,
-                                                 
currentState.getRequestedState(partitionName));
-            currentStateOutput.setInfo(resourceName,
-                                       partition,
-                                       instanceName,
-                                       currentState.getInfo(partitionName));
-          } else {
-            // log
+            currentStateOutput.setCurrentState(resourceName, partition, 
instanceName,
+                currentState.getState(partitionName));
+            currentStateOutput.setRequestedState(resourceName, partition, 
instanceName,
+                currentState.getRequestedState(partitionName));
+            currentStateOutput.setInfo(resourceName, partition, instanceName,
+                currentState.getInfo(partitionName));
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index ac9d748..5633140 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -25,13 +25,20 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 
 import com.google.common.collect.Sets;
 
+/**
+ * The current state includes both current state and pending messages
+ * For pending messages, we consider both toState and fromState
+ * Pending message prevents controller sending transitions that may 
potentially violate state
+ * constraints @see HELIX-541
+ */
 public class CurrentStateOutput {
   private final Map<String, Map<Partition, Map<String, String>>> 
_currentStateMap;
-  private final Map<String, Map<Partition, Map<String, String>>> 
_pendingStateMap;
+  private final Map<String, Map<Partition, Map<String, Message>>> 
_pendingStateMap;
   // Contains per-resource maps of partition -> (instance, requested_state). 
This corresponds to the
   // REQUESTED_STATE
   // field in the CURRENTSTATES node.
@@ -47,7 +54,7 @@ public class CurrentStateOutput {
 
   public CurrentStateOutput() {
     _currentStateMap = new HashMap<String, Map<Partition, Map<String, 
String>>>();
-    _pendingStateMap = new HashMap<String, Map<Partition, Map<String, 
String>>>();
+    _pendingStateMap = new HashMap<String, Map<Partition, Map<String, 
Message>>>();
     _resourceStateModelMap = new HashMap<String, String>();
     _curStateMetaMap = new HashMap<String, CurrentState>();
     _requestedStateMap = new HashMap<String, Map<Partition, Map<String, 
String>>>();
@@ -114,14 +121,14 @@ public class CurrentStateOutput {
   }
 
   public void setPendingState(String resourceName, Partition partition, String 
instanceName,
-      String state) {
+      Message message) {
     if (!_pendingStateMap.containsKey(resourceName)) {
-      _pendingStateMap.put(resourceName, new HashMap<Partition, Map<String, 
String>>());
+      _pendingStateMap.put(resourceName, new HashMap<Partition, Map<String, 
Message>>());
     }
     if (!_pendingStateMap.get(resourceName).containsKey(partition)) {
-      _pendingStateMap.get(resourceName).put(partition, new HashMap<String, 
String>());
+      _pendingStateMap.get(resourceName).put(partition, new HashMap<String, 
Message>());
     }
-    _pendingStateMap.get(resourceName).get(partition).put(instanceName, state);
+    _pendingStateMap.get(resourceName).get(partition).put(instanceName, 
message);
   }
 
   /**
@@ -169,12 +176,12 @@ public class CurrentStateOutput {
    * @param resourceName
    * @param partition
    * @param instanceName
-   * @return
+   * @return pending message
    */
-  public String getPendingState(String resourceName, Partition partition, 
String instanceName) {
-    Map<Partition, Map<String, String>> map = 
_pendingStateMap.get(resourceName);
+  public Message getPendingState(String resourceName, Partition partition, 
String instanceName) {
+    Map<Partition, Map<String, Message>> map = 
_pendingStateMap.get(resourceName);
     if (map != null) {
-      Map<String, String> instanceStateMap = map.get(partition);
+      Map<String, Message> instanceStateMap = map.get(partition);
       if (instanceStateMap != null) {
         return instanceStateMap.get(instanceName);
       }
@@ -199,14 +206,35 @@ public class CurrentStateOutput {
   }
 
   /**
-   * given (resource, partition), returns (instance->toState) map
+   * Given (resource, partition), returns (instance->toState) map
    * @param resourceName
    * @param partition
-   * @return
+   * @return pending target state map
    */
   public Map<String, String> getPendingStateMap(String resourceName, Partition 
partition) {
     if (_pendingStateMap.containsKey(resourceName)) {
-      Map<Partition, Map<String, String>> map = 
_pendingStateMap.get(resourceName);
+      Map<Partition, Map<String, Message>> map = 
_pendingStateMap.get(resourceName);
+      if (map.containsKey(partition)) {
+        Map<String, Message> pendingMsgMap = map.get(partition);
+        Map<String, String> pendingStateMap = new HashMap<String, String>();
+        for (String instance : pendingMsgMap.keySet()) {
+          pendingStateMap.put(instance, 
pendingMsgMap.get(instance).getToState());
+        }
+        return pendingStateMap;
+      }
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Given (resource, partition), returns (instance->pendingMessage) map
+   * @param resourceName
+   * @param partition
+   * @return pending messages map
+   */
+  public Map<String, Message> getPendingMessageMap(String resourceName, 
Partition partition) {
+    if (_pendingStateMap.containsKey(resourceName)) {
+      Map<Partition, Map<String, Message>> map = 
_pendingStateMap.get(resourceName);
       if (map.containsKey(partition)) {
         return map.get(partition);
       }
@@ -221,7 +249,7 @@ public class CurrentStateOutput {
    */
   public Set<Partition> getCurrentStateMappedPartitions(String resourceId) {
     Map<Partition, Map<String, String>> currentStateMap = 
_currentStateMap.get(resourceId);
-    Map<Partition, Map<String, String>> pendingStateMap = 
_pendingStateMap.get(resourceId);
+    Map<Partition, Map<String, Message>> pendingStateMap = 
_pendingStateMap.get(resourceId);
     Set<Partition> partitionSet = Sets.newHashSet();
     if (currentStateMap != null) {
       partitionSet.addAll(currentStateMap.keySet());

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 92964e9..bc3c739 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -75,6 +75,7 @@ public class MessageGenerationPhase extends AbstractBaseStage 
{
       StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
 
       for (Partition partition : resource.getPartitions()) {
+
         Map<String, String> instanceStateMap =
             bestPossibleStateOutput.getInstanceStateMap(resourceName, 
partition);
 
@@ -96,7 +97,7 @@ public class MessageGenerationPhase extends AbstractBaseStage 
{
             continue;
           }
 
-          String pendingState =
+          Message pendingMessage =
               currentStateOutput.getPendingState(resourceName, partition, 
instanceName);
 
           String nextState = 
stateModelDef.getNextStateForTransition(currentState, desiredState);
@@ -107,7 +108,8 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
             continue;
           }
 
-          if (pendingState != null) {
+          if (pendingMessage != null) {
+            String pendingState = pendingMessage.getToState();
             if (nextState.equalsIgnoreCase(pendingState)) {
               logger.debug("Message already exists for " + instanceName + " to 
transit "
                   + partition.getPartitionName() + " from " + currentState + " 
to " + nextState);
@@ -121,10 +123,12 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
                   + pendingState + ", currentState: " + currentState + ", 
nextState: " + nextState);
             }
           } else {
+
             Message message =
                 createMessage(manager, resourceName, 
partition.getPartitionName(), instanceName,
                     currentState, nextState, sessionIdMap.get(instanceName), 
stateModelDef.getId(),
                     resource.getStateModelFactoryname(), bucketSize);
+
             IdealState idealState = cache.getIdealState(resourceName);
             if (idealState != null
                 && idealState.getStateModelDefRef().equalsIgnoreCase(

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index f3a8257..0bc1905 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -97,13 +97,12 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
       IdealState idealState = cache.getIdealState(resourceName);
       Map<String, Bounds> stateConstraints =
           computeStateConstraints(stateModelDef, idealState, cache);
-
       for (Partition partition : resource.getPartitions()) {
         List<Message> messages = messageGenOutput.getMessages(resourceName, 
partition);
         List<Message> selectedMessages =
             selectMessages(cache.getLiveInstances(),
                 currentStateOutput.getCurrentStateMap(resourceName, partition),
-                currentStateOutput.getPendingStateMap(resourceName, 
partition), messages,
+                currentStateOutput.getPendingMessageMap(resourceName, 
partition), messages,
                 stateConstraints, stateTransitionPriorities, 
stateModelDef.getInitialState());
         output.addMessages(resourceName, partition, selectedMessages);
       }
@@ -111,6 +110,18 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
   }
 
+  private void increaseStateCnt(Map<String, Bounds> stateConstraints, String 
state,
+      Map<String, Integer> stateCnts) {
+    if (!stateConstraints.containsKey(state)) {
+      // skip state that doesn't have constraint
+      return;
+    }
+    if (!stateCnts.containsKey(state)) {
+      stateCnts.put(state, 0);
+    }
+    stateCnts.put(state, stateCnts.get(state) + 1);
+  }
+
   // TODO: This method deserves its own class. The class should not understand 
helix but
   // just be
   // able to solve the problem using the algo. I think the method is following 
that but if
@@ -131,15 +142,14 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
    * @return: selected messages
    */
   List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
-      Map<String, String> currentStates, Map<String, String> pendingStates, 
List<Message> messages,
-      Map<String, Bounds> stateConstraints, final Map<String, Integer> 
stateTransitionPriorities,
-      String initialState) {
+      Map<String, String> currentStates, Map<String, Message> pendingMessages,
+      List<Message> messages, Map<String, Bounds> stateConstraints,
+      final Map<String, Integer> stateTransitionPriorities, String 
initialState) {
     if (messages == null || messages.isEmpty()) {
       return Collections.emptyList();
     }
-
     List<Message> selectedMessages = new ArrayList<Message>();
-    Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+    Map<String, Integer> stateCnts = new HashMap<String, Integer>();
 
     // count currentState, if no currentState, count as in initialState
     for (String instance : liveInstances.keySet()) {
@@ -148,21 +158,14 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
         state = currentStates.get(instance);
       }
 
-      if (!bounds.containsKey(state)) {
-        bounds.put(state, new Bounds(0, 0));
-      }
-      bounds.get(state).increaseLowerBound();
-      bounds.get(state).increaseUpperBound();
+      increaseStateCnt(stateConstraints, state, stateCnts);
     }
 
     // count pendingStates
-    for (String instance : pendingStates.keySet()) {
-      String state = pendingStates.get(instance);
-      if (!bounds.containsKey(state)) {
-        bounds.put(state, new Bounds(0, 0));
-      }
-      // TODO: add lower bound, need to refactor pendingState to include 
fromState also
-      bounds.get(state).increaseUpperBound();
+    for (String instance : pendingMessages.keySet()) {
+      Message message = pendingMessages.get(instance);
+      increaseStateCnt(stateConstraints, message.getToState(), stateCnts);
+      increaseStateCnt(stateConstraints, message.getFromState(), stateCnts);
     }
 
     // group messages based on state transition priority
@@ -187,47 +190,19 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
     // select messages
     for (List<Message> messageList : 
messagesGroupByStateTransitPriority.values()) {
       for (Message message : messageList) {
-        String fromState = message.getFromState();
         String toState = message.getToState();
 
-        if (!bounds.containsKey(fromState)) {
-          LOG.error("Message's fromState is not in currentState. message: " + 
message);
-          continue;
-        }
-
-        if (!bounds.containsKey(toState)) {
-          bounds.put(toState, new Bounds(0, 0));
-        }
-
-        // check lower bound of fromState
-        if (stateConstraints.containsKey(fromState)) {
-          int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
-          if (newLowerBound < 0) {
-            LOG.error("Number of currentState in " + fromState
-                + " is less than number of messages transiting from " + 
fromState);
-            continue;
-          }
-
-          if (newLowerBound < stateConstraints.get(fromState).getLowerBound()) 
{
-            LOG.info("Reach lower_bound: " + 
stateConstraints.get(fromState).getLowerBound()
-                + ", not send message: " + message);
-            continue;
-          }
-        }
-
-        // check upper bound of toState
         if (stateConstraints.containsKey(toState)) {
-          int newUpperBound = bounds.get(toState).getUpperBound() + 1;
-          if (newUpperBound > stateConstraints.get(toState).getUpperBound()) {
+          int newCnt = (stateCnts.containsKey(toState) ? 
stateCnts.get(toState) + 1 : 1);
+          if (newCnt > stateConstraints.get(toState).getUpperBound()) {
             LOG.info("Reach upper_bound: " + 
stateConstraints.get(toState).getUpperBound()
                 + ", not send message: " + message);
             continue;
           }
         }
 
+        increaseStateCnt(stateConstraints, message.getToState(), stateCnts);
         selectedMessages.add(message);
-        bounds.get(fromState).increaseLowerBound();
-        bounds.get(toState).increaseUpperBound();
       }
     }
 
@@ -253,8 +228,7 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
         // idealState is null when resource has been dropped,
         // R can't be evaluated and ignore state constraints
         if (idealState != null) {
-          // HELIX-541: set upper_bound to R+1 to avoid live-lock
-          max = cache.getReplicas(idealState.getResourceName()) + 1;
+          max = cache.getReplicas(idealState.getResourceName());
         }
       } else {
         try {

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 5772385..1adcded 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -140,11 +140,6 @@ public class TaskAssignmentStage extends AbstractBaseStage 
{
           + " transit " + message.getPartitionName() + "|" + 
message.getPartitionNames() + " from:"
           + message.getFromState() + " to:" + message.getToState());
 
-      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " 
to " +
-      // message.getTgtName()
-      // + " transit " + message.getPartitionName() + "|" + 
message.getPartitionNames()
-      // + " from: " + message.getFromState() + " to: " + 
message.getToState());
-
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 1e7f275..6578af0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -351,8 +351,7 @@ public class AutoRebalanceStrategy {
     int replicas = Math.min(countStateReplicas(), preferenceList.size());
 
     // make this a LinkedHashSet to preserve iteration order
-    // truncate preference list to match replicas, @see HELIX-547
-    Set<String> notAssigned = new 
LinkedHashSet<String>(preferenceList.subList(0, replicas));
+    Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
     for (int i = 0; i < replicas; i++) {
       String state = _stateMap.get(i);
       String node = getMinimumNodeForReplica(state, notAssigned, 
nodeReplicaCounts);
@@ -438,21 +437,11 @@ public class AutoRebalanceStrategy {
         for (int replicaId = 0; replicaId < count; replicaId++) {
           Replica replica = new Replica(partition, replicaId);
           if (_preferredAssignment.get(replica).id != node.id
-              && !_existingPreferredAssignment.containsKey(replica)) {
-            if (!existingNonPreferredAssignment.containsKey(replica)) {
-              existingNonPreferredAssignment.put(replica, node);
-              node.nonPreferred.add(replica);
-            } else {
-              // if we have more than 1 existing non-preferred assignment, 
choose the node with more head-room
-              // this intends to make algorithm deterministic, @see HELIX-547
-              Node curNode = existingNonPreferredAssignment.get(replica);
-              int curHeadroom = curNode.capacity - curNode.currentlyAssigned;
-              int newHeadroon = node.capacity - node.currentlyAssigned;
-              if (newHeadroon > curHeadroom) {
-                existingNonPreferredAssignment.put(replica, node);
-                node.nonPreferred.add(replica);
-              }
-            }
+              && !_existingPreferredAssignment.containsKey(replica)
+              && !existingNonPreferredAssignment.containsKey(replica)) {
+            existingNonPreferredAssignment.put(replica, node);
+            node.nonPreferred.add(replica);
+
             break;
           }
         }
@@ -586,11 +575,11 @@ public class AutoRebalanceStrategy {
     public int currentlyAssigned;
     public int capacity;
     public boolean hasCeilingCapacity;
-    private String id;
+    private final String id;
     boolean isAlive;
-    private List<Replica> preferred;
-    private List<Replica> nonPreferred;
-    private Set<Replica> newReplicas;
+    private final List<Replica> preferred;
+    private final List<Replica> nonPreferred;
+    private final Set<Replica> newReplicas;
 
     public Node(String id) {
       preferred = new ArrayList<Replica>();
@@ -696,7 +685,7 @@ public class AutoRebalanceStrategy {
     @Override
     public int compareTo(Replica that) {
       if (that instanceof Replica) {
-        return this.format.compareTo(((Replica) that).format);
+        return this.format.compareTo(that.format);
       }
       return -1;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 9018416..77c32af 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.manager.zk;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -68,7 +69,9 @@ public final class ZKUtil {
     for (String path : requiredPaths) {
       if (!zkClient.exists(path)) {
         isValid = false;
-        logger.info("Invalid cluster setup, missing znode path: " + path);
+        if (logger.isDebugEnabled()) {
+          logger.debug("Invalid cluster setup, missing znode path: " + path);
+        }
       }
     }
     return isValid;

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
index 53d2ee9..4c013c0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -31,6 +31,7 @@ import java.util.TreeSet;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 
@@ -140,10 +141,10 @@ public class FixedTargetTaskRebalancer extends 
TaskRebalancer {
       int pId = partitions.get(0);
       if (includeSet.contains(pId)) {
         for (String instance : instances) {
-          String pending =
+          Message pendingMessage =
               currStateOutput.getPendingState(tgtIs.getResourceName(), new 
Partition(pName),
                   instance);
-          if (pending != null) {
+          if (pendingMessage != null) {
             continue;
           }
           String s =

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 131236e..a073b93 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -46,6 +46,7 @@ import 
org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
@@ -262,9 +263,9 @@ public abstract class TaskRebalancer implements Rebalancer, 
MappingCalculator {
         final String pName = pName(jobResource, pId);
 
         // Check for pending state transitions on this (partition, instance).
-        String pendingState =
+        Message pendingMessage =
             currStateOutput.getPendingState(jobResource, new Partition(pName), 
instance);
-        if (pendingState != null) {
+        if (pendingMessage != null) {
           // There is a pending state transition for this (partition, 
instance). Just copy forward
           // the state assignment from the previous ideal state.
           Map<String, String> stateMap = prevAssignment.getReplicaMap(new 
Partition(pName));
@@ -272,10 +273,12 @@ public abstract class TaskRebalancer implements 
Rebalancer, MappingCalculator {
             String prevState = stateMap.get(instance);
             paMap.put(pId, new PartitionAssignment(instance, prevState));
             assignedPartitions.add(pId);
-            LOG.debug(String
-                .format(
-                    "Task partition %s has a pending state transition on 
instance %s. Using the previous ideal state which was %s.",
-                    pName, instance, prevState));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String
+                  .format(
+                      "Task partition %s has a pending state transition on 
instance %s. Using the previous ideal state which was %s.",
+                      pName, instance, prevState));
+            }
           }
 
           continue;

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 7687e18..c5f54a5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -82,7 +82,7 @@ public class TestCurrentStateComputationStage extends 
BaseStageTest {
     CurrentStateOutput output2 = 
event.getAttribute(AttributeName.CURRENT_STATE.toString());
     String pendingState =
         output2.getPendingState("testResourceName", new 
Partition("testResourceName_1"),
-            "localhost_3");
+            "localhost_3").getToState();
     AssertJUnit.assertEquals(pendingState, "SLAVE");
 
     ZNRecord record1 = new ZNRecord("testResourceName");

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 820abbe..994e2fa 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -24,16 +24,38 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.helix.TestHelper;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageSelectionStage.Bounds;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestMsgSelectionStage {
+  private Message newMessage(String resourceName, String partitionName, String 
instanceName,
+      String fromState, String toState) { // test helper: builds a STATE_TRANSITION message for one partition on one instance
+    String uuid = UUID.randomUUID().toString(); // a fresh random message id on every call
+    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+    message.setSrcName("controller");
+    message.setTgtName(instanceName);
+    message.setMsgState(MessageState.NEW);
+    message.setResourceName(resourceName);
+    message.setPartitionName(partitionName);
+    message.setFromState(fromState);
+    message.setToState(toState);
+    message.setTgtSessionId("sessionId"); // the same placeholder literal is used for target and source session ids
+    message.setSrcSessionId("sessionId");
+    message.setStateModelDef("MasterSlave");
+    message.setStateModelFactoryName("DEFAULT");
+    message.setBucketSize(0);
+    return message;
+  }
+
   @Test
   public void testMasterXfer() {
     System.out.println("START testMasterXfer at " + new 
Date(System.currentTimeMillis()));
@@ -46,7 +68,7 @@ public class TestMsgSelectionStage {
     currentStates.put("localhost_0", "SLAVE");
     currentStates.put("localhost_1", "MASTER");
 
-    Map<String, String> pendingStates = new HashMap<String, String>();
+    Map<String, Message> pendingMessages = new HashMap<String, Message>();
 
     List<Message> messages = new ArrayList<Message>();
     messages.add(TestHelper.createMessage("msgId_0", "SLAVE", "MASTER", 
"localhost_0", "TestDB",
@@ -63,7 +85,7 @@ public class TestMsgSelectionStage {
     stateTransitionPriorities.put("SLAVE-MASTER", 1);
 
     List<Message> selectedMsg =
-        new MessageSelectionStage().selectMessages(liveInstances, 
currentStates, pendingStates,
+        new MessageSelectionStage().selectMessages(liveInstances, 
currentStates, pendingMessages,
             messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
 
     Assert.assertEquals(selectedMsg.size(), 1);
@@ -84,8 +106,8 @@ public class TestMsgSelectionStage {
     currentStates.put("localhost_0", "SLAVE");
     currentStates.put("localhost_1", "SLAVE");
 
-    Map<String, String> pendingStates = new HashMap<String, String>();
-    pendingStates.put("localhost_1", "MASTER");
+    Map<String, Message> pendingMessages = new HashMap<String, Message>();
+    pendingMessages.put("localhost_1", newMessage("TestDB", "TestDB_0", 
"localhost_1", "SLAVE", "MASTER"));
 
     List<Message> messages = new ArrayList<Message>();
     messages.add(TestHelper.createMessage("msgId_0", "SLAVE", "MASTER", 
"localhost_0", "TestDB",
@@ -100,7 +122,7 @@ public class TestMsgSelectionStage {
     stateTransitionPriorities.put("SLAVE-MASTER", 1);
 
     List<Message> selectedMsg =
-        new MessageSelectionStage().selectMessages(liveInstances, 
currentStates, pendingStates,
+        new MessageSelectionStage().selectMessages(liveInstances, 
currentStates, pendingMessages,
             messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
 
     Assert.assertEquals(selectedMsg.size(), 0);

http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/test/java/org/apache/helix/integration/TestControllerLiveLock.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestControllerLiveLock.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestControllerLiveLock.java
new file mode 100644
index 0000000..2240e3c
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestControllerLiveLock.java
@@ -0,0 +1,131 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for the Helix controller livelock (see HELIX-541): participants join one by one
+ * while a controller is already running. Per the original author, it reproduces the problem with high probability.
+ */
+public class TestControllerLiveLock extends ZkUnitTestBase {
+  private static final Logger LOG = 
Logger.getLogger(TestControllerLiveLock.class);
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 12; // number of participant nodes
+    final int p = 256; // partitions per resource
+    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
baseAccessor);
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        p, // partitions per resource
+        n, // number of nodes
+        1, // replicas
+        "LeaderStandby", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // start participants
+    Random random = new Random();
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+      participants[i].syncStart();
+      Thread.sleep(Math.abs(random.nextInt()) % 500 + 500); // random sub-second pause so participants join at staggered times
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // make sure all partitions are assigned and no partition is assigned to 
STANDBY state
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        ExternalView extView = 
accessor.getProperty(keyBuilder.externalView("TestDB0"));
+        for (int i = 0; i < p; i++) {
+          String partition = "TestDB0_" + i;
+          Map<String, String> map = extView.getRecord().getMapField(partition); // NOTE(review): extView is dereferenced without a null check
+          if (map == null || map.size() != 1) { // the check itself only requires exactly one entry per partition
+            return false;
+          }
+        }
+        return true;
+      }
+    }, 10 * 1000); // presumably a timeout in milliseconds - confirm against TestHelper.verify
+
+    if (!result) { // on failure, log every partition whose external-view map is missing or not of size 1
+      ExternalView extView = 
accessor.getProperty(keyBuilder.externalView("TestDB0"));
+      for (int i = 0; i < p; i++) {
+        String partition = "TestDB0_" + i;
+        Map<String, String> map = extView.getRecord().getMapField(partition);
+        if (map == null || map.size() != 1) {
+          LOG.error(partition + ": " + map);
+        }
+      }
+    }
+    Assert.assertTrue(result); // NOTE(review): if this assertion fails, the clean-up below is never reached
+
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+  }
+}

Reply via email to