This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new a478bbc  Fix for - Stale message redundant logs
a478bbc is described below

commit a478bbcfc8161f3f7ed62b1e1a28cc4b0e68e863
Author: desaikomal <[email protected]>
AuthorDate: Sun Jan 30 18:08:42 2022 -0800

    Fix for - Stale message redundant logs
    
    Avoid printing redundant log messages for unrelated partitions and resources.
---
 .../controller/stages/MessageGenerationPhase.java  | 69 ++++++++++++----------
 1 file changed, 38 insertions(+), 31 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 38d9908..98313b9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -77,7 +77,8 @@ public class MessageGenerationPhase extends AbstractBaseStage 
{
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     _eventId = event.getEventId();
     HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name());
-    BaseControllerDataProvider cache = 
event.getAttribute(AttributeName.ControllerDataProvider.name());
+    BaseControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
     Map<String, Resource> resourceMap =
         event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
     CurrentStateOutput currentStateOutput = 
event.getAttribute(AttributeName.CURRENT_STATE.name());
@@ -172,15 +173,14 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
           nextState = stateModelDef.getNextStateForTransition(currentState, 
desiredState);
 
           if (desiredState.equals(HelixDefinedState.DROPPED.name())) {
-            LogUtil.logDebug(logger, _eventId,
-                String.format(
-                    "No current state for partition %s in resource %s, skip 
the drop message",
+            LogUtil.logDebug(logger, _eventId, String
+                .format("No current state for partition %s in resource %s, 
skip the drop message",
                     partition.getPartitionName(), resourceName));
 
             message =
-                generateCancellationMessageForPendingMessage(desiredState, 
currentState, nextState, pendingMessage,
-                    manager, resource, partition, sessionIdMap, instanceName, 
stateModelDef,
-                    cancellationMessage, isCancellationEnabled);
+                generateCancellationMessageForPendingMessage(desiredState, 
currentState, nextState,
+                    pendingMessage, manager, resource, partition, 
sessionIdMap, instanceName,
+                    stateModelDef, cancellationMessage, isCancellationEnabled);
             addGeneratedMessageToMap(message, messageMap, eventType, cache, 
desiredState,
                 resourceName, partition, currentState, nextState);
 
@@ -200,9 +200,13 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
         }
 
         for (Message staleMessage : staleMessages) {
-          if (System.currentTimeMillis() - currentStateOutput
-              .getEndTime(resourceName, partition, instanceName)
-              > DEFAULT_OBSELETE_MSG_PURGE_DELAY) {
+          // staleMessage can be simple or batch mode
+          if ((System.currentTimeMillis() - currentStateOutput
+              .getEndTime(resourceName, partition, instanceName) > 
DEFAULT_OBSELETE_MSG_PURGE_DELAY)
+              && staleMessage.getResourceName().equals(resourceName) && (
+              
staleMessage.getPartitionName().equals(partition.getPartitionName()) || (
+                  staleMessage.getBatchMessageMode() && 
staleMessage.getPartitionNames()
+                      .contains(partition.getPartitionName())))) {
             logAndAddToCleanUp(messagesToCleanUp, staleMessage, instanceName, 
resourceName,
                 partition, currentState, STALE_MESSAGE);
           }
@@ -229,9 +233,9 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
 
           if (pendingMessage != null) {
             message =
-                generateCancellationMessageForPendingMessage(desiredState, 
currentState, nextState, pendingMessage,
-                    manager, resource, partition, sessionIdMap, instanceName, 
stateModelDef,
-                    cancellationMessage, isCancellationEnabled);
+                generateCancellationMessageForPendingMessage(desiredState, 
currentState, nextState,
+                    pendingMessage, manager, resource, partition, 
sessionIdMap, instanceName,
+                    stateModelDef, cancellationMessage, isCancellationEnabled);
           } else {
             // Create new state transition message
             message = MessageUtil
@@ -284,9 +288,9 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
     // Cancel the ST except below scenarios:
     // 1. pending message toState is desired state
     // 2. pending message is an ERROR reset: ERROR -> initState (eg. OFFLINE)
-    return !desiredState.equalsIgnoreCase(pendingMessage.getToState())
-        && 
!(HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState())
-        && initialState.equals(pendingMessage.getToState()));
+    return !desiredState.equalsIgnoreCase(pendingMessage.getToState()) && !(
+        HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState()) 
&& initialState
+            .equals(pendingMessage.getToState()));
   }
 
   private void logAndAddToCleanUp(Map<String, Map<String, Message>> 
messagesToCleanUp,
@@ -303,11 +307,12 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
     messagesToCleanUp.get(instanceName).put(message.getMsgId(), message);
   }
 
-  private Message generateCancellationMessageForPendingMessage(final String 
desiredState, final String currentState,
-      final String nextState, final Message pendingMessage, final HelixManager 
manager,
-      final Resource resource, final Partition partition, final Map<String, 
String> sessionIdMap,
-      final String instanceName, final StateModelDefinition stateModelDef,
-      final Message cancellationMessage, final boolean isCancellationEnabled) {
+  private Message generateCancellationMessageForPendingMessage(final String 
desiredState,
+      final String currentState, final String nextState, final Message 
pendingMessage,
+      final HelixManager manager, final Resource resource, final Partition 
partition,
+      final Map<String, String> sessionIdMap, final String instanceName,
+      final StateModelDefinition stateModelDef, final Message 
cancellationMessage,
+      final boolean isCancellationEnabled) {
 
     Message message = null;
 
@@ -316,19 +321,20 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
       if (nextState.equalsIgnoreCase(pendingState)) {
         LogUtil.logInfo(logger, _eventId,
             "Message already exists for " + instanceName + " to transit " + 
resource
-                .getResourceName() + "." + partition.getPartitionName() + " 
from "
-                + currentState + " to " + nextState + ", isRelay: " + 
pendingMessage.isRelayMessage());
+                .getResourceName() + "." + partition.getPartitionName() + " 
from " + currentState
+                + " to " + nextState + ", isRelay: " + 
pendingMessage.isRelayMessage());
       } else if (currentState.equalsIgnoreCase(pendingState)) {
         LogUtil.logInfo(logger, _eventId,
             "Message hasn't been removed for " + instanceName + " to transit " 
+ resource
-                .getResourceName() + "." + partition.getPartitionName() + " to 
"
-                + pendingState + ", desiredState: " + desiredState + ", 
isRelay: " + pendingMessage.isRelayMessage());
+                .getResourceName() + "." + partition.getPartitionName() + " to 
" + pendingState
+                + ", desiredState: " + desiredState + ", isRelay: " + 
pendingMessage
+                .isRelayMessage());
       } else {
         LogUtil.logInfo(logger, _eventId,
-            "IdealState changed before state transition completes for " + 
resource
-                .getResourceName() + "." + partition.getPartitionName() + " on 
"
-                + instanceName + ", pendingState: " + pendingState + ", 
currentState: "
-                + currentState + ", nextState: " + nextState + ", isRelay: " + 
pendingMessage.isRelayMessage());
+            "IdealState changed before state transition completes for " + 
resource.getResourceName()
+                + "." + partition.getPartitionName() + " on " + instanceName + 
", pendingState: "
+                + pendingState + ", currentState: " + currentState + ", 
nextState: " + nextState
+                + ", isRelay: " + pendingMessage.isRelayMessage());
 
         message = 
MessageUtil.createStateTransitionCancellationMessage(manager.getInstanceName(),
             manager.getSessionId(), resource, partition.getPartitionName(), 
instanceName,
@@ -353,8 +359,9 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
         }
       }
 
-      int timeout = getTimeOut(cache.getClusterConfig(), 
cache.getResourceConfig(resourceName),
-          currentState, nextState, idealState, partition);
+      int timeout =
+          getTimeOut(cache.getClusterConfig(), 
cache.getResourceConfig(resourceName), currentState,
+              nextState, idealState, partition);
       if (timeout > 0) {
         message.setExecutionTimeout(timeout);
       }

Reply via email to