This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new e0c551d1a Fix waged instance capacity npe on new resource (#2969)
e0c551d1a is described below

commit e0c551d1ab811654f0474643c12ea78c9e90f7f4
Author: Grant Paláu Spencer <gspen...@linkedin.com>
AuthorDate: Wed Jan 29 11:15:14 2025 -0800

    Fix waged instance capacity npe on new resource (#2969)
    
    Fix the WAGED instance capacity NPE on a new resource by clearing the WAGED 
capacity map whenever there are no WAGED resources in the cluster. This 
prevents a stale map from being used once a new resource is added.
---
 .../ResourceControllerDataProvider.java            |   8 ++
 .../stages/CurrentStateComputationStage.java       |   7 ++
 .../org/apache/helix/integration/TestWagedNPE.java | 111 +++++++++++++++++++++
 3 files changed, 126 insertions(+)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 021aab6a8..b006a4f1e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -515,6 +515,14 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
     _wagedPartitionWeightProvider = resourceWeightProvider;
   }
 
+  /**
+   * Clears the WAGED algorithm specific instance capacity provider and 
resource weight provider.
+   */
+  public void clearWagedCapacityProviders() {
+    _wagedInstanceCapacity = null;
+    _wagedPartitionWeightProvider = null;
+  }
+
   /**
    * Check and reduce the capacity of an instance for a resource partition
    * @param instance - the instance to check
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index da972d682..2814e4062 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -356,6 +356,11 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
       CurrentStateOutput currentStateOutput) {
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
     if (skipCapacityCalculation(cache, resourceMap, event)) {
+      // Ensure instance capacity is null if there are no resources. This 
prevents using a stale map when all resources
+      // are removed and then a new resource is added.
+      if (resourceMap == null || resourceMap.isEmpty()) {
+        cache.clearWagedCapacityProviders();
+      }
       return;
     }
 
@@ -364,7 +369,9 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
         .filter(entry -> 
WagedValidationUtil.isWagedEnabled(cache.getIdealState(entry.getKey())))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
+    // Ensure instance capacity is null if there are no WAGED-enabled resources
     if (wagedEnabledResourceMap.isEmpty()) {
+      cache.clearWagedCapacityProviders();
       return;
     }
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java 
b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java
new file mode 100644
index 000000000..57930fa71
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java
@@ -0,0 +1,111 @@
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestWagedNPE extends ZkTestBase  {
+
+  public static String CLUSTER_NAME = TestHelper.getTestClassName() + 
"_cluster";
+  public static int PARTICIPANT_COUNT = 3;
+  public static List<MockParticipantManager> _participants = new ArrayList<>();
+  public static ClusterControllerManager _controller;
+  public static ConfigAccessor _configAccessor;
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println("Start test " + TestHelper.getTestClassName());
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < PARTICIPANT_COUNT; i++) {
+      addParticipant("localhost_" + i);
+    }
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig =  
_configAccessor.getClusterConfig(CLUSTER_NAME);
+    String testCapacityKey = "TestCapacityKey";
+    
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+    
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey,
 100));
+    
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey,
 1));
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+  }
+
+  // This test was constructed to capture the bug described in issue 2891
+  // https://github.com/apache/helix/issues/2891
+  @Test
+  public void testNPE() throws Exception {
+    int numPartition = 3;
+    BestPossibleExternalViewVerifier verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+
+    // Create 1 WAGED Resource
+    String firstDB = "firstDB";
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, 
"LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO.name(), null);
+    IdealState idealStateOne =
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
firstDB);
+    idealStateOne.setMinActiveReplicas(2);
+    idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
firstDB, idealStateOne);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
+
+    // Wait for cluster to converge
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    // Drop resource
+    _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, firstDB);
+
+    // Wait for cluster to converge
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    // add instance
+    addParticipant("instance_to_add");
+
+    // Wait for cluster to converge
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    // Add a new resource
+    String secondDb = "secondDB";
+    _configAccessor.setResourceConfig(CLUSTER_NAME, secondDb, new 
ResourceConfig(secondDb));
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, secondDb, numPartition, 
"LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO.name(), null);
+    IdealState idealStateTwo =
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
secondDb);
+    idealStateTwo.setMinActiveReplicas(2);
+    idealStateTwo.setRebalancerClassName(WagedRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
secondDb, idealStateTwo);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, secondDb, 3);
+
+    // Confirm cluster can converge. Cluster will not converge if NPE occurs 
during pipeline run
+    Assert.assertTrue(verifier.verifyByPolling());
+  }
+
+  public MockParticipantManager addParticipant(String instanceName) {
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
+    MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, 
CLUSTER_NAME, instanceName);
+    participant.syncStart();
+    _participants.add(participant);
+    return participant;
+  }
+}

Reply via email to