Repository: helix
Updated Branches:
  refs/heads/master 72d524847 -> 3c7e73417


[HELIX-692] Use a map instead of a list to avoid redundantly deleting the same
message during cleanup


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3c7e7341
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3c7e7341
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3c7e7341

Branch: refs/heads/master
Commit: 3c7e7341769c7602de7d1c8f266fffd66ab7a091
Parents: 72d5248
Author: Harry Zhang <zhan...@usc.edu>
Authored: Fri Apr 6 16:27:02 2018 -0700
Committer: Harry Zhang <zhan...@usc.edu>
Committed: Mon Apr 9 12:36:37 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/MessageGenerationPhase.java      | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3c7e7341/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index a41c7b8..dd6ed6e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -64,7 +64,7 @@ public class MessageGenerationPhase extends AbstractBaseStage 
{
     HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name());
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
-    Map<String, List<Message>> pendingMessagesToCleanUp = new HashMap<>();
+    Map<String, Map<String, Message>> pendingMessagesToCleanUp = new 
HashMap<>();
     CurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.name());
     IntermediateStateOutput intermediateStateOutput =
@@ -138,9 +138,10 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
                 pendingMessage.getMsgId(), instanceName, 
pendingMessage.getFromState(),
                 pendingMessage.getToState(), resourceName, partition, 
currentState);
             if (!pendingMessagesToCleanUp.containsKey(instanceName)) {
-              pendingMessagesToCleanUp.put(instanceName, new 
ArrayList<Message>());
+                pendingMessagesToCleanUp.put(instanceName, new HashMap<String, 
Message>());
             }
-            pendingMessagesToCleanUp.get(instanceName).add(pendingMessage);
+            pendingMessagesToCleanUp.get(instanceName)
+                .put(pendingMessage.getMsgId(), pendingMessage);
           }
 
           if (desiredState.equals(NO_DESIRED_STATE) || 
desiredState.equalsIgnoreCase(currentState)) {
@@ -248,14 +249,14 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
    * @param accessor Data accessor used to clean up message
    */
   private void schedulePendingMessageCleanUp(
-      final Map<String, List<Message>> pendingMessagesToPurge, ExecutorService 
workerPool,
+      final Map<String, Map<String, Message>> pendingMessagesToPurge, 
ExecutorService workerPool,
       final HelixDataAccessor accessor) {
     workerPool.submit(new Callable<Object>() {
         @Override
         public Object call() {
-          for (Map.Entry<String, List<Message>> entry : 
pendingMessagesToPurge.entrySet()) {
+          for (Map.Entry<String, Map<String, Message>> entry : 
pendingMessagesToPurge.entrySet()) {
             String instanceName = entry.getKey();
-            for (Message msg : entry.getValue()) {
+            for (Message msg : entry.getValue().values()) {
               if (accessor.removeProperty(msg.getKey(accessor.keyBuilder(), 
instanceName))) {
                 logger.info("Deleted message {} from instance {}", 
msg.getMsgId(), instanceName);
               } else {

Reply via email to