This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c1f9af5b4ffb88a4893ac04890690946e97082e4
Author: narendly <[email protected]>
AuthorDate: Tue Feb 26 17:28:38 2019 -0800

    [HELIX-810] HELIX: Fix NPE in InstanceMessagesCache
    
    It was observed that InstanceMessagesCache throws an NPE in
    setRelayTime(). This is likely because some relay messages target
    instances that are no longer live (and are therefore absent from
    liveInstanceMap). InstanceMessagesCache must handle this case gracefully
    by skipping the remaining checks for such messages instead of throwing.
    We do not delete these messages right away because the instance may come
    back. If it does not, the Controller will expire the messages once their
    expiry period has passed and then remove them.
        Changelist:
        1. Add a try-catch block
        2. Improve logging
        3. Check that the hosted message's target instance is live before reading its session ID
---
 .../helix/common/caches/InstanceMessagesCache.java | 133 +++++++++++----------
 1 file changed, 72 insertions(+), 61 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index b57ffb9..21c35af 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -326,84 +326,95 @@ public class InstanceMessagesCache {
 
   private void setRelayTime(Message relayMessage, Map<String, LiveInstance> 
liveInstanceMap,
       Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
-
     // relay time already set, avoid to reset it to a later time.
     if (relayMessage.getRelayTime() > relayMessage.getCreateTimeStamp()) {
       return;
     }
 
-    Message hostedMessage = 
_relayHostMessageCache.get(relayMessage.getMsgId());
-    String sessionId = hostedMessage.getTgtSessionId();
-    String instance = hostedMessage.getTgtName();
-    String resourceName = hostedMessage.getResourceName();
-    String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
-
-    long currentTime = System.currentTimeMillis();
-    long expiredTime = currentTime + relayMessage.getExpiryPeriod();
-
-    if (!instanceSessionId.equals(sessionId)) {
-      LOG.debug(
-          "Hosted Instance SessionId {} does not match sessionId {} in hosted 
message , set relay message {} to be expired at {}, hosted message ",
-          instanceSessionId, sessionId, relayMessage.getId(), expiredTime,
-          hostedMessage.getMsgId());
-      relayMessage.setRelayTime(currentTime);
-      return;
-    }
-
-    Map<String, Map<String, CurrentState>> instanceCurrentStateMap = 
currentStateMap.get(instance);
-    if (instanceCurrentStateMap == null) {
-      LOG.debug(
-          "No instanceCurrentStateMap found for {} on {}, set relay messages 
{} to be expired at {}"
-              + resourceName, instance, relayMessage.getId(), expiredTime);
-      relayMessage.setRelayTime(currentTime);
-      return;
-    }
+    // Relay time has not been set. Proceed to set the relay time
+    try {
+      long currentTime = System.currentTimeMillis();
+      long expiredTime = currentTime + relayMessage.getExpiryPeriod();
+
+      Message hostedMessage = 
_relayHostMessageCache.get(relayMessage.getMsgId());
+      String sessionId = hostedMessage.getTgtSessionId();
+      String instance = hostedMessage.getTgtName();
+      String resourceName = hostedMessage.getResourceName();
+      if (!liveInstanceMap.containsKey(instance)) {
+        // It's possible that hostedMsg's target is no longer live. In this 
case, we just set the
+        // relay time and return so that the message could be cleaned up after 
a short delay
+        relayMessage.setRelayTime(currentTime);
+        return;
+      }
+      String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
+
+      if (!instanceSessionId.equals(sessionId)) {
+        LOG.debug(
+            "Hosted Instance SessionId {} does not match sessionId {} in 
hosted message , set relay message {} to be expired at {}, hosted message ",
+            instanceSessionId, sessionId, relayMessage.getId(), expiredTime,
+            hostedMessage.getMsgId());
+        relayMessage.setRelayTime(currentTime);
+        return;
+      }
 
-    Map<String, CurrentState> sessionCurrentStateMap = 
instanceCurrentStateMap.get(sessionId);
-    if (sessionCurrentStateMap == null) {
-      LOG.debug("No sessionCurrentStateMap found, set relay messages {} to be 
expired at {}. ",
-          relayMessage.getId(), expiredTime);
-      relayMessage.setRelayTime(currentTime);
-      return;
-    }
+      Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+          currentStateMap.get(instance);
+      if (instanceCurrentStateMap == null) {
+        LOG.debug(
+            "No instanceCurrentStateMap found for {} on {}, set relay messages 
{} to be expired at {}"
+                + resourceName,
+            instance, relayMessage.getId(), expiredTime);
+        relayMessage.setRelayTime(currentTime);
+        return;
+      }
 
-    String partitionName = hostedMessage.getPartitionName();
-    String targetState = hostedMessage.getToState();
-    String fromState = hostedMessage.getFromState();
+      Map<String, CurrentState> sessionCurrentStateMap = 
instanceCurrentStateMap.get(sessionId);
+      if (sessionCurrentStateMap == null) {
+        LOG.debug("No sessionCurrentStateMap found, set relay messages {} to 
be expired at {}. ",
+            relayMessage.getId(), expiredTime);
+        relayMessage.setRelayTime(currentTime);
+        return;
+      }
 
-    CurrentState currentState = sessionCurrentStateMap.get(resourceName);
-    if (currentState == null) {
-      LOG.debug("No currentState found for {} on {}, set relay message {} to 
be expired at {} ",
-          resourceName, instance, relayMessage.getId(),
-          (currentTime + relayMessage.getExpiryPeriod()));
-      relayMessage.setRelayTime(currentTime);
-      return;
-    }
+      String partitionName = hostedMessage.getPartitionName();
+      String targetState = hostedMessage.getToState();
+      String fromState = hostedMessage.getFromState();
+
+      CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+      if (currentState == null) {
+        LOG.debug("No currentState found for {} on {}, set relay message {} to 
be expired at {} ",
+            resourceName, instance, relayMessage.getId(),
+            (currentTime + relayMessage.getExpiryPeriod()));
+        relayMessage.setRelayTime(currentTime);
+        return;
+      }
 
-    if (targetState.equals(currentState.getState(partitionName))) {
-      long completeTime = currentState.getEndTime(partitionName);
-      if (completeTime < relayMessage.getCreateTimeStamp()) {
-        completeTime = currentTime;
+      if (targetState.equals(currentState.getState(partitionName))) {
+        long completeTime = currentState.getEndTime(partitionName);
+        if (completeTime < relayMessage.getCreateTimeStamp()) {
+          completeTime = currentTime;
+        }
+        relayMessage.setRelayTime(completeTime);
+        LOG.debug(
+            "Target state match the hosted message's target state, set relay 
message {} relay time at {}.",
+            relayMessage.getId(), completeTime);
       }
-      relayMessage.setRelayTime(completeTime);
-      LOG.debug(
-          "Target state match the hosted message's target state, set relay 
message {} relay time at {}.",
-          relayMessage.getId(), completeTime);
-    }
 
-    if (!fromState.equals(currentState.getState(partitionName))) {
-      LOG.debug(
-          "Current state does not match hosted message's from state, set relay 
message {} relay time at {}.",
-          relayMessage.getId(), currentTime);
-      relayMessage.setRelayTime(currentTime);
+      if (!fromState.equals(currentState.getState(partitionName))) {
+        LOG.debug(
+            "Current state does not match hosted message's from state, set 
relay message {} relay time at {}.",
+            relayMessage.getId(), currentTime);
+        relayMessage.setRelayTime(currentTime);
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to set the relay time. RelayMsgId: {} Exception: {}", 
relayMessage.getId(),
+          e);
     }
   }
 
   /**
    * Provides a list of current outstanding pending state transition messages 
on a given instance.
-   *
    * @param instanceName
-   *
    * @return
    */
   public Map<String, Message> getMessages(String instanceName) {

Reply via email to