This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 1927792 Fixes #1802 - messages intended for instances that are no
longer in the cluster (#1951)
1927792 is described below
commit 192779276eda1a3336525a07ae823003172fcc9e
Author: Komal Desai <[email protected]>
AuthorDate: Mon Feb 7 15:06:57 2022 -0800
Fixes #1802 - messages intended for instances that are no longer in the
cluster (#1951)
In MessageGenerationPhase.java, the process() method populates the list of
live instances from the cache.
However, although the generateMessage() method has the sessionIdMap
information, it still walks the partition/resource/instance map without
checking whether the instance is still part of the cluster.
It is possible that the cache has a stale entry, but that logic needs to be
worked on separately. While generating a message, we should still check that
the instance is still there.
So this is a simple change. We still need to look further into whether the
cache is getting invalidated properly.
To make sure that the cache is properly handled/refreshed when an instance
is replaced or deleted, I have filed another bug: #1956
---
.../helix/controller/stages/MessageGenerationPhase.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 98313b9..7981302 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -193,7 +193,7 @@ public class MessageGenerationPhase extends
AbstractBaseStage {
}
}
- if (pendingMessage != null &&
shouldCleanUpPendingMessage(pendingMessage, currentState,
+ if (shouldCleanUpPendingMessage(pendingMessage, sessionIdMap,
instanceName, currentState,
currentStateOutput.getEndTime(resourceName, partition,
instanceName))) {
logAndAddToCleanUp(messagesToCleanUp, pendingMessage, instanceName,
resourceName,
partition, currentState, PENDING_MESSAGE);
@@ -203,7 +203,8 @@ public class MessageGenerationPhase extends
AbstractBaseStage {
// staleMessage can be simple or batch mode
if ((System.currentTimeMillis() - currentStateOutput
.getEndTime(resourceName, partition, instanceName) >
DEFAULT_OBSELETE_MSG_PURGE_DELAY)
- && staleMessage.getResourceName().equals(resourceName) && (
+ && staleMessage.getResourceName().equals(resourceName) &&
sessionIdMap
+ .containsKey(instanceName) && (
staleMessage.getPartitionName().equals(partition.getPartitionName()) || (
staleMessage.getBatchMessageMode() &&
staleMessage.getPartitionNames()
.contains(partition.getPartitionName())))) {
@@ -404,9 +405,9 @@ public class MessageGenerationPhase extends
AbstractBaseStage {
});
}
- private boolean shouldCleanUpPendingMessage(Message pendingMsg, String
currentState,
- Long currentStateTransitionEndTime) {
- if (pendingMsg == null) {
+ private boolean shouldCleanUpPendingMessage(Message pendingMsg, Map<String,
String> sessionIdMap,
+ String instanceName, String currentState, Long
currentStateTransitionEndTime) {
+ if (pendingMsg == null || !sessionIdMap.containsKey(instanceName)) {
return false;
}
if (currentState.equalsIgnoreCase(pendingMsg.getToState())) {