Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x b2794e744 -> 5378afab5
[HELIX-541] Fix possible livelock in Helix controller, rb=28413 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5378afab Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5378afab Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5378afab Branch: refs/heads/helix-0.6.x Commit: 5378afab5646a6451665c1d8d3edb699daea5c64 Parents: b2794e7 Author: zzhang <[email protected]> Authored: Tue Nov 25 10:59:30 2014 -0800 Committer: zzhang <[email protected]> Committed: Tue Nov 25 10:59:30 2014 -0800 ---------------------------------------------------------------------- .../stages/CurrentStateComputationStage.java | 26 ++-- .../controller/stages/CurrentStateOutput.java | 56 ++++++-- .../stages/MessageGenerationPhase.java | 8 +- .../stages/MessageSelectionStage.java | 78 ++++------- .../controller/stages/TaskAssignmentStage.java | 5 - .../strategy/AutoRebalanceStrategy.java | 33 ++--- .../org/apache/helix/manager/zk/ZKUtil.java | 5 +- .../helix/task/FixedTargetTaskRebalancer.java | 5 +- .../org/apache/helix/task/TaskRebalancer.java | 15 ++- .../TestCurrentStateComputationStage.java | 2 +- .../stages/TestMsgSelectionStage.java | 32 ++++- .../integration/TestControllerLiveLock.java | 131 +++++++++++++++++++ 12 files changed, 268 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 6a30a9d..624698d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -70,8 +70,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage { String partitionName = message.getPartitionName(); Partition partition = resource.getPartition(partitionName); if (partition != null) { - currentStateOutput.setPendingState(resourceName, partition, instanceName, - message.getToState()); + currentStateOutput.setPendingState(resourceName, partition, instanceName, message); } else { // log } @@ -81,8 +80,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage { for (String partitionName : partitionNames) { Partition partition = resource.getPartition(partitionName); if (partition != null) { - currentStateOutput.setPendingState(resourceName, partition, instanceName, - message.getToState()); + currentStateOutput.setPendingState(resourceName, partition, instanceName, message); } else { // log } @@ -118,20 +116,12 @@ public class CurrentStateComputationStage extends AbstractBaseStage { for (String partitionName : partitionStateMap.keySet()) { Partition partition = resource.getPartition(partitionName); if (partition != null) { - currentStateOutput.setCurrentState(resourceName, - partition, - instanceName, - currentState.getState(partitionName)); - currentStateOutput.setRequestedState(resourceName, - partition, - instanceName, - currentState.getRequestedState(partitionName)); - currentStateOutput.setInfo(resourceName, - partition, - instanceName, - currentState.getInfo(partitionName)); - } else { - // log + currentStateOutput.setCurrentState(resourceName, partition, instanceName, + currentState.getState(partitionName)); + currentStateOutput.setRequestedState(resourceName, partition, instanceName, + currentState.getRequestedState(partitionName)); + currentStateOutput.setInfo(resourceName, partition, instanceName, + currentState.getInfo(partitionName)); } } } 
http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java index ac9d748..5633140 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java @@ -25,13 +25,20 @@ import java.util.Map; import java.util.Set; import org.apache.helix.model.CurrentState; +import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import com.google.common.collect.Sets; +/** + * The current state includes both current state and pending messages + * For pending messages, we consider both toState and fromState + * Pending message prevents controller sending transitions that may potentially violate state + * constraints @see HELIX-541 + */ public class CurrentStateOutput { private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap; - private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap; + private final Map<String, Map<Partition, Map<String, Message>>> _pendingStateMap; // Contains per-resource maps of partition -> (instance, requested_state). This corresponds to the // REQUESTED_STATE // field in the CURRENTSTATES node. 
@@ -47,7 +54,7 @@ public class CurrentStateOutput { public CurrentStateOutput() { _currentStateMap = new HashMap<String, Map<Partition, Map<String, String>>>(); - _pendingStateMap = new HashMap<String, Map<Partition, Map<String, String>>>(); + _pendingStateMap = new HashMap<String, Map<Partition, Map<String, Message>>>(); _resourceStateModelMap = new HashMap<String, String>(); _curStateMetaMap = new HashMap<String, CurrentState>(); _requestedStateMap = new HashMap<String, Map<Partition, Map<String, String>>>(); @@ -114,14 +121,14 @@ public class CurrentStateOutput { } public void setPendingState(String resourceName, Partition partition, String instanceName, - String state) { + Message message) { if (!_pendingStateMap.containsKey(resourceName)) { - _pendingStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>()); + _pendingStateMap.put(resourceName, new HashMap<Partition, Map<String, Message>>()); } if (!_pendingStateMap.get(resourceName).containsKey(partition)) { - _pendingStateMap.get(resourceName).put(partition, new HashMap<String, String>()); + _pendingStateMap.get(resourceName).put(partition, new HashMap<String, Message>()); } - _pendingStateMap.get(resourceName).get(partition).put(instanceName, state); + _pendingStateMap.get(resourceName).get(partition).put(instanceName, message); } /** @@ -169,12 +176,12 @@ public class CurrentStateOutput { * @param resourceName * @param partition * @param instanceName - * @return + * @return pending message */ - public String getPendingState(String resourceName, Partition partition, String instanceName) { - Map<Partition, Map<String, String>> map = _pendingStateMap.get(resourceName); + public Message getPendingState(String resourceName, Partition partition, String instanceName) { + Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName); if (map != null) { - Map<String, String> instanceStateMap = map.get(partition); + Map<String, Message> instanceStateMap = map.get(partition); if 
(instanceStateMap != null) { return instanceStateMap.get(instanceName); } @@ -199,14 +206,35 @@ public class CurrentStateOutput { } /** - * given (resource, partition), returns (instance->toState) map + * Given (resource, partition), returns (instance->toState) map * @param resourceName * @param partition - * @return + * @return pending target state map */ public Map<String, String> getPendingStateMap(String resourceName, Partition partition) { if (_pendingStateMap.containsKey(resourceName)) { - Map<Partition, Map<String, String>> map = _pendingStateMap.get(resourceName); + Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName); + if (map.containsKey(partition)) { + Map<String, Message> pendingMsgMap = map.get(partition); + Map<String, String> pendingStateMap = new HashMap<String, String>(); + for (String instance : pendingMsgMap.keySet()) { + pendingStateMap.put(instance, pendingMsgMap.get(instance).getToState()); + } + return pendingStateMap; + } + } + return Collections.emptyMap(); + } + + /** + * Given (resource, partition), returns (instance->pendingMessage) map + * @param resourceName + * @param partition + * @return pending messages map + */ + public Map<String, Message> getPendingMessageMap(String resourceName, Partition partition) { + if (_pendingStateMap.containsKey(resourceName)) { + Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName); if (map.containsKey(partition)) { return map.get(partition); } @@ -221,7 +249,7 @@ public class CurrentStateOutput { */ public Set<Partition> getCurrentStateMappedPartitions(String resourceId) { Map<Partition, Map<String, String>> currentStateMap = _currentStateMap.get(resourceId); - Map<Partition, Map<String, String>> pendingStateMap = _pendingStateMap.get(resourceId); + Map<Partition, Map<String, Message>> pendingStateMap = _pendingStateMap.get(resourceId); Set<Partition> partitionSet = Sets.newHashSet(); if (currentStateMap != null) { 
partitionSet.addAll(currentStateMap.keySet()); http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index 92964e9..bc3c739 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -75,6 +75,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef()); for (Partition partition : resource.getPartitions()) { + Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(resourceName, partition); @@ -96,7 +97,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { continue; } - String pendingState = + Message pendingMessage = currentStateOutput.getPendingState(resourceName, partition, instanceName); String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState); @@ -107,7 +108,8 @@ public class MessageGenerationPhase extends AbstractBaseStage { continue; } - if (pendingState != null) { + if (pendingMessage != null) { + String pendingState = pendingMessage.getToState(); if (nextState.equalsIgnoreCase(pendingState)) { logger.debug("Message already exists for " + instanceName + " to transit " + partition.getPartitionName() + " from " + currentState + " to " + nextState); @@ -121,10 +123,12 @@ public class MessageGenerationPhase extends AbstractBaseStage { + pendingState + ", currentState: " + currentState + ", nextState: " + nextState); } } else { + Message message = createMessage(manager, resourceName, 
partition.getPartitionName(), instanceName, currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize); + IdealState idealState = cache.getIdealState(resourceName); if (idealState != null && idealState.getStateModelDefRef().equalsIgnoreCase( http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java index f3a8257..0bc1905 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java @@ -97,13 +97,12 @@ public class MessageSelectionStage extends AbstractBaseStage { IdealState idealState = cache.getIdealState(resourceName); Map<String, Bounds> stateConstraints = computeStateConstraints(stateModelDef, idealState, cache); - for (Partition partition : resource.getPartitions()) { List<Message> messages = messageGenOutput.getMessages(resourceName, partition); List<Message> selectedMessages = selectMessages(cache.getLiveInstances(), currentStateOutput.getCurrentStateMap(resourceName, partition), - currentStateOutput.getPendingStateMap(resourceName, partition), messages, + currentStateOutput.getPendingMessageMap(resourceName, partition), messages, stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState()); output.addMessages(resourceName, partition, selectedMessages); } @@ -111,6 +110,18 @@ public class MessageSelectionStage extends AbstractBaseStage { event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output); } + private void increaseStateCnt(Map<String, Bounds> stateConstraints, String state, + 
Map<String, Integer> stateCnts) { + if (!stateConstraints.containsKey(state)) { + // skip state that doesn't have constraint + return; + } + if (!stateCnts.containsKey(state)) { + stateCnts.put(state, 0); + } + stateCnts.put(state, stateCnts.get(state) + 1); + } + // TODO: This method deserves its own class. The class should not understand helix but // just be // able to solve the problem using the algo. I think the method is following that but if @@ -131,15 +142,14 @@ public class MessageSelectionStage extends AbstractBaseStage { * @return: selected messages */ List<Message> selectMessages(Map<String, LiveInstance> liveInstances, - Map<String, String> currentStates, Map<String, String> pendingStates, List<Message> messages, - Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities, - String initialState) { + Map<String, String> currentStates, Map<String, Message> pendingMessages, + List<Message> messages, Map<String, Bounds> stateConstraints, + final Map<String, Integer> stateTransitionPriorities, String initialState) { if (messages == null || messages.isEmpty()) { return Collections.emptyList(); } - List<Message> selectedMessages = new ArrayList<Message>(); - Map<String, Bounds> bounds = new HashMap<String, Bounds>(); + Map<String, Integer> stateCnts = new HashMap<String, Integer>(); // count currentState, if no currentState, count as in initialState for (String instance : liveInstances.keySet()) { @@ -148,21 +158,14 @@ public class MessageSelectionStage extends AbstractBaseStage { state = currentStates.get(instance); } - if (!bounds.containsKey(state)) { - bounds.put(state, new Bounds(0, 0)); - } - bounds.get(state).increaseLowerBound(); - bounds.get(state).increaseUpperBound(); + increaseStateCnt(stateConstraints, state, stateCnts); } // count pendingStates - for (String instance : pendingStates.keySet()) { - String state = pendingStates.get(instance); - if (!bounds.containsKey(state)) { - bounds.put(state, new Bounds(0, 0)); 
- } - // TODO: add lower bound, need to refactor pendingState to include fromState also - bounds.get(state).increaseUpperBound(); + for (String instance : pendingMessages.keySet()) { + Message message = pendingMessages.get(instance); + increaseStateCnt(stateConstraints, message.getToState(), stateCnts); + increaseStateCnt(stateConstraints, message.getFromState(), stateCnts); } // group messages based on state transition priority @@ -187,47 +190,19 @@ public class MessageSelectionStage extends AbstractBaseStage { // select messages for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) { for (Message message : messageList) { - String fromState = message.getFromState(); String toState = message.getToState(); - if (!bounds.containsKey(fromState)) { - LOG.error("Message's fromState is not in currentState. message: " + message); - continue; - } - - if (!bounds.containsKey(toState)) { - bounds.put(toState, new Bounds(0, 0)); - } - - // check lower bound of fromState - if (stateConstraints.containsKey(fromState)) { - int newLowerBound = bounds.get(fromState).getLowerBound() - 1; - if (newLowerBound < 0) { - LOG.error("Number of currentState in " + fromState - + " is less than number of messages transiting from " + fromState); - continue; - } - - if (newLowerBound < stateConstraints.get(fromState).getLowerBound()) { - LOG.info("Reach lower_bound: " + stateConstraints.get(fromState).getLowerBound() - + ", not send message: " + message); - continue; - } - } - - // check upper bound of toState if (stateConstraints.containsKey(toState)) { - int newUpperBound = bounds.get(toState).getUpperBound() + 1; - if (newUpperBound > stateConstraints.get(toState).getUpperBound()) { + int newCnt = (stateCnts.containsKey(toState) ? 
stateCnts.get(toState) + 1 : 1); + if (newCnt > stateConstraints.get(toState).getUpperBound()) { LOG.info("Reach upper_bound: " + stateConstraints.get(toState).getUpperBound() + ", not send message: " + message); continue; } } + increaseStateCnt(stateConstraints, message.getToState(), stateCnts); selectedMessages.add(message); - bounds.get(fromState).increaseLowerBound(); - bounds.get(toState).increaseUpperBound(); } } @@ -253,8 +228,7 @@ public class MessageSelectionStage extends AbstractBaseStage { // idealState is null when resource has been dropped, // R can't be evaluated and ignore state constraints if (idealState != null) { - // HELIX-541: set upper_bound to R+1 to avoid live-lock - max = cache.getReplicas(idealState.getResourceName()) + 1; + max = cache.getReplicas(idealState.getResourceName()); } } else { try { http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index 5772385..1adcded 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -140,11 +140,6 @@ public class TaskAssignmentStage extends AbstractBaseStage { + " transit " + message.getPartitionName() + "|" + message.getPartitionNames() + " from:" + message.getFromState() + " to:" + message.getToState()); - // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " + - // message.getTgtName() - // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames() - // + " from: " + message.getFromState() + " to: " + message.getToState()); - keys.add(keyBuilder.message(message.getTgtName(), 
message.getId())); } http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java index 1e7f275..6578af0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java @@ -351,8 +351,7 @@ public class AutoRebalanceStrategy { int replicas = Math.min(countStateReplicas(), preferenceList.size()); // make this a LinkedHashSet to preserve iteration order - // truncate preference list to match replicas, @see HELIX-547 - Set<String> notAssigned = new LinkedHashSet<String>(preferenceList.subList(0, replicas)); + Set<String> notAssigned = new LinkedHashSet<String>(preferenceList); for (int i = 0; i < replicas; i++) { String state = _stateMap.get(i); String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts); @@ -438,21 +437,11 @@ public class AutoRebalanceStrategy { for (int replicaId = 0; replicaId < count; replicaId++) { Replica replica = new Replica(partition, replicaId); if (_preferredAssignment.get(replica).id != node.id - && !_existingPreferredAssignment.containsKey(replica)) { - if (!existingNonPreferredAssignment.containsKey(replica)) { - existingNonPreferredAssignment.put(replica, node); - node.nonPreferred.add(replica); - } else { - // if we have more than 1 existing non-preferred assignment, choose the node with more head-room - // this intends to make algorithm deterministic, @see HELIX-547 - Node curNode = existingNonPreferredAssignment.get(replica); - int curHeadroom = curNode.capacity - curNode.currentlyAssigned; - int newHeadroon = node.capacity - 
node.currentlyAssigned; - if (newHeadroon > curHeadroom) { - existingNonPreferredAssignment.put(replica, node); - node.nonPreferred.add(replica); - } - } + && !_existingPreferredAssignment.containsKey(replica) + && !existingNonPreferredAssignment.containsKey(replica)) { + existingNonPreferredAssignment.put(replica, node); + node.nonPreferred.add(replica); + break; } } @@ -586,11 +575,11 @@ public class AutoRebalanceStrategy { public int currentlyAssigned; public int capacity; public boolean hasCeilingCapacity; - private String id; + private final String id; boolean isAlive; - private List<Replica> preferred; - private List<Replica> nonPreferred; - private Set<Replica> newReplicas; + private final List<Replica> preferred; + private final List<Replica> nonPreferred; + private final Set<Replica> newReplicas; public Node(String id) { preferred = new ArrayList<Replica>(); @@ -696,7 +685,7 @@ public class AutoRebalanceStrategy { @Override public int compareTo(Replica that) { if (that instanceof Replica) { - return this.format.compareTo(((Replica) that).format); + return this.format.compareTo(that.format); } return -1; } http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java index 9018416..77c32af 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java @@ -20,6 +20,7 @@ package org.apache.helix.manager.zk; */ import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -68,7 +69,9 @@ public final class ZKUtil { for (String path : requiredPaths) { if (!zkClient.exists(path)) { isValid = false; - logger.info("Invalid cluster setup, missing znode path: " + 
path); + if (logger.isDebugEnabled()) { + logger.debug("Invalid cluster setup, missing znode path: " + path); + } } } return isValid; http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java index 53d2ee9..4c013c0 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java @@ -31,6 +31,7 @@ import java.util.TreeSet; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; +import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; @@ -140,10 +141,10 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { int pId = partitions.get(0); if (includeSet.contains(pId)) { for (String instance : instances) { - String pending = + Message pendingMessage = currStateOutput.getPendingState(tgtIs.getResourceName(), new Partition(pName), instance); - if (pending != null) { + if (pendingMessage != null) { continue; } String s = http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 131236e..a073b93 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -46,6 +46,7 @@ import 
org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; +import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; @@ -262,9 +263,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { final String pName = pName(jobResource, pId); // Check for pending state transitions on this (partition, instance). - String pendingState = + Message pendingMessage = currStateOutput.getPendingState(jobResource, new Partition(pName), instance); - if (pendingState != null) { + if (pendingMessage != null) { // There is a pending state transition for this (partition, instance). Just copy forward // the state assignment from the previous ideal state. Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName)); @@ -272,10 +273,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { String prevState = stateMap.get(instance); paMap.put(pId, new PartitionAssignment(instance, prevState)); assignedPartitions.add(pId); - LOG.debug(String - .format( - "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.", - pName, instance, prevState)); + if (LOG.isDebugEnabled()) { + LOG.debug(String + .format( + "Task partition %s has a pending state transition on instance %s. 
Using the previous ideal state which was %s.", + pName, instance, prevState)); + } } continue; http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java index 7687e18..c5f54a5 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java @@ -82,7 +82,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest { CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString()); String pendingState = output2.getPendingState("testResourceName", new Partition("testResourceName_1"), - "localhost_3"); + "localhost_3").getToState(); AssertJUnit.assertEquals(pendingState, "SLAVE"); ZNRecord record1 = new ZNRecord("testResourceName"); http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java index 820abbe..994e2fa 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java @@ -24,16 +24,38 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.helix.TestHelper; 
import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageSelectionStage.Bounds; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageState; +import org.apache.helix.model.Message.MessageType; import org.testng.Assert; import org.testng.annotations.Test; public class TestMsgSelectionStage { + private Message newMessage(String resourceName, String partitionName, String instanceName, + String fromState, String toState) { + String uuid = UUID.randomUUID().toString(); + Message message = new Message(MessageType.STATE_TRANSITION, uuid); + message.setSrcName("controller"); + message.setTgtName(instanceName); + message.setMsgState(MessageState.NEW); + message.setResourceName(resourceName); + message.setPartitionName(partitionName); + message.setFromState(fromState); + message.setToState(toState); + message.setTgtSessionId("sessionId"); + message.setSrcSessionId("sessionId"); + message.setStateModelDef("MasterSlave"); + message.setStateModelFactoryName("DEFAULT"); + message.setBucketSize(0); + return message; + } + @Test public void testMasterXfer() { System.out.println("START testMasterXfer at " + new Date(System.currentTimeMillis())); @@ -46,7 +68,7 @@ public class TestMsgSelectionStage { currentStates.put("localhost_0", "SLAVE"); currentStates.put("localhost_1", "MASTER"); - Map<String, String> pendingStates = new HashMap<String, String>(); + Map<String, Message> pendingMessages = new HashMap<String, Message>(); List<Message> messages = new ArrayList<Message>(); messages.add(TestHelper.createMessage("msgId_0", "SLAVE", "MASTER", "localhost_0", "TestDB", @@ -63,7 +85,7 @@ public class TestMsgSelectionStage { stateTransitionPriorities.put("SLAVE-MASTER", 1); List<Message> selectedMsg = - new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates, + new MessageSelectionStage().selectMessages(liveInstances, 
currentStates, pendingMessages, messages, stateConstraints, stateTransitionPriorities, "OFFLINE"); Assert.assertEquals(selectedMsg.size(), 1); @@ -84,8 +106,8 @@ public class TestMsgSelectionStage { currentStates.put("localhost_0", "SLAVE"); currentStates.put("localhost_1", "SLAVE"); - Map<String, String> pendingStates = new HashMap<String, String>(); - pendingStates.put("localhost_1", "MASTER"); + Map<String, Message> pendingMessages = new HashMap<String, Message>(); + pendingMessages.put("localhost_1", newMessage("TestDB", "TestDB_0", "localhost_1", "SLAVE", "MASTER")); List<Message> messages = new ArrayList<Message>(); messages.add(TestHelper.createMessage("msgId_0", "SLAVE", "MASTER", "localhost_0", "TestDB", @@ -100,7 +122,7 @@ public class TestMsgSelectionStage { stateTransitionPriorities.put("SLAVE-MASTER", 1); List<Message> selectedMsg = - new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates, + new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingMessages, messages, stateConstraints, stateTransitionPriorities, "OFFLINE"); Assert.assertEquals(selectedMsg.size(), 0); http://git-wip-us.apache.org/repos/asf/helix/blob/5378afab/helix-core/src/test/java/org/apache/helix/integration/TestControllerLiveLock.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestControllerLiveLock.java b/helix-core/src/test/java/org/apache/helix/integration/TestControllerLiveLock.java new file mode 100644 index 0000000..2240e3c --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestControllerLiveLock.java @@ -0,0 +1,131 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; +import java.util.Map; +import java.util.Random; + +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.TestHelper.Verifier; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * This is for testing Helix controller livelock @see Helix-541 + * The test has a high probability to reproduce the problem + */ +public class TestControllerLiveLock extends ZkUnitTestBase { + private static final Logger LOG = Logger.getLogger(TestControllerLiveLock.class); + + @Test + public void test() throws Exception { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + 
methodName; + int n = 12; + final int p = 256; + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor); + final PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + p, // partitions per resource + n, // number of nodes + 1, // replicas + "LeaderStandby", RebalanceMode.FULL_AUTO, true); // do rebalance + + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); + + // start participants + Random random = new Random(); + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) { + String instanceName = "localhost_" + (12918 + i); + + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + Thread.sleep(Math.abs(random.nextInt()) % 500 + 500); + } + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // make sure all partitions are assigned and no partitions is assigned to STANDBY state + result = TestHelper.verify(new TestHelper.Verifier() { + + @Override + public boolean verify() throws Exception { + ExternalView extView = accessor.getProperty(keyBuilder.externalView("TestDB0")); + for (int i = 0; i < p; i++) { + String partition = "TestDB0_" + i; + Map<String, String> map = extView.getRecord().getMapField(partition); + if (map == null || map.size() != 1) { + return false; + } + } + return true; + } + }, 10 * 1000); + + if (!result) { + ExternalView extView = 
accessor.getProperty(keyBuilder.externalView("TestDB0")); + for (int i = 0; i < p; i++) { + String partition = "TestDB0_" + i; + Map<String, String> map = extView.getRecord().getMapField(partition); + if (map == null || map.size() != 1) { + LOG.error(partition + ": " + map); + } + } + } + Assert.assertTrue(result); + + // clean up + controller.syncStop(); + for (int i = 0; i < n; i++) { + participants[i].syncStop(); + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } +}
