This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1927792  Fixes #1802 - messages intended for instances that are no 
longer in the cluster (#1951)
1927792 is described below

commit 192779276eda1a3336525a07ae823003172fcc9e
Author: Komal Desai <[email protected]>
AuthorDate: Mon Feb 7 15:06:57 2022 -0800

    Fixes #1802 - messages intended for instances that are no longer in the 
cluster (#1951)
    
    In MessageGenerationPhase.java, the process() method populates the list of 
live instances from the cache.
    
    However, although the generateMessage() method has the sessionIdMap 
information, it still goes through the partition/resource/instance map without 
checking whether the instance is still part of the cluster.
    
    It is possible that the cache has a stale entry, but that logic needs to be 
worked on separately. When generating messages, we should still check whether 
the instance is still there.
    
    So this is a simple change. We still need to investigate further whether 
the cache is getting invalidated properly.
    
    To make sure that the cache is properly handled/refreshed when an instance 
is replaced or deleted, I have filed another bug: #1956
---
 .../helix/controller/stages/MessageGenerationPhase.java       | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 98313b9..7981302 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -193,7 +193,7 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
           }
         }
 
-        if (pendingMessage != null && 
shouldCleanUpPendingMessage(pendingMessage, currentState,
+        if (shouldCleanUpPendingMessage(pendingMessage, sessionIdMap, 
instanceName, currentState,
             currentStateOutput.getEndTime(resourceName, partition, 
instanceName))) {
           logAndAddToCleanUp(messagesToCleanUp, pendingMessage, instanceName, 
resourceName,
               partition, currentState, PENDING_MESSAGE);
@@ -203,7 +203,8 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
           // staleMessage can be simple or batch mode
           if ((System.currentTimeMillis() - currentStateOutput
               .getEndTime(resourceName, partition, instanceName) > 
DEFAULT_OBSELETE_MSG_PURGE_DELAY)
-              && staleMessage.getResourceName().equals(resourceName) && (
+              && staleMessage.getResourceName().equals(resourceName) && 
sessionIdMap
+              .containsKey(instanceName) && (
               
staleMessage.getPartitionName().equals(partition.getPartitionName()) || (
                   staleMessage.getBatchMessageMode() && 
staleMessage.getPartitionNames()
                       .contains(partition.getPartitionName())))) {
@@ -404,9 +405,9 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
     });
   }
 
-  private boolean shouldCleanUpPendingMessage(Message pendingMsg, String 
currentState,
-      Long currentStateTransitionEndTime) {
-    if (pendingMsg == null) {
+  private boolean shouldCleanUpPendingMessage(Message pendingMsg, Map<String, 
String> sessionIdMap,
+      String instanceName, String currentState, Long 
currentStateTransitionEndTime) {
+    if (pendingMsg == null || !sessionIdMap.containsKey(instanceName)) {
       return false;
     }
     if (currentState.equalsIgnoreCase(pendingMsg.getToState())) {

Reply via email to