Repository: helix Updated Branches: refs/heads/master 72d524847 -> 3c7e73417
[HELIX-692] use map instead of list to avoid deleting redundant message during cleanup Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3c7e7341 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3c7e7341 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3c7e7341 Branch: refs/heads/master Commit: 3c7e7341769c7602de7d1c8f266fffd66ab7a091 Parents: 72d5248 Author: Harry Zhang <zhan...@usc.edu> Authored: Fri Apr 6 16:27:02 2018 -0700 Committer: Harry Zhang <zhan...@usc.edu> Committed: Mon Apr 9 12:36:37 2018 -0700 ---------------------------------------------------------------------- .../controller/stages/MessageGenerationPhase.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/3c7e7341/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index a41c7b8..dd6ed6e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -64,7 +64,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); - Map<String, List<Message>> pendingMessagesToCleanUp = new HashMap<>(); + Map<String, Map<String, Message>> pendingMessagesToCleanUp = new HashMap<>(); CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name()); IntermediateStateOutput intermediateStateOutput = @@ -138,9 +138,10 @@ public class MessageGenerationPhase extends AbstractBaseStage { pendingMessage.getMsgId(), instanceName, pendingMessage.getFromState(), pendingMessage.getToState(), resourceName, partition, currentState); if (!pendingMessagesToCleanUp.containsKey(instanceName)) { - pendingMessagesToCleanUp.put(instanceName, new ArrayList<Message>()); + pendingMessagesToCleanUp.put(instanceName, new HashMap<String, Message>()); } - pendingMessagesToCleanUp.get(instanceName).add(pendingMessage); + pendingMessagesToCleanUp.get(instanceName) + .put(pendingMessage.getMsgId(), pendingMessage); } if (desiredState.equals(NO_DESIRED_STATE) || desiredState.equalsIgnoreCase(currentState)) { @@ -248,14 +249,14 @@ public class MessageGenerationPhase extends AbstractBaseStage { * @param accessor Data accessor used to clean up message */ private void schedulePendingMessageCleanUp( - final Map<String, List<Message>> pendingMessagesToPurge, ExecutorService workerPool, + final Map<String, Map<String, Message>> pendingMessagesToPurge, ExecutorService workerPool, final HelixDataAccessor accessor) { workerPool.submit(new Callable<Object>() { @Override public Object call() { - for (Map.Entry<String, List<Message>> entry : pendingMessagesToPurge.entrySet()) { + for (Map.Entry<String, Map<String, Message>> entry : pendingMessagesToPurge.entrySet()) { String instanceName = entry.getKey(); - for (Message msg : entry.getValue()) { + for (Message msg : entry.getValue().values()) { if (accessor.removeProperty(msg.getKey(accessor.keyBuilder(), instanceName))) { logger.info("Deleted message {} from instance {}", msg.getMsgId(), instanceName); } else {