This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 7da0ddbe6 Follow up change: NPE in IntermediateStateCalc (#2673)
7da0ddbe6 is described below
commit 7da0ddbe6148dd1d518c601b529c8c11fef41665
Author: Komal Desai <[email protected]>
AuthorDate: Fri Oct 20 09:43:47 2023 -0700
Follow up change: NPE in IntermediateStateCalc (#2673)
Follow up change to NPE in intermediate stage, we should not skip message
throttling in case of missing partition's preference list.
---
.../controller/stages/IntermediateStateCalcStage.java | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index ec17b620c..477e4f99b 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -367,11 +367,11 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
currentStateOutput.getCurrentStateMap(resourceName,
partition).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
List<String> preferenceList =
preferenceLists.get(partition.getPartitionName());
- if (preferenceList == null || preferenceList.size() == 0) {
- continue;
- }
Map<String, Integer> requiredState = getRequiredStates(resourceName,
cache, preferenceList);
- messagesToThrottle.sort(new MessagePriorityComparator(preferenceList,
stateModelDef.getStatePriorityMap()));
+ if (preferenceList != null && !preferenceList.isEmpty()) {
+ // Sort messages based on the priority (priority is defined in the
state model definition
+ messagesToThrottle.sort(new MessagePriorityComparator(preferenceList,
stateModelDef.getStatePriorityMap()));
+ }
for (Message message : messagesToThrottle) {
RebalanceType rebalanceType =
getRebalanceTypePerMessage(requiredState, message,
derivedCurrentStateMap);
@@ -470,10 +470,6 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// To clarify that custom mode does not apply recovery/load rebalance
since user can define different number of
// replicas for different partitions. Actually, the custom will stopped
from resource level checks if this resource
// is not FULL_AUTO, we will return best possible state and do nothing.
- List<String> preferenceList =
preferenceLists.get(partition.getPartitionName());
- if (preferenceList == null) {
- continue;
- }
Map<String, Integer> requiredStates =
getRequiredStates(resourceName, cache,
preferenceLists.get(partition.getPartitionName()));
// Maps instance to its current state
@@ -482,8 +478,11 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// Maps instance to its pending (next) state
List<Message> pendingMessages = new ArrayList<>(
currentStateOutput.getPendingMessageMap(resourceName,
partition).values());
- pendingMessages.sort(new
MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
- stateModelDefinition.getStatePriorityMap()));
+ List<String> preferenceList =
preferenceLists.get(partition.getPartitionName());
+ if (preferenceList != null && !preferenceList.isEmpty()) {
+ pendingMessages.sort(new MessagePriorityComparator(preferenceList,
+ stateModelDefinition.getStatePriorityMap()));
+ }
for (Message message : pendingMessages) {
StateTransitionThrottleConfig.RebalanceType rebalanceType =