Fix P2P message logic in controller to avoid sending duplicated messages to participants.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/880f8851 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/880f8851 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/880f8851 Branch: refs/heads/master Commit: 880f885121afecab4e186282fbf94a146a2cf04a Parents: f1c5037 Author: Lei Xia <[email protected]> Authored: Tue Apr 24 18:18:40 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Mon Sep 17 15:08:28 2018 -0700 ---------------------------------------------------------------------- .../common/caches/InstanceMessagesCache.java | 146 +++++++-- .../controller/stages/ClusterDataCache.java | 12 +- .../stages/CurrentStateComputationStage.java | 57 +++- .../controller/stages/CurrentStateOutput.java | 58 ++-- .../stages/MessageGenerationPhase.java | 17 +- .../stages/MessageSelectionStage.java | 31 +- .../controller/stages/TaskAssignmentStage.java | 7 +- .../messaging/handling/HelixTaskExecutor.java | 22 +- .../java/org/apache/helix/model/Message.java | 12 +- .../helix/task/AbstractTaskDispatcher.java | 2 +- .../helix/task/DeprecatedTaskRebalancer.java | 2 +- .../FixedTargetTaskAssignmentCalculator.java | 4 +- .../org/apache/helix/task/JobRebalancer.java | 5 +- .../ZkHelixClusterVerifier.java | 2 +- .../org/apache/helix/common/ZkTestBase.java | 34 ++ .../TestCurrentStateComputationStage.java | 2 +- .../stages/TestMsgSelectionStage.java | 5 +- .../messaging/TestP2PMessageSemiAuto.java | 68 ++-- .../messaging/TestP2PNoDuplicatedMessage.java | 315 +++++++++++++++++++ .../PartitionMigration/TestExpandCluster.java | 2 - .../handling/MockHelixTaskExecutor.java | 111 +++++++ .../TestP2PMessagesAvoidDuplicatedMessage.java | 13 +- .../TestP2PStateTransitionMessages.java | 217 ++++++++++++- 23 files changed, 990 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java index f8001da..13b77cc 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java @@ -22,7 +22,7 @@ package org.apache.helix.common.caches; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -43,9 +43,16 @@ import org.slf4j.LoggerFactory; public class InstanceMessagesCache { private static final Logger LOG = LoggerFactory.getLogger(InstanceMessagesCache.class.getName()); private Map<String, Map<String, Message>> _messageMap; + private Map<String, Map<String, Message>> _relayMessageMap; // maintain a cache of participant messages across pipeline runs + // <instance -> {<MessageId, Message>}> private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap(); + + // maintain a set of valid pending P2P messages. + // <instance -> {<MessageId, Message>}> + private Map<String, Map<String, Message>> _relayMessageCache = Maps.newHashMap(); + private String _clusterName; public InstanceMessagesCache(String clusterName) { @@ -54,17 +61,15 @@ public class InstanceMessagesCache { /** * This refreshes all pending messages in the cluster by re-fetching the data from zookeeper in an - * efficient way - * current state must be refreshed before refreshing relay messages because we need to use current - * state to validate all relay messages. 
+ * efficient way current state must be refreshed before refreshing relay messages because we need + * to use current state to validate all relay messages. * * @param accessor * @param liveInstanceMap * * @return */ - public boolean refresh(HelixDataAccessor accessor, - Map<String, LiveInstance> liveInstanceMap) { + public boolean refresh(HelixDataAccessor accessor, Map<String, LiveInstance> liveInstanceMap) { LOG.info("START: InstanceMessagesCache.refresh()"); long startTime = System.currentTimeMillis(); @@ -124,13 +129,16 @@ public class InstanceMessagesCache { System.currentTimeMillis() - startTime) + " ms."); } + LOG.info("END: InstanceMessagesCache.refresh()"); + return true; } // update all valid relay messages attached to existing state transition messages into message map. public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap, Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) { - List<Message> relayMessages = new ArrayList<>(); + + // refresh _relayMessageCache for (String instance : _messageMap.keySet()) { Map<String, Message> instanceMessages = _messageMap.get(instance); Map<String, Map<String, CurrentState>> instanceCurrentStateMap = @@ -170,51 +178,131 @@ public class InstanceMessagesCache { for (Message relayMsg : message.getRelayMessages().values()) { relayMsg.setRelayTime(transitionCompleteTime); - relayMessages.add(relayMsg); + cacheRelayMessage(relayMsg); } } } } - for (Message message : relayMessages) { - String instance = message.getTgtName(); - Map<String, Message> instanceMessages = _messageMap.get(instance); - if (instanceMessages == null) { - instanceMessages = new HashMap<>(); - _messageMap.put(instance, instanceMessages); + Map<String, Map<String, Message>> relayMessageMap = new HashMap<>(); + // refresh _relayMessageMap + for (String instance : _relayMessageCache.keySet()) { + Map<String, Message> messages = _relayMessageCache.get(instance); + Map<String, Map<String, CurrentState>> 
instanceCurrentStateMap = + currentStateMap.get(instance); + if (instanceCurrentStateMap == null) { + continue; + } + + Iterator<Map.Entry<String, Message>> iterator = messages.entrySet().iterator(); + while (iterator.hasNext()) { + Message message = iterator.next().getValue(); + String sessionId = message.getTgtSessionId(); + String resourceName = message.getResourceName(); + String partitionName = message.getPartitionName(); + String targetState = message.getToState(); + String instanceSessionId = liveInstanceMap.get(instance).getSessionId(); + + if (_messageMap.get(instance).containsKey(message.getMsgId())) { + // relay message has already been sent to target host + // remove the message from relayMessageCache. + LOG.info( + "Relay message has already been sent to target host, remove relay message from the cache" + + message.getId()); + iterator.remove(); + continue; + } + + if (!instanceSessionId.equals(sessionId)) { + LOG.info( + "Instance SessionId does not match, remove relay message from the cache" + message + .getId()); + iterator.remove(); + continue; + } + + Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId); + if (sessionCurrentStateMap == null) { + LOG.info("No sessionCurrentStateMap found, ignore relay message from the cache" + message + .getId()); + continue; + } + CurrentState currentState = sessionCurrentStateMap.get(resourceName); + if (currentState != null && targetState.equals(currentState.getState(partitionName))) { + LOG.info("CurrentState " + currentState + + " match the target state of the relay message, remove relay from cache." + message + .getId()); + iterator.remove(); + continue; + } + + if (message.isExpired()) { + LOG.info("relay message " + message.getId() + " expired, remove it from cache." 
+ + message.getId()); + iterator.remove(); + continue; + } + + if (!relayMessageMap.containsKey(instance)) { + relayMessageMap.put(instance, Maps.<String, Message>newHashMap()); + } + relayMessageMap.get(instance).put(message.getMsgId(), message); } - instanceMessages.put(message.getId(), message); } + + _relayMessageMap = Collections.unmodifiableMap(relayMessageMap); } /** - * Provides a list of current outstanding transitions on a given instance. + * Provides a list of current outstanding pending state transition messages on a given instance. * * @param instanceName * * @return */ public Map<String, Message> getMessages(String instanceName) { - Map<String, Message> map = _messageMap.get(instanceName); - if (map != null) { - return map; - } else { - return Collections.emptyMap(); + if (_messageMap.containsKey(instanceName)) { + return _messageMap.get(instanceName); } + return Collections.emptyMap(); } - public void cacheMessages(List<Message> messages) { + /** + * Provides a list of current outstanding pending relay (p2p) messages on a given instance. 
+ * + * @param instanceName + * + * @return + */ + public Map<String, Message> getRelayMessages(String instanceName) { + if (_relayMessageMap.containsKey(instanceName)) { + return _relayMessageMap.get(instanceName); + } + return Collections.emptyMap(); + } + + public void cacheMessages(Collection<Message> messages) { for (Message message : messages) { String instanceName = message.getTgtName(); - Map<String, Message> instMsgMap; - if (_messageCache.containsKey(instanceName)) { - instMsgMap = _messageCache.get(instanceName); - } else { - instMsgMap = Maps.newHashMap(); - _messageCache.put(instanceName, instMsgMap); + if (!_messageCache.containsKey(instanceName)) { + _messageCache.put(instanceName, Maps.<String, Message>newHashMap()); } - instMsgMap.put(message.getId(), message); + _messageCache.get(instanceName).put(message.getId(), message); + + if (message.hasRelayMessages()) { + for (Message relayMsg : message.getRelayMessages().values()) { + cacheRelayMessage(relayMsg); + } + } + } + } + + protected void cacheRelayMessage(Message message) { + String instanceName = message.getTgtName(); + if (!_relayMessageCache.containsKey(instanceName)) { + _relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap()); } + _relayMessageCache.get(instanceName).put(message.getId(), message); } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 3e6bd86..577b2c7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -20,6 +20,7 @@ package 
org.apache.helix.controller.stages; */ import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -476,7 +477,16 @@ public class ClusterDataCache { return _instanceMessagesCache.getMessages(instanceName); } - public void cacheMessages(List<Message> messages) { + /** + * Provides a list of current outstanding pending relay messages on a given instance. + * @param instanceName + * @return + */ + public Map<String, Message> getRelayMessages(String instanceName) { + return _instanceMessagesCache.getRelayMessages(instanceName); + } + + public void cacheMessages(Collection<Message> messages) { _instanceMessagesCache.cacheMessages(messages); } http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index a56d194..9cc9506 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -66,10 +66,12 @@ public class CurrentStateComputationStage extends AbstractBaseStage { // update pending messages Map<String, Message> messages = cache.getMessages(instanceName); - updatePendingMessages(instance, messages.values(), currentStateOutput, resourceMap); + Map<String, Message> relayMessages = cache.getRelayMessages(instanceName); + updatePendingMessages(instance, messages.values(), currentStateOutput, relayMessages.values(), resourceMap); // update current states. 
- Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName, instanceSessionId); + Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName, + instanceSessionId); updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap); } @@ -84,7 +86,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage { // update all pending messages to CurrentStateOutput. private void updatePendingMessages(LiveInstance instance, Collection<Message> pendingMessages, - CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap) { + CurrentStateOutput currentStateOutput, Collection<Message> pendingRelayMessages, + Map<String, Resource> resourceMap) { String instanceName = instance.getInstanceName(); String instanceSessionId = instance.getSessionId(); @@ -100,6 +103,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage { String resourceName = message.getResourceName(); Resource resource = resourceMap.get(resourceName); if (resource == null) { + LogUtil.logInfo(LOG, _eventId, String.format( + "Ignore a pending relay message %s for a non-exist resource %s and partition %s", + message.getMsgId(), resourceName, message.getPartitionName())); continue; } @@ -109,7 +115,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage { if (partition != null) { setMessageState(currentStateOutput, resourceName, partition, instanceName, message); } else { - // log + LogUtil.logInfo(LOG, _eventId, String + .format("Ignore a pending message %s for a non-exist resource %s and partition %s", + message.getMsgId(), resourceName, message.getPartitionName())); } } else { List<String> partitionNames = message.getPartitionNames(); @@ -119,12 +127,47 @@ public class CurrentStateComputationStage extends AbstractBaseStage { if (partition != null) { setMessageState(currentStateOutput, resourceName, partition, instanceName, message); } else { - // log + LogUtil.logInfo(LOG, _eventId, 
String.format( + "Ignore a pending message %s for a non-exist resource %s and partition %s", + message.getMsgId(), resourceName, message.getPartitionName())); } } } } } + + + // update all pending relay messages + for (Message message : pendingRelayMessages) { + if (!message.isRelayMessage()) { + LogUtil.logWarn(LOG, _eventId, + String.format("Not a relay message %s, ignored!", message.getMsgId())); + continue; + } + String resourceName = message.getResourceName(); + Resource resource = resourceMap.get(resourceName); + if (resource == null) { + LogUtil.logInfo(LOG, _eventId, String.format( + "Ignore a pending relay message %s for a non-exist resource %s and partition %s", + message.getMsgId(), resourceName, message.getPartitionName())); + continue; + } + + if (!message.getBatchMessageMode()) { + String partitionName = message.getPartitionName(); + Partition partition = resource.getPartition(partitionName); + if (partition != null) { + currentStateOutput.setPendingRelayMessage(resourceName, partition, instanceName, message); + } else { + LogUtil.logInfo(LOG, _eventId, String.format( + "Ignore a pending relay message %s for a non-exist resource %s and partition %s", + message.getMsgId(), resourceName, message.getPartitionName())); + } + } else { + LogUtil.logWarn(LOG, _eventId, String + .format("A relay message %s should not be batched, ignored!", message.getMsgId())); + } + } } // update current states in CurrentStateOutput @@ -169,9 +212,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage { private void setMessageState(CurrentStateOutput currentStateOutput, String resourceName, Partition partition, String instanceName, Message message) { if (MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())) { - currentStateOutput.setPendingState(resourceName, partition, instanceName, message); + currentStateOutput.setPendingMessage(resourceName, partition, instanceName, message); } else { - 
currentStateOutput.setCancellationState(resourceName, partition, instanceName, message); + currentStateOutput.setCancellationMessage(resourceName, partition, instanceName, message); } } http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java index 4ebef97..b634703 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java @@ -37,8 +37,9 @@ import com.google.common.collect.Sets; */ public class CurrentStateOutput { private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap; - private final Map<String, Map<Partition, Map<String, Message>>> _pendingStateMap; - private final Map<String, Map<Partition, Map<String, Message>>> _cancellationStateMap; + private final Map<String, Map<Partition, Map<String, Message>>> _pendingMessageMap; + private final Map<String, Map<Partition, Map<String, Message>>> _cancellationMessageMap; + private final Map<String, Map<Partition, Map<String, Message>>> _pendingRelayMessageMap; // resourceName -> (Partition -> (instanceName -> endTime)) // Note that startTime / endTime in CurrentState marks that of state transition @@ -61,8 +62,9 @@ public class CurrentStateOutput { public CurrentStateOutput() { _currentStateMap = new HashMap<>(); - _pendingStateMap = new HashMap<>(); - _cancellationStateMap = new HashMap<>(); + _pendingMessageMap = new HashMap<>(); + _pendingRelayMessageMap = new HashMap<>(); + _cancellationMessageMap = new HashMap<>(); _currentStateEndTimeMap = new HashMap<>(); _resourceStateModelMap = new HashMap<>(); _curStateMetaMap = new HashMap<>(); 
@@ -140,9 +142,9 @@ public class CurrentStateOutput { _infoMap.get(resourceName).get(partition).put(instanceName, state); } - public void setPendingState(String resourceName, Partition partition, String instanceName, + public void setPendingMessage(String resourceName, Partition partition, String instanceName, Message message) { - setStateMessage(resourceName, partition, instanceName, message, _pendingStateMap); + setStateMessage(resourceName, partition, instanceName, message, _pendingMessageMap); } /** @@ -152,9 +154,14 @@ public class CurrentStateOutput { * @param instanceName * @param message */ - public void setCancellationState(String resourceName, Partition partition, String instanceName, + public void setCancellationMessage(String resourceName, Partition partition, String instanceName, Message message) { - setStateMessage(resourceName, partition, instanceName, message, _cancellationStateMap); + setStateMessage(resourceName, partition, instanceName, message, _cancellationMessageMap); + } + + public void setPendingRelayMessage(String resourceName, Partition partition, String instanceName, + Message message) { + setStateMessage(resourceName, partition, instanceName, message, _pendingRelayMessageMap); } private void setStateMessage(String resourceName, Partition partition, String instanceName, @@ -221,15 +228,24 @@ public class CurrentStateOutput { } /** - * given (resource, partition, instance), returns toState + * given (resource, partition, instance), returns pending message on this instance. 
* @param resourceName * @param partition * @param instanceName * @return pending message */ - // TODO: this should return toState, not pending message, create a separate method - public Message getPendingState(String resourceName, Partition partition, String instanceName) { - return getStateMessage(resourceName, partition, instanceName, _pendingStateMap); + public Message getPendingMessage(String resourceName, Partition partition, String instanceName) { + return getStateMessage(resourceName, partition, instanceName, _pendingMessageMap); + } + + public Map<String, Message> getPendingRelayMessageMap(String resourceName, Partition partition) { + if (_pendingRelayMessageMap.containsKey(resourceName)) { + Map<Partition, Map<String, Message>> map = _pendingRelayMessageMap.get(resourceName); + if (map.containsKey(partition)) { + return map.get(partition); + } + } + return Collections.emptyMap(); } /** @@ -239,9 +255,9 @@ public class CurrentStateOutput { * @param instanceName * @return */ - public Message getCancellationState(String resourceName, Partition partition, + public Message getCancellationMessage(String resourceName, Partition partition, String instanceName) { - return getStateMessage(resourceName, partition, instanceName, _cancellationStateMap); + return getStateMessage(resourceName, partition, instanceName, _cancellationMessageMap); } private Message getStateMessage(String resourceName, Partition partition, String instanceName, @@ -291,8 +307,8 @@ public class CurrentStateOutput { * @return pending target state map */ public Map<String, String> getPendingStateMap(String resourceName, Partition partition) { - if (_pendingStateMap.containsKey(resourceName)) { - Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName); + if (_pendingMessageMap.containsKey(resourceName)) { + Map<Partition, Map<String, Message>> map = _pendingMessageMap.get(resourceName); if (map.containsKey(partition)) { Map<String, Message> pendingMsgMap = map.get(partition); 
Map<String, String> pendingStateMap = new HashMap<String, String>(); @@ -312,8 +328,8 @@ public class CurrentStateOutput { * @return pending messages map */ public Map<String, Message> getPendingMessageMap(String resourceName, Partition partition) { - if (_pendingStateMap.containsKey(resourceName)) { - Map<Partition, Map<String, Message>> map = _pendingStateMap.get(resourceName); + if (_pendingMessageMap.containsKey(resourceName)) { + Map<Partition, Map<String, Message>> map = _pendingMessageMap.get(resourceName); if (map.containsKey(partition)) { return map.get(partition); } @@ -328,7 +344,7 @@ public class CurrentStateOutput { */ public Set<Partition> getCurrentStateMappedPartitions(String resourceId) { Map<Partition, Map<String, String>> currentStateMap = _currentStateMap.get(resourceId); - Map<Partition, Map<String, Message>> pendingStateMap = _pendingStateMap.get(resourceId); + Map<Partition, Map<String, Message>> pendingStateMap = _pendingMessageMap.get(resourceId); Set<Partition> partitionSet = Sets.newHashSet(); if (currentStateMap != null) { partitionSet.addAll(currentStateMap.keySet()); @@ -346,7 +362,7 @@ public class CurrentStateOutput { * @return set of participants to partitions mapping */ public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) { - return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingStateMap); + return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingMessageMap); } /** @@ -397,7 +413,7 @@ public class CurrentStateOutput { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("current state= ").append(_currentStateMap); - sb.append(", pending state= ").append(_pendingStateMap); + sb.append(", pending state= ").append(_pendingMessageMap); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index e21c607..3abc965 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -82,7 +82,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { } Map<String, LiveInstance> liveInstances = cache.getLiveInstances(); - Map<String, String> sessionIdMap = new HashMap<String, String>(); + Map<String, String> sessionIdMap = new HashMap<>(); for (LiveInstance liveInstance : liveInstances.values()) { sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId()); @@ -101,7 +101,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { for (Partition partition : resource.getPartitions()) { - Map<String, String> instanceStateMap = new HashMap<String, String>( + Map<String, String> instanceStateMap = new HashMap<>( intermediateStateOutput.getInstanceStateMap(resourceName, partition)); Map<String, String> pendingStateMap = currentStateOutput.getPendingStateMap(resourceName, partition); @@ -120,24 +120,27 @@ public class MessageGenerationPhase extends AbstractBaseStage { // we should generate message based on the desired-state priority // so keep generated messages in a temp map keyed by state // desired-state->list of generated-messages - Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>(); + Map<String, List<Message>> messageMap = new HashMap<>(); for (String instanceName : instanceStateMap.keySet()) { String desiredState = instanceStateMap.get(instanceName); - String currentState = currentStateOutput.getCurrentState(resourceName, partition, instanceName); + String currentState = 
currentStateOutput.getCurrentState(resourceName, partition, + instanceName); if (currentState == null) { currentState = stateModelDef.getInitialState(); } - Message pendingMessage = currentStateOutput.getPendingState(resourceName, partition, instanceName); + Message pendingMessage = currentStateOutput.getPendingMessage(resourceName, partition, + instanceName); boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled(); - Message cancellationMessage = currentStateOutput.getCancellationState(resourceName, partition, instanceName); + Message cancellationMessage = currentStateOutput.getCancellationMessage(resourceName, + partition, instanceName); String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState); Message message = null; - if (shouldCleanUpPendingMessage(pendingMessage, currentState, + if (pendingMessage != null && shouldCleanUpPendingMessage(pendingMessage, currentState, currentStateOutput.getEndTime(resourceName, partition, instanceName))) { LogUtil.logInfo(logger, _eventId, String.format( "Adding pending message %s on instance %s to clean up. 
Msg: %s->%s, current state of resource %s:%s is %s", http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java index a061598..03838f4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java @@ -20,11 +20,13 @@ package org.apache.helix.controller.stages; */ import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import org.apache.helix.controller.LogUtil; @@ -60,6 +62,7 @@ public class MessageSelectionStage extends AbstractBaseStage { public void process(ClusterEvent event) throws Exception { _eventId = event.getEventId(); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); + Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name()); @@ -86,6 +89,7 @@ public class MessageSelectionStage extends AbstractBaseStage { List<Message> selectedMessages = selectMessages(cache.getLiveInstances(), currentStateOutput.getCurrentStateMap(resourceName, partition), currentStateOutput.getPendingMessageMap(resourceName, partition), messages, + currentStateOutput.getPendingRelayMessageMap(resourceName, partition).values(), stateConstraints, stateTransitionPriorities, stateModelDef, resource.isP2PMessageEnabled()); output.addMessages(resourceName, partition, selectedMessages); @@ 
-127,9 +131,9 @@ public class MessageSelectionStage extends AbstractBaseStage { */ List<Message> selectMessages(Map<String, LiveInstance> liveInstances, Map<String, String> currentStates, Map<String, Message> pendingMessages, - List<Message> messages, Map<String, Bounds> stateConstraints, - final Map<String, Integer> stateTransitionPriorities, StateModelDefinition stateModelDef, - boolean p2pMessageEnabled) { + List<Message> messages, Collection<Message> pendingRelayMessages, + Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities, + StateModelDefinition stateModelDef, boolean p2pMessageEnabled) { if (messages == null || messages.isEmpty()) { return Collections.emptyList(); } @@ -188,6 +192,27 @@ public class MessageSelectionStage extends AbstractBaseStage { for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) { for (Message message : messageList) { String toState = message.getToState(); + String fromState = message.getFromState(); + + if (toState.equals(stateModelDef.getTopState())) { + // find if there are any pending relay messages match this message. + // if yes, rebuild the message to use the same message id from the original relay message. + for (Message relayMsg : pendingRelayMessages) { + if (relayMsg.getToState().equals(toState) && relayMsg.getFromState() + .equals(fromState)) { + if (relayMsg.getTgtName().equals(message.getTgtName())) { + message = new Message(message, relayMsg.getMsgId()); + } else { + // if there are pending relay message that was sent to a different host than the current message + // we should not send the toState message now. + LOG.info( + "There is pending relay message to a different host, not send message: {}, pending relay message: {}", + message, relayMsg); + continue; + } + } + } + } if (stateConstraints.containsKey(toState)) { int newCnt = (stateCnts.containsKey(toState) ? 
stateCnts.get(toState) + 1 : 1); http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index c196a26..6036f34 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -160,6 +160,11 @@ public class TaskAssignmentStage extends AbstractBaseStage { keys.add(keyBuilder.message(message.getTgtName(), message.getId())); } - dataAccessor.createChildren(keys, new ArrayList<>(messages)); + boolean[] results = dataAccessor.createChildren(keys, new ArrayList<>(messages)); + for (int i = 0; i < results.length; i++) { + if (!results[i]) { + LogUtil.logWarn(logger, _eventId, "Failed to send message: " + keys.get(i)); + } + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 9734cc8..925f127 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -488,8 +488,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { return true; } else { _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, - "fail to cancel task: " + taskId, - notificationContext.getManager()); + "fail to cancel task: " + taskId, 
notificationContext.getManager()); } } else { _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, @@ -789,14 +788,6 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { continue; } - if (message.isExpired()) { - LOG.info( - "Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc() + " relayed from: " - + message.getRelaySrcHost()); - reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED); - continue; - } - String tgtSessionId = message.getTgtSessionId(); // sessionId mismatch normally means message comes from expired session, just remove it if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) { @@ -843,6 +834,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { continue; } + if (message.isExpired()) { + LOG.info( + "Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc() + + " relayed from: " + message.getRelaySrcHost()); + reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED); + continue; + } + // State Transition Cancellation if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) { boolean success = cancelNotStartedStateTransition(message, stateTransitionHandlers, accessor, instanceName); @@ -874,6 +873,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { duplicatedMessage.getToState(), message.getFromState(), message.getToState())); } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) && isStateTransitionInProgress(messageTarget)) { + // If there is another state transition for same partition is going on, // discard the message. 
Controller will resend if this is a valid message throw new HelixException(String @@ -1095,7 +1095,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { && stateTranstionMessage.getToState().equalsIgnoreCase(cancellationMessage.getToState()); } - private String getMessageTarget(String resourceName, String partitionName) { + String getMessageTarget(String resourceName, String partitionName) { return String.format("%s_%s", resourceName, partitionName); } http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/model/Message.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java index 8195092..51d03cb 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Message.java +++ b/helix-core/src/main/java/org/apache/helix/model/Message.java @@ -178,6 +178,16 @@ public class Message extends HelixProperty { } /** + * Instantiate a message with a new id + * @param message message to be copied + * @param id unique message identifier + */ + public Message(Message message, String id) { + super(new ZNRecord(message.getRecord(), id)); + setMsgId(id); + } + + /** * Set a subtype of the message * @param subType name of the subtype */ @@ -820,7 +830,7 @@ public class Message extends HelixProperty { // use relay time if this is a relay message if (isRelayMessage()) { long relayTime = getRelayTime(); - return relayTime <= 0 || (relayTime + expiry < current); + return (relayTime > 0 && (relayTime + expiry < current)); } return getCreateTimeStamp() + expiry < current; http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index eb67d59..e230fb5 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -65,7 +65,7 @@ public abstract class AbstractTaskDispatcher { // Check for pending state transitions on this (partition, instance). Message pendingMessage = - currStateOutput.getPendingState(jobResource, new Partition(pName), instance); + currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance); if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) { processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance, pendingMessage, jobState, currState, paMap, assignedPartitions); http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java index 97e02fc..9903117 100644 --- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java @@ -366,7 +366,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal // Check for pending state transitions on this (partition, instance). Message pendingMessage = - currStateOutput.getPendingState(jobResource, new Partition(pName), instance); + currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance); if (pendingMessage != null) { // There is a pending state transition for this (partition, instance). Just copy forward // the state assignment from the previous ideal state. 
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java index 87b57c7..4389484 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java @@ -156,7 +156,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato int pId = partitions.get(0); if (includeSet.contains(pId)) { for (String instance : instances) { - Message pendingMessage = currStateOutput.getPendingState(tgtIs.getResourceName(), + Message pendingMessage = currStateOutput.getPendingMessage(tgtIs.getResourceName(), new Partition(pName), instance); if (pendingMessage != null) { continue; @@ -230,7 +230,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato // If there is, we should wait until the pending message gets processed, so skip // assignment this time around Message pendingMessage = - currStateOutput.getPendingState(targetIdealState.getResourceName(), + currStateOutput.getPendingMessage(targetIdealState.getResourceName(), new Partition(targetResourcePartitionName), instance); if (pendingMessage != null) { continue; http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index ddda41a..5b29c23 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ 
b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -270,7 +270,7 @@ public class JobRebalancer extends TaskRebalancer { paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name())); } Partition partition = new Partition(pName(jobResource, pId)); - Message pendingMessage = currStateOutput.getPendingState(jobResource, partition, instance); + Message pendingMessage = currStateOutput.getPendingMessage(jobResource, partition, instance); // While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT, // so that Helix will cancel the transition. if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) { @@ -337,7 +337,8 @@ public class JobRebalancer extends TaskRebalancer { TaskPartitionState state = jobContext.getPartitionState(pId); Partition partition = new Partition(pName(jobResource, pId)); String instance = jobContext.getAssignedParticipant(pId); - Message pendingMessage = currentStateOutput.getPendingState(jobResource, partition, instance); + Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition, + instance); // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished if (state == TaskPartitionState.RUNNING || (state == TaskPartitionState.INIT && pendingMessage != null)) { http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java index 9d8e63d..dbf9272 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java +++ 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java @@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit; public abstract class ZkHelixClusterVerifier implements IZkChildListener, IZkDataListener, HelixClusterVerifier { private static Logger LOG = LoggerFactory.getLogger(ZkHelixClusterVerifier.class); - protected static int DEFAULT_TIMEOUT = 30 * 1000; + protected static int DEFAULT_TIMEOUT = 300 * 1000; protected static int DEFAULT_PERIOD = 100; http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index 189e95c..c69744e 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -42,6 +42,7 @@ import org.apache.helix.PropertyPathBuilder; import org.apache.helix.SystemPropertyKeys; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; +import org.apache.helix.api.config.HelixConfigProperty; import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.Stage; @@ -65,6 +66,7 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.OnlineOfflineSMD; +import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.ConfigScopeBuilder; import org.apache.helix.tools.ClusterSetup; @@ -240,6 +242,38 @@ public class ZkTestBase { configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig); } + protected void enableP2PInCluster(String clusterName, 
ConfigAccessor configAccessor, + boolean enable) { + // enable p2p message in cluster. + if (enable) { + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.enableP2PMessage(true); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } else { + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.getRecord().getSimpleFields() + .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name()); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + } + + protected void enableP2PInResource(String clusterName, ConfigAccessor configAccessor, + String dbName, boolean enable) { + if (enable) { + ResourceConfig resourceConfig = + new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build(); + configAccessor.setResourceConfig(clusterName, dbName, resourceConfig); + } else { + // remove P2P Message in resource config + ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, dbName); + if (resourceConfig != null) { + resourceConfig.getRecord().getSimpleFields() + .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name()); + configAccessor.setResourceConfig(clusterName, dbName, resourceConfig); + } + } + } + protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, long delay) { ConfigAccessor configAccessor = new ConfigAccessor(zkClient); ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java index 4479cf6..06fbf24 100644 --- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java @@ -78,7 +78,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest { runStage(event, stage); CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.name()); String pendingState = - output2.getPendingState("testResourceName", new Partition("testResourceName_1"), + output2.getPendingMessage("testResourceName", new Partition("testResourceName_1"), "localhost_3").getToState(); AssertJUnit.assertEquals(pendingState, "SLAVE"); http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java index 79e4e2c..45e1062 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java @@ -20,6 +20,7 @@ package org.apache.helix.controller.stages; */ import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -85,7 +86,7 @@ public class TestMsgSelectionStage { List<Message> selectedMsg = new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingMessages, - messages, stateConstraints, stateTransitionPriorities, + messages, Collections.<Message>emptyList(), stateConstraints, stateTransitionPriorities, BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false); Assert.assertEquals(selectedMsg.size(), 1); @@ -123,7 +124,7 @@ public class TestMsgSelectionStage { List<Message> 
selectedMsg = new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingMessages, - messages, stateConstraints, stateTransitionPriorities, + messages, Collections.<Message>emptyList(), stateConstraints, stateTransitionPriorities, BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false); Assert.assertEquals(selectedMsg.size(), 0); http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java index 551ea78..3c95c2f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.api.config.HelixConfigProperty; import org.apache.helix.common.ZkTestBase; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.integration.DelayedTransitionBase; @@ -33,11 +32,9 @@ import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.model.BuiltInStateModelDefinitions; -import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.CurrentState; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.MasterSlaveSMD; -import org.apache.helix.model.ResourceConfig; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import 
org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; @@ -55,7 +52,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase { static final String DB_NAME_1 = "TestDB_1"; static final String DB_NAME_2 = "TestDB_2"; - static final int PARTITION_NUMBER = 20; + static final int PARTITION_NUMBER = 200; static final int REPLICA_NUMBER = 3; List<MockParticipantManager> _participants = new ArrayList<>(); @@ -126,23 +123,25 @@ public class TestP2PMessageSemiAuto extends ZkTestBase { _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false); Assert.assertTrue(_clusterVerifier.verifyByPolling()); - verifyP2PMessage(DB_NAME_1,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); - verifyP2PMessage(DB_NAME_2,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); + verifyP2PMessage(DB_NAME_1,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1); + verifyP2PMessage(DB_NAME_2,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1); //re-enable the old master _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true); Assert.assertTrue(_clusterVerifier.verifyByPolling()); - verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); - verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); + verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), + _controller.getInstanceName(), 1); + verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), + _controller.getInstanceName(), 1); } @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"}) public void testP2PStateTransitionEnabledInCluster() { - enableP2PInCluster(true); - enableP2PInResource(DB_NAME_1,false); - 
enableP2PInResource(DB_NAME_2,false); + enableP2PInCluster(CLUSTER_NAME, _configAccessor, true); + enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_1,false); + enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_2,false); // disable the master instance String prevMasterInstance = _instances.get(0); @@ -156,15 +155,17 @@ public class TestP2PMessageSemiAuto extends ZkTestBase { _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true); Assert.assertTrue(_clusterVerifier.verifyByPolling()); - verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1)); - verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1)); + verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get( + 1)); + verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get( + 1)); } @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"}) public void testP2PStateTransitionEnabledInResource() { - enableP2PInCluster(false); - enableP2PInResource(DB_NAME_1,true); - enableP2PInResource(DB_NAME_2,false); + enableP2PInCluster(CLUSTER_NAME, _configAccessor, false); + enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_1,true); + enableP2PInResource(CLUSTER_NAME, _configAccessor, DB_NAME_2,false); // disable the master instance @@ -173,7 +174,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase { Assert.assertTrue(_clusterVerifier.verifyByPolling()); verifyP2PMessage(DB_NAME_1, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), prevMasterInstance); - verifyP2PMessage(DB_NAME_2, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); + verifyP2PMessage(DB_NAME_2, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1); //re-enable the old master @@ -181,37 +182,14 @@ public class 
TestP2PMessageSemiAuto extends ZkTestBase { Assert.assertTrue(_clusterVerifier.verifyByPolling()); verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1)); - verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); - } - - private void enableP2PInCluster(boolean enable) { - // enable p2p message in cluster. - if (enable) { - ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); - clusterConfig.enableP2PMessage(true); - _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); - } else { - ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); - clusterConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name()); - _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); - } + verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName(), 1); } - private void enableP2PInResource(String dbName, boolean enable) { - if (enable) { - ResourceConfig resourceConfig = new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build(); - _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig); - } else { - // remove P2P Message in resource config - ResourceConfig resourceConfig = _configAccessor.getResourceConfig(CLUSTER_NAME, dbName); - if (resourceConfig != null) { - resourceConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name()); - _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig); - } - } + private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost) { + verifyP2PMessage(dbName, instance, expectedState, expectedTriggerHost, 0.7); } - private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost) { + private void verifyP2PMessage(String dbName, String 
instance, String expectedState, String expectedTriggerHost, double expectedRatio) { ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME); dataCache.refresh(_accessor); @@ -239,7 +217,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase { } double ratio = ((double) expectedHost) / ((double) total); - Assert.assertTrue(ratio >= 0.7, String + Assert.assertTrue(ratio >= expectedRatio, String .format("Only %d out of %d percent transitions to Master were triggered by expected host!", expectedHost, total)); } http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java new file mode 100644 index 0000000..b3ef3e5 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java @@ -0,0 +1,315 @@ +package org.apache.helix.integration.messaging; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.ClusterMessagingService; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.integration.DelayedTransitionBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.messaging.DefaultMessagingService; +import org.apache.helix.messaging.handling.HelixTaskExecutor; +import org.apache.helix.messaging.handling.MessageHandlerFactory; +import org.apache.helix.messaging.handling.MockHelixTaskExecutor; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.monitoring.mbeans.MessageQueueMonitor; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestP2PNoDuplicatedMessage extends ZkTestBase { + private static Logger logger = 
LoggerFactory.getLogger(TestP2PNoDuplicatedMessage.class); + + final String CLASS_NAME = getShortClassName(); + final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + static final int PARTICIPANT_NUMBER = 6; + static final int PARTICIPANT_START_PORT = 12918; + + static final int DB_COUNT = 2; + + static final int PARTITION_NUMBER = 50; + static final int REPLICA_NUMBER = 3; + + final String _controllerName = CONTROLLER_PREFIX + "_0"; + + List<MockParticipantManager> _participants = new ArrayList<>(); + List<String> _instances = new ArrayList<>(); + ClusterControllerManager _controller; + + ZkHelixClusterVerifier _clusterVerifier; + ConfigAccessor _configAccessor; + HelixDataAccessor _accessor; + + @BeforeClass + public void beforeClass() + throws InterruptedException { + System.out.println( + "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis())); + + // setup storage cluster + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < PARTICIPANT_NUMBER; i++) { + String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance); + _instances.add(instance); + } + + // start dummy participants + for (int i = 0; i < PARTICIPANT_NUMBER; i++) { + MockParticipantManager participant = + new TestParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i)); + participant.setTransition(new DelayedTransitionBase(100)); + participant.syncStart(); + _participants.add(participant); + } + + enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + + for (int i = 0; i < DB_COUNT; i++) { + createResourceWithDelayedRebalance(CLUSTER_NAME, "TestDB_" + i, + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITION_NUMBER, REPLICA_NUMBER, + REPLICA_NUMBER - 1, 1000000L, CrushEdRebalanceStrategy.class.getName()); + } + + // start controller + _controller = new ClusterControllerManager(ZK_ADDR, 
CLUSTER_NAME, _controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+  }
+
+  /** Stops the controller and every still-connected participant, then deletes the cluster. */
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (MockParticipantManager p : _participants) {
+      if (p.isConnected()) {
+        p.syncStop();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * With P2P disabled, disables and re-enables each instance in turn and checks after every step
+   * that all MASTER replicas were triggered by the controller. Finally asserts that the mock
+   * executor counted no duplicated state-transition messages.
+   */
+  @Test
+  public void testP2PStateTransitionDisabled() throws InterruptedException {
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, false);
+
+    MockHelixTaskExecutor.resetStats();
+    // rolling upgrade the cluster
+    for (String ins : _instances) {
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, false);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      verifyP2PDisabled();
+
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, true);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      verifyP2PDisabled();
+    }
+
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
+        "There are duplicated transition messages sent while participant is handling the state-transition!");
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
+        "There are duplicated transition messages sent at same time!");
+  }
+
+  /**
+   * With P2P enabled, disables and re-enables each instance in turn and tallies who triggered
+   * each new MASTER. Requires more than 70% of them to be triggered by a host other than the
+   * controller, and no duplicated state-transition messages.
+   */
+  @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
+  public void testP2PStateTransitionEnabled() throws InterruptedException {
+    enableP2PInCluster(CLUSTER_NAME, _configAccessor, true);
+    long startTime = System.currentTimeMillis();
+    MockHelixTaskExecutor.resetStats();
+    // The tallies are static; start from zero so a repeated run of this method is not skewed.
+    total = 0;
+    p2pTrigged = 0;
+    // rolling upgrade the cluster
+    for (String ins : _instances) {
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, false);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      verifyP2PEnabled(startTime);
+
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, true);
+      Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      verifyP2PEnabled(startTime);
+    }
+
+    // Without this guard 0/0 yields NaN and the ratio check fails with a misleading message.
+    Assert.assertTrue(total > 0, "No transition to Master was observed after P2P was enabled!");
+    double ratio = ((double) p2pTrigged) / ((double) total);
+    Assert.assertTrue(ratio > 0.7, String
+        .format("Only %d out of %d transitions to Master were triggered by expected host!",
+            p2pTrigged, total));
+
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
+        "There are duplicated transition messages sent while participant is handling the state-transition!");
+    Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,
+        "There are duplicated transition messages sent at same time!");
+  }
+
+  /**
+   * Asserts that every partition currently in MASTER state on a live instance reports the
+   * controller as the host that triggered its transition.
+   */
+  private void verifyP2PDisabled() {
+    ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
+    dataCache.refresh(_accessor);
+    Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+
+    for (LiveInstance instance : liveInstanceMap.values()) {
+      Map<String, CurrentState> currentStateMap =
+          dataCache.getCurrentState(instance.getInstanceName(), instance.getSessionId());
+      Assert.assertNotNull(currentStateMap);
+      for (CurrentState currentState : currentStateMap.values()) {
+        for (String partition : currentState.getPartitionStateMap().keySet()) {
+          String state = currentState.getState(partition);
+          if (state.equalsIgnoreCase("MASTER")) {
+            String triggerHost = currentState.getTriggerHost(partition);
+            Assert.assertEquals(triggerHost, _controllerName,
+                state + " of " + partition + " on " + instance.getInstanceName()
+                    + " was triggered by " + triggerHost);
+          }
+        }
+      }
+    }
+  }
+
+  // Tallies filled by verifyP2PEnabled: MASTER replicas seen, and how many of those were
+  // triggered by a host other than the controller.
+  static int total = 0;
+  static int p2pTrigged = 0;
+
+  /**
+   * Adds to {@code total} every MASTER replica on a live instance whose recorded start time is
+   * later than {@code startTime}, and to {@code p2pTrigged} those whose trigger host is not the
+   * controller.
+   *
+   * @param startTime lower bound (exclusive) on the start time of the replicas to count
+   */
+  private void verifyP2PEnabled(long startTime) {
+    ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
+    dataCache.refresh(_accessor);
+    Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+
+    for (LiveInstance instance : liveInstanceMap.values()) {
+      Map<String, CurrentState> currentStateMap =
+          dataCache.getCurrentState(instance.getInstanceName(), instance.getSessionId());
+      Assert.assertNotNull(currentStateMap);
+      for (CurrentState currentState : currentStateMap.values()) {
+        for (String partition : currentState.getPartitionStateMap().keySet()) {
+          String state = currentState.getState(partition);
+          long start = currentState.getStartTime(partition);
+          if (state.equalsIgnoreCase("MASTER") && start > startTime) {
+            String triggerHost = currentState.getTriggerHost(partition);
+            if (!triggerHost.equals(_controllerName)) {
+              p2pTrigged ++;
+            }
+            total ++;
+          }
+        }
+      }
+    }
+  }
+
+
+  /** Participant manager whose messaging service is a {@link MockMessagingService}. */
+  static class TestParticipantManager extends MockParticipantManager {
+    private final DefaultMessagingService _messagingService;
+
+    public TestParticipantManager(String zkAddr, String clusterName, String instanceName) {
+      super(zkAddr, clusterName, instanceName);
+      _messagingService = new MockMessagingService(this);
+    }
+
+    @Override
+    public ClusterMessagingService getMessagingService() {
+      // The caller can register message handler factories on messaging service before the
+      // helix manager is connected. Thus we do not do connected check here.
+      return _messagingService;
+    }
+  }
+
+  /**
+   * Messaging service that registers handler factories on a {@link MockHelixTaskExecutor}.
+   * Factories registered while the manager is not connected are parked and registered later by
+   * {@link #onConnected()}.
+   */
+  static class MockMessagingService extends DefaultMessagingService {
+    private final HelixTaskExecutor _taskExecutor;
+    ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
+        new ConcurrentHashMap<>();
+    private final HelixManager _manager;
+
+    public MockMessagingService(HelixManager manager) {
+      super(manager);
+      _manager = manager;
+
+      boolean isParticipant = false;
+      if (manager.getInstanceType() == InstanceType.PARTICIPANT
+          || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
+        isParticipant = true;
+      }
+
+      _taskExecutor = new MockHelixTaskExecutor(
+          new ParticipantStatusMonitor(isParticipant, manager.getInstanceName()),
+          new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()));
+    }
+
+    @Override
+    public synchronized void registerMessageHandlerFactory(String type,
+        MessageHandlerFactory factory) {
+      registerMessageHandlerFactory(Collections.singletonList(type), factory);
+    }
+
+    @Override
+    public synchronized void registerMessageHandlerFactory(List<String> types,
+        MessageHandlerFactory factory) {
+      if (_manager.isConnected()) {
+        for (String type : types) {
+          registerMessageHandlerFactoryExtended(type, factory);
+        }
+      } else {
+        for (String type : types) {
+          _messageHandlerFactoriestobeAdded.put(type, factory);
+        }
+      }
+    }
+
+    /** Registers every factory that was parked while the manager was not connected. */
+    public synchronized void onConnected() {
+      for (Map.Entry<String, MessageHandlerFactory> parked :
+          _messageHandlerFactoriestobeAdded.entrySet()) {
+        registerMessageHandlerFactoryExtended(parked.getKey(), parked.getValue());
+      }
+      _messageHandlerFactoriestobeAdded.clear();
+    }
+
+    public HelixTaskExecutor getExecutor() {
+      return _taskExecutor;
+    }
+
+
+    /**
+     * Registers {@code factory} for {@code type} on the mock executor with the default
+     * thread-pool size, then sends a no-op message through the parent service.
+     */
+    void registerMessageHandlerFactoryExtended(String type, MessageHandlerFactory factory) {
+      int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
+      _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
+      super.sendNopMessage();
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java index 4083988..83893c6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java @@ -98,8 +98,6 @@ public class TestExpandCluster extends TestPartitionMigrationBase { Assert.assertTrue(_clusterVerifier.verifyByPolling()); Assert.assertFalse(_migrationVerifier.hasLessReplica()); - Assert.assertFalse(_migrationVerifier.hasMoreReplica()); - _migrationVerifier.stop(); }
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
new file mode 100644
index 0000000..207f74c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
@@ -0,0 +1,126 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+
+/**
+ * Test double for {@link HelixTaskExecutor} that inspects every incoming message batch for
+ * suspicious state-transition messages before handing the batch to the real executor.
+ *
+ * <p>Findings are accumulated in public static counters that tests assert on. The counters are
+ * shared by every executor instance, so call {@link #resetStats()} before the scenario under test.
+ */
+public class MockHelixTaskExecutor extends HelixTaskExecutor {
+  /** Messages whose resource/partition already appeared earlier in the same batch. */
+  public static int duplicatedMessages = 0;
+  /** NEW messages whose target state equals the partition's current state. */
+  public static int extraStateTransition = 0;
+  /** NEW messages for a partition that already has a different message being handled. */
+  public static int duplicatedMessagesInProgress = 0;
+  /** Manager taken from the notification context of the most recent {@link #onMessage} call. */
+  HelixManager manager;
+
+  public MockHelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor,
+      MessageQueueMonitor messageQueueMonitor) {
+    super(participantStatusMonitor, messageQueueMonitor);
+  }
+
+  /** Records duplicate-message statistics for the batch, then delegates to the real executor. */
+  @Override
+  public void onMessage(String instanceName, List<Message> messages,
+      NotificationContext changeContext) {
+    manager = changeContext.getManager();
+    checkDuplicatedMessages(messages);
+    super.onMessage(instanceName, messages, changeContext);
+  }
+
+  /**
+   * Updates the static counters for every state-transition message in {@code messages}; messages
+   * of any other type are ignored. Reads {@link #manager}, which {@link #onMessage} sets.
+   *
+   * @param messages the batch delivered to {@link #onMessage}
+   */
+  void checkDuplicatedMessages(List<Message> messages) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey path = keyBuilder.currentStates(manager.getInstanceName(), manager.getSessionId());
+    Map<String, CurrentState> currentStateMap = accessor.getChildValuesMap(path);
+
+    Set<String> seenPartitions = new HashSet<>();
+    for (Message message : messages) {
+      if (!Message.MessageType.STATE_TRANSITION.name().equals(message.getMsgType())) {
+        continue;
+      }
+
+      String resource = message.getResourceName();
+      String partition = message.getPartitionName();
+      boolean isNew = message.getMsgState() == Message.MessageState.NEW;
+
+      // Set.add returns false when the key is already present, i.e. this batch carries more
+      // than one state-transition message for the same resource/partition.
+      if (!seenPartitions.add(resource + "-" + partition)) {
+        duplicatedMessages++;
+      }
+
+      // Extra message: the partition is already in the state this message asks for.
+      CurrentState currentState = currentStateMap.get(resource);
+      String state = currentState == null ? null : currentState.getState(partition);
+      if (isNew && message.getToState().equals(state)) {
+        extraStateTransition++;
+      }
+
+      // Duplicate in progress: a task for this partition is already registered with the executor.
+      String messageTarget = getMessageTarget(resource, partition);
+      if (isNew && _messageTaskMap.containsKey(messageTarget)) {
+        String taskId = _messageTaskMap.get(messageTarget);
+        MessageTaskInfo messageTaskInfo = taskId == null ? null : _taskMap.get(taskId);
+        // Skip when no task is found under that id (e.g. removed after the containsKey check).
+        if (messageTaskInfo != null) {
+          Message existingMsg = messageTaskInfo.getTask().getMessage();
+          // Message ids are Strings: compare by value. A reference comparison (!=) treats equal
+          // ids held in distinct String objects as different and miscounts the same message.
+          if (!Objects.equals(existingMsg.getMsgId(), message.getMsgId())) {
+            duplicatedMessagesInProgress++;
+          }
+        }
+      }
+    }
+  }
+
+  /** Resets all three counters to zero. */
+  public static void resetStats() {
+    duplicatedMessages = 0;
+    extraStateTransition = 0;
+    duplicatedMessagesInProgress = 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/880f8851/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java index d5e40be..bc1e554 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java @@ -32,6 +32,11 @@ import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.BestPossibleStateOutput; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.IntermediateStateOutput; import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageSelectionStageOutput; @@ -45,6 +50,7 @@ import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.testng.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { @@ -156,7 +162,8 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { // Validate: 
Controller should not send S->M message to new master. currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE"); - currentStateOutput.setPendingState(_db, _partition, initialMaster, toSlaveMessage); + currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toSlaveMessage); + currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); @@ -175,7 +182,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { currentStateOutput = populateCurrentStateFromBestPossible(_bestpossibleState); currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE"); - currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage); + currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); _fullPipeline.handle(event); @@ -221,7 +228,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { // The initial master has forwarded the p2p message to secondMaster and deleted original M->S message on initialMaster, // But the S->M state-transition has not completed yet in secondMaster. // Validate: Controller should not send S->M to thirdMaster. - currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage); + currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
