This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 4f19dad4f Fix NPE in intermediate state calculation stage (#2668)
4f19dad4f is described below
commit 4f19dad4f5e2a016f8e3355d240605644e745259
Author: Komal Desai <[email protected]>
AuthorDate: Wed Oct 18 11:21:43 2023 -0700
Fix NPE in intermediate state calculation stage (#2668)
Fix the NPE in the intermediate state calculation stage when a partition is
deleted through an update to the NUM_PARTITION field
---
.../stages/IntermediateStateCalcStage.java | 20 ++-
.../stages/TestIntermediateStateCalcStage.java | 162 +++++++++++++++++++++
2 files changed, 177 insertions(+), 5 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 00c49ad91..ec17b620c 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -367,6 +367,9 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
currentStateOutput.getCurrentStateMap(resourceName,
partition).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
List<String> preferenceList =
preferenceLists.get(partition.getPartitionName());
+ if (preferenceList == null || preferenceList.size() == 0) {
+ continue;
+ }
Map<String, Integer> requiredState = getRequiredStates(resourceName,
cache, preferenceList);
messagesToThrottle.sort(new MessagePriorityComparator(preferenceList,
stateModelDef.getStatePriorityMap()));
for (Message message : messagesToThrottle) {
@@ -467,6 +470,10 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// To clarify that custom mode does not apply recovery/load rebalance
since user can define different number of
// replicas for different partitions. Actually, the custom will stopped
from resource level checks if this resource
// is not FULL_AUTO, we will return best possible state and do nothing.
+ List<String> preferenceList =
preferenceLists.get(partition.getPartitionName());
+ if (preferenceList == null) {
+ continue;
+ }
Map<String, Integer> requiredStates =
getRequiredStates(resourceName, cache,
preferenceLists.get(partition.getPartitionName()));
// Maps instance to its current state
@@ -643,15 +650,18 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
StateModelDefinition stateModelDefinition =
resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef());
int requiredNumReplica =
- idealState.getMinActiveReplicas() == -1 ?
idealState.getReplicaCount(preferenceList.size())
+ idealState.getMinActiveReplicas() == -1 ?
+ idealState.getReplicaCount(preferenceList == null ? 0 :
preferenceList.size())
: idealState.getMinActiveReplicas();
// Generate a state mapping, state -> required numbers based on the live
and enabled instances for this partition
// preference list
- return stateModelDefinition.getStateCountMap(
- (int) preferenceList.stream()
- .filter(i ->
resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
- .count(), requiredNumReplica); // StateModelDefinition's counts
+ if (preferenceList != null) {
+ return stateModelDefinition.getStateCountMap((int)
preferenceList.stream().filter(i ->
resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
+ .count(), requiredNumReplica); // StateModelDefinition's counts
+ }
+ return
stateModelDefinition.getStateCountMap(resourceControllerDataProvider.getEnabledLiveInstances().size(),
+ requiredNumReplica); // StateModelDefinition's counts
}
/**
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index b651e0d28..60281e65b 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -406,6 +406,168 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
}
}
+ @Test
+ public void testPartitionMissing() {
+ String resourcePrefix = "resource";
+ int nResource = 4;
+ int nPartition = 2;
+ int nReplica = 3;
+
+ String[] resources = new String[nResource];
+ for (int i = 0; i < nResource; i++) {
+ resources[i] = resourcePrefix + "_" + i;
+ }
+
+ preSetup(resources, nReplica, nReplica);
+ event.addAttribute(AttributeName.RESOURCES.name(),
getResourceMap(resources, nPartition, "OnlineOffline"));
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+ getResourceMap(resources, nPartition, "OnlineOffline"));
+
+ // Initialize bestpossible state and current state
+ BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ MessageOutput messageSelectOutput = new MessageOutput();
+ IntermediateStateOutput expectedResult = new IntermediateStateOutput();
+
+ _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1);
+ setClusterConfig(_clusterConfig);
+
+ for (String resource : resources) {
+ IdealState is =
accessor.getProperty(accessor.keyBuilder().idealStates(resource));
+ setSingleIdealState(is);
+
+ Map<String, List<String>> partitionMap = new HashMap<>();
+ for (int p = 0; p < nPartition; p++) {
+ Partition partition = new Partition(resource + "_" + p);
+ for (int r = 0; r < nReplica; r++) {
+ String instanceName = HOSTNAME_PREFIX + r;
+
+ // PartitionMap is used as a preferenceList.
+ // For the last partition, let us add null as preferenceList.
+ if (p != nPartition - 1) {
+ partitionMap.put(partition.getPartitionName(),
Collections.singletonList(instanceName));
+ } else {
+ partitionMap.put(partition.getPartitionName(), null);
+ }
+
+ // TODO: The following code is same for testNoStateMissing
+ if (resource.endsWith("0")) {
+ // Regular recovery balance
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
+ // add blocked state transition messages
+ Message pendingMessage = generateMessage("OFFLINE", "ONLINE",
instanceName);
+ currentStateOutput.setPendingMessage(resource, partition,
instanceName, pendingMessage);
+
+ bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+
+ // should be recovered:
+ expectedResult.setState(resource, partition, instanceName,
"ONLINE");
+ } else if (resource.endsWith("1")) {
+ // Regular load balance
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "ONLINE");
+ currentStateOutput.setCurrentState(resource, partition,
instanceName + "-1", "OFFLINE");
+ bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
+ // should be recovered:
+ expectedResult.setState(resource, partition, instanceName,
"ONLINE");
+ } else if (resource.endsWith("2")) {
+ // Recovery balance with transient states, should keep the current
states in the output.
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
+ bestPossibleStateOutput.setState(resource, partition,
instanceName, "OFFLINE");
+ // should be kept unchanged:
+ expectedResult.setState(resource, partition, instanceName,
"OFFLINE");
+ } else if (resource.endsWith("3")) {
+ // One unresolved error should not prevent recovery balance
+ bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ if (p == 0) {
+ if (r == 0) {
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "ERROR");
+ bestPossibleStateOutput.setState(resource, partition,
instanceName, "ERROR");
+ // This partition is still ERROR
+ expectedResult.setState(resource, partition, instanceName,
"ERROR");
+ } else {
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
+ messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName));
+ // Recovery balance
+ expectedResult.setState(resource, partition, instanceName,
"ONLINE");
+ }
+ } else {
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "ONLINE");
+ currentStateOutput.setCurrentState(resource, partition,
instanceName + "-1", "OFFLINE");
+ // load balance is throttled, so keep all current states
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
+ expectedResult.setState(resource, partition, instanceName,
"ONLINE");
+ // The following must be removed because now downward state
transitions are allowed
+ // expectedResult.setState(resource, partition, instanceName +
"-1", "OFFLINE");
+ }
+ } else if (resource.endsWith("4")) {
+ // Test that partitions with replicas to drop are dropping them
when recovery is
+ // happening for other partitions
+ if (p == 0) {
+ // This partition requires recovery
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
+ bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName));
+ // After recovery, it should be back ONLINE
+ expectedResult.setState(resource, partition, instanceName,
"ONLINE");
+ } else {
+ // Other partitions require dropping of replicas
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "ONLINE");
+ currentStateOutput.setCurrentState(resource, partition,
instanceName + "-1", "OFFLINE");
+ // BestPossibleState dictates that we only need one ONLINE
replica
+ bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ bestPossibleStateOutput.setState(resource, partition,
instanceName + "-1", "DROPPED");
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
+ // So instanceName-1 will NOT be expected to show up in
expectedResult
+ expectedResult.setState(resource, partition, instanceName,
"ONLINE");
+ expectedResult.setState(resource, partition, instanceName +
"-1", "DROPPED");
+ }
+ } else if (resource.endsWith("5")) {
+ // Test that load balance bringing up a new replica does NOT
happen with a recovery
+ // partition
+ if (p == 0) {
+ // Set up a partition requiring recovery
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
+ bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName));
+ // After recovery, it should be back ONLINE
+ expectedResult.setState(resource, partition, instanceName,
"ONLINE");
+ } else {
+ currentStateOutput.setCurrentState(resource, partition,
instanceName, "ONLINE");
+ bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ // Check that load balance (bringing up a new node) did not take
place
+ bestPossibleStateOutput.setState(resource, partition,
instanceName + "-1", "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "ONLINE", instanceName + "-1"));
+ expectedResult.setState(resource, partition, instanceName,
"ONLINE");
+ }
+ }
+ }
+ }
+ bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
+ }
+
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
messageSelectOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.ControllerDataProvider.name(), new
ResourceControllerDataProvider());
+ runStage(event, new ReadClusterDataStage());
+ runStage(event, new IntermediateStateCalcStage());
+
+ IntermediateStateOutput output =
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+ for (String resource : resources) {
+ // Note Assert.assertEquals won't work. If "actual" is an empty map, it
won't compare
+ // anything.
+ Assert.assertTrue(output.getPartitionStateMap(resource)
+ .getStateMap()
+
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
+ }
+ }
+
private void preSetup(String[] resources, int numOfLiveInstances, int
numOfReplicas) {
setupIdealState(numOfLiveInstances, resources, numOfLiveInstances,
numOfReplicas,
IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");