This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ee47c7  Fix issue when client only sets ANY at cluster level throttle 
config
9ee47c7 is described below

commit 9ee47c7d22ca57c376764590e253cbeffaaa17c8
Author: Yi Wang <i3.wan...@gmail.com>
AuthorDate: Thu Aug 8 17:34:47 2019 -0700

    Fix issue when client only sets ANY at cluster level throttle config
    
    fixes #332
    Added unit test for StateTransitionThrottleController
    Added integration test for verifying case when only cluster level ANY 
throttle set to 1# Please enter the commit message for your changes. Lines 
starting
---
 .../stages/StateTransitionThrottleController.java  |  65 +++----
 .../TestStateTransitionThrottleController.java     | 194 +++++++++++++++++++++
 .../integration/TestPartitionMovementThrottle.java | 100 +++++++++--
 3 files changed, 302 insertions(+), 57 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
index ecfe256..e42354c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
@@ -38,15 +38,15 @@ class StateTransitionThrottleController {
   private static Logger logger = 
LoggerFactory.getLogger(StateTransitionThrottleController.class);
 
   // pending allowed transition counts in the cluster level for recovery and 
load balance
-  private Map<StateTransitionThrottleConfig.RebalanceType, Long> 
_pendingTransitionAllowedInCluster;
+  Map<StateTransitionThrottleConfig.RebalanceType, Long> 
_pendingTransitionAllowedInCluster;
 
   // pending allowed transition counts for each instance and resource
-  private Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> 
_pendingTransitionAllowedPerInstance;
-  private Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> 
_pendingTransitionAllowedPerResource;
+  Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> 
_pendingTransitionAllowedPerInstance;
+  Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> 
_pendingTransitionAllowedPerResource;
 
   private boolean _throttleEnabled = false;
 
-  public StateTransitionThrottleController(Set<String> resources, 
ClusterConfig clusterConfig,
+  StateTransitionThrottleController(Set<String> resources, ClusterConfig 
clusterConfig,
       Set<String> liveInstances) {
     super();
     _pendingTransitionAllowedInCluster = new HashMap<>();
@@ -175,13 +175,7 @@ class StateTransitionThrottleController {
    * @param rebalanceType
    */
   protected void chargeCluster(StateTransitionThrottleConfig.RebalanceType 
rebalanceType) {
-    if (_pendingTransitionAllowedInCluster.containsKey(rebalanceType)) {
-      Long clusterThrottle = 
_pendingTransitionAllowedInCluster.get(rebalanceType);
-      chargeANYType(_pendingTransitionAllowedInCluster);
-      if (clusterThrottle > 0) {
-        _pendingTransitionAllowedInCluster.put(rebalanceType, clusterThrottle 
- 1);
-      }
-    }
+    charge(rebalanceType, _pendingTransitionAllowedInCluster);
   }
 
   /**
@@ -191,14 +185,7 @@ class StateTransitionThrottleController {
    */
   protected void chargeResource(StateTransitionThrottleConfig.RebalanceType 
rebalanceType,
       String resource) {
-    if (_pendingTransitionAllowedPerResource.containsKey(resource)
-        && 
_pendingTransitionAllowedPerResource.get(resource).containsKey(rebalanceType)) {
-      chargeANYType(_pendingTransitionAllowedPerResource.get(resource));
-      Long resourceThrottle = 
_pendingTransitionAllowedPerResource.get(resource).get(rebalanceType);
-      if (resourceThrottle > 0) {
-        _pendingTransitionAllowedPerResource.get(resource).put(rebalanceType, 
resourceThrottle - 1);
-      }
-    }
+    charge(rebalanceType, 
_pendingTransitionAllowedPerResource.getOrDefault(resource, new HashMap<>()));
   }
 
   /**
@@ -208,13 +195,21 @@ class StateTransitionThrottleController {
    */
   protected void chargeInstance(StateTransitionThrottleConfig.RebalanceType 
rebalanceType,
       String instance) {
-    if (_pendingTransitionAllowedPerInstance.containsKey(instance)
-        && 
_pendingTransitionAllowedPerInstance.get(instance).containsKey(rebalanceType)) {
-      chargeANYType(_pendingTransitionAllowedPerInstance.get(instance));
-      Long instanceThrottle = 
_pendingTransitionAllowedPerInstance.get(instance).get(rebalanceType);
-      if (instanceThrottle > 0) {
-        _pendingTransitionAllowedPerInstance.get(instance).put(rebalanceType, 
instanceThrottle - 1);
-      }
+    charge(rebalanceType, 
_pendingTransitionAllowedPerInstance.getOrDefault(instance, new HashMap<>()));
+  }
+
+  private void charge(StateTransitionThrottleConfig.RebalanceType 
rebalanceType,
+      Map<StateTransitionThrottleConfig.RebalanceType, Long> quota) {
+    if 
(StateTransitionThrottleConfig.RebalanceType.NONE.equals(rebalanceType)) {
+      logger.error("Wrong rebalance type NONE as parameter");
+      return;
+    }
+    // if ANY type is present, decrement one else do nothing
+    quota.computeIfPresent(StateTransitionThrottleConfig.RebalanceType.ANY,
+        (type, quotaNumber) -> Math.max(0, quotaNumber - 1));
+    if 
(!rebalanceType.equals(StateTransitionThrottleConfig.RebalanceType.ANY)) {
+      // if type is present, decrement one else do nothing
+      quota.computeIfPresent(rebalanceType, (type, quotaNumber) -> Math.max(0, 
quotaNumber - 1));
     }
   }
 
@@ -236,22 +231,4 @@ class StateTransitionThrottleController {
     }
     return false;
   }
-
-  /**
-   * "Charge" for a pending state regardless of the rebalance type by 
subtracting one pending state
-   * from number of total pending state from number of total pending states 
allowed (set by user
-   * application).
-   * @param pendingTransitionAllowed
-   */
-  private void chargeANYType(
-      Map<StateTransitionThrottleConfig.RebalanceType, Long> 
pendingTransitionAllowed) {
-    if 
(pendingTransitionAllowed.containsKey(StateTransitionThrottleConfig.RebalanceType.ANY))
 {
-      Long anyTypeThrottle =
-          
pendingTransitionAllowed.get(StateTransitionThrottleConfig.RebalanceType.ANY);
-      if (anyTypeThrottle > 0) {
-        
pendingTransitionAllowed.put(StateTransitionThrottleConfig.RebalanceType.ANY,
-            anyTypeThrottle - 1);
-      }
-    }
-  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionThrottleController.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionThrottleController.java
new file mode 100644
index 0000000..182c428
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionThrottleController.java
@@ -0,0 +1,194 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static 
org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType.ANY;
+import static 
org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
+import static 
org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.model.ClusterConfig;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public class TestStateTransitionThrottleController {
+  private static final String INSTANCE = "instance0";
+  private static final String RESOURCE = "db0";
+  private static final List<StateTransitionThrottleConfig.RebalanceType> 
VALID_REBALANCE_TYPES =
+      ImmutableList.of(LOAD_BALANCE, RECOVERY_BALANCE, ANY);
+
+  @Test(description = "When cluster level ANY throttle config is set")
+  public void testChargeClusterWhenANYClusterLevelThrottleConfig() {
+    int maxNumberOfST = 1;
+    ClusterConfig clusterConfig = new ClusterConfig("config");
+    clusterConfig
+        .setStateTransitionThrottleConfigs(ImmutableList.of(new 
StateTransitionThrottleConfig(ANY,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 
maxNumberOfST)));
+
+    StateTransitionThrottleControllerAccessor controller =
+        new StateTransitionThrottleControllerAccessor(RESOURCE, INSTANCE, 
clusterConfig);
+    Assert.assertTrue(controller.isThrottleEnabled());
+
+    for (StateTransitionThrottleConfig.RebalanceType rebalanceType : 
VALID_REBALANCE_TYPES) {
+      controller.chargeCluster(rebalanceType);
+      for (StateTransitionThrottleConfig.RebalanceType type : 
VALID_REBALANCE_TYPES) {
+        Assert.assertTrue(controller.shouldThrottleForCluster(type));
+        Assert.assertTrue(controller.shouldThrottleForInstance(type, 
INSTANCE));
+        Assert.assertTrue(controller.shouldThrottleForInstance(type, 
RESOURCE));
+      }
+      // reset controller
+      controller = new StateTransitionThrottleControllerAccessor(RESOURCE, 
INSTANCE, clusterConfig);
+    }
+  }
+
+  @Test(description = "When cluster throttle is config of 
LOAD_BALANCE/RECOVERY_BALANCE, no ANY type")
+  public void testChargeCluster_OnlySetClusterSpecificType() {
+    int maxNumberOfST = 1;
+    ClusterConfig clusterConfig = new ClusterConfig("config");
+    clusterConfig.setStateTransitionThrottleConfigs(ImmutableList.of(
+        new StateTransitionThrottleConfig(RECOVERY_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 
maxNumberOfST),
+        new StateTransitionThrottleConfig(LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 
maxNumberOfST)));
+
+    StateTransitionThrottleControllerAccessor controller =
+        new StateTransitionThrottleControllerAccessor(RESOURCE, INSTANCE, 
clusterConfig);
+
+    Assert.assertTrue(controller.isThrottleEnabled());
+
+    controller.chargeCluster(ANY);
+    Assert.assertEquals(controller.getClusterLevelQuota(RECOVERY_BALANCE), 1);
+    Assert.assertEquals(controller.getClusterLevelQuota(LOAD_BALANCE), 1);
+    Assert.assertEquals(controller.getClusterLevelQuota(ANY), 0);
+
+    VALID_REBALANCE_TYPES.forEach(controller::chargeCluster);
+    for (StateTransitionThrottleConfig.RebalanceType rebalanceType : 
ImmutableList.of(LOAD_BALANCE,
+        RECOVERY_BALANCE)) {
+      Assert.assertTrue(controller.shouldThrottleForCluster(rebalanceType));
+      Assert.assertTrue(controller.shouldThrottleForInstance(rebalanceType, 
INSTANCE));
+      Assert.assertTrue(controller.shouldThrottleForResource(rebalanceType, 
RESOURCE));
+    }
+  }
+
+  @DataProvider
+  public static Object[][] mixedConfigurations() {
+    // TODO: add more mixed configuration setting when refactoring the 
controller logic
+    return new Object[][] {
+        {
+            10, 9, 8, 7, 6, 5, 4, 3, 2
+        }
+    };
+  }
+
+  @Test(dataProvider = "mixedConfigurations")
+  public void testChargeClusterWithMixedThrottleConfig(int 
anyClusterLevelQuota,
+      int loadClusterLevelQuota, int recoveryClusterLevelQuota, int 
anyInstanceLevelQuota,
+      int loadInstanceLevelQuota, int recoveryInstanceLevelQuota, int 
anyResourceLevelQuota,
+      int loadResourceLevelQuota, int recoveryResourceLevelQuota) {
+    List<StateTransitionThrottleConfig> stateTransitionThrottleConfigs = 
Arrays.asList(
+        new StateTransitionThrottleConfig(ANY, 
StateTransitionThrottleConfig.ThrottleScope.CLUSTER,
+            anyClusterLevelQuota),
+        new StateTransitionThrottleConfig(RECOVERY_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 
recoveryClusterLevelQuota),
+        new StateTransitionThrottleConfig(LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 
loadClusterLevelQuota),
+        new StateTransitionThrottleConfig(ANY, 
StateTransitionThrottleConfig.ThrottleScope.INSTANCE,
+            anyInstanceLevelQuota),
+        new StateTransitionThrottleConfig(RECOVERY_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 
recoveryInstanceLevelQuota),
+        new StateTransitionThrottleConfig(LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 
loadInstanceLevelQuota),
+        new StateTransitionThrottleConfig(ANY, 
StateTransitionThrottleConfig.ThrottleScope.RESOURCE,
+            anyResourceLevelQuota),
+        new StateTransitionThrottleConfig(RECOVERY_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 
recoveryResourceLevelQuota),
+        new StateTransitionThrottleConfig(LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 
loadResourceLevelQuota));
+    ClusterConfig clusterConfig = new ClusterConfig("config");
+    
clusterConfig.setStateTransitionThrottleConfigs(stateTransitionThrottleConfigs);
+
+    StateTransitionThrottleControllerAccessor controller =
+        new StateTransitionThrottleControllerAccessor(RESOURCE, INSTANCE, 
clusterConfig);
+
+    Assert.assertTrue(controller.isThrottleEnabled());
+
+    // verify behavior after charging cluster
+    controller.chargeCluster(ANY);
+    Assert.assertEquals(controller.getClusterLevelQuota(ANY), 
anyClusterLevelQuota - 1);
+    controller.chargeCluster(RECOVERY_BALANCE);
+    Assert.assertEquals(controller.getClusterLevelQuota(RECOVERY_BALANCE),
+        recoveryClusterLevelQuota - 1);
+    controller.chargeCluster(LOAD_BALANCE);
+    Assert.assertEquals(controller.getClusterLevelQuota(LOAD_BALANCE), 
loadClusterLevelQuota - 1);
+
+    // verify behavior after charging instance
+    controller.chargeInstance(ANY, INSTANCE);
+    Assert.assertEquals(controller.getInstanceLevelQuota(ANY, INSTANCE), 
anyInstanceLevelQuota - 1);
+    controller.chargeInstance(RECOVERY_BALANCE, INSTANCE);
+    Assert.assertEquals(controller.getInstanceLevelQuota(RECOVERY_BALANCE, 
INSTANCE),
+        recoveryInstanceLevelQuota - 1);
+    controller.chargeInstance(LOAD_BALANCE, INSTANCE);
+    Assert.assertEquals(controller.getInstanceLevelQuota(LOAD_BALANCE, 
INSTANCE),
+        loadInstanceLevelQuota - 1);
+
+    // verify behavior after charging resource
+    controller.chargeResource(ANY, RESOURCE);
+    Assert.assertEquals(controller.getResourceLevelQuota(ANY, RESOURCE), 
anyResourceLevelQuota - 1);
+    controller.chargeResource(RECOVERY_BALANCE, RESOURCE);
+    Assert.assertEquals(controller.getResourceLevelQuota(RECOVERY_BALANCE, 
RESOURCE),
+        recoveryResourceLevelQuota - 1);
+    controller.chargeResource(LOAD_BALANCE, RESOURCE);
+    Assert.assertEquals(controller.getResourceLevelQuota(LOAD_BALANCE, 
RESOURCE),
+        loadResourceLevelQuota - 1);
+  }
+
+  // The inner class just to fetch the protected fields of {@link 
StateTransitionThrottleController}
+  private static class StateTransitionThrottleControllerAccessor
+      extends StateTransitionThrottleController {
+    StateTransitionThrottleControllerAccessor(String resource, String 
liveInstance,
+        ClusterConfig clusterConfig) {
+      super(ImmutableSet.of(resource), clusterConfig, 
ImmutableSet.of(liveInstance));
+    }
+
+    long getClusterLevelQuota(StateTransitionThrottleConfig.RebalanceType 
rebalanceType) {
+      return _pendingTransitionAllowedInCluster.getOrDefault(rebalanceType, 
0L);
+    }
+
+    long getResourceLevelQuota(StateTransitionThrottleConfig.RebalanceType 
rebalanceType,
+        String resource) {
+      return _pendingTransitionAllowedPerResource.getOrDefault(resource, 
Collections.emptyMap())
+          .getOrDefault(rebalanceType, 0L);
+    }
+
+    long getInstanceLevelQuota(StateTransitionThrottleConfig.RebalanceType 
rebalanceType,
+        String instance) {
+      return _pendingTransitionAllowedPerInstance.getOrDefault(instance, 
Collections.emptyMap())
+          .getOrDefault(rebalanceType, 0L);
+    }
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
index 5d1c8ea..36a1ee3 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
@@ -29,25 +29,35 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.Message;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ClusterLiveNodesVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase {
+
   private ConfigAccessor _configAccessor;
   private Set<String> _dbs = new HashSet<>();
 
@@ -149,7 +159,8 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
       // there are recovery or error partitions present, maxPendingTransition 
below is adjusted from
       // 2 to 5 because BOTH recovery balance and load balance could happen in 
the same pipeline
       // iteration
-      validateThrottle(DelayedTransition.getResourcePatitionTransitionTimes(), 
db, 5);
+      Assert.assertTrue(getMaxParallelTransitionCount(
+          DelayedTransition.getResourcePatitionTransitionTimes(), db) <= 5);
     }
   }
 
@@ -179,8 +190,9 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
     Thread.sleep(2000);
 
     for (int i = 0; i < NODE_NR; i++) {
-      validateThrottle(DelayedTransition.getInstancePatitionTransitionTimes(),
-          _participants[i].getInstanceName(), 2);
+      Assert.assertTrue(
+          
getMaxParallelTransitionCount(DelayedTransition.getInstancePatitionTransitionTimes(),
+              _participants[i].getInstanceName()) <= 2);
     }
   }
 
@@ -211,8 +223,60 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
     Thread.sleep(2000L);
 
     for (int i = 0; i < NODE_NR; i++) {
-      validateThrottle(DelayedTransition.getInstancePatitionTransitionTimes(),
-          _participants[i].getInstanceName(), 1);
+      Assert.assertTrue(
+          
getMaxParallelTransitionCount(DelayedTransition.getInstancePatitionTransitionTimes(),
+              _participants[i].getInstanceName()) <= 1);
+    }
+  }
+
+  @Test
+  public void testThrottleOnlyClusterLevelAnyType() {
+    // start some participants
+    for (int i = 0; i < NODE_NR - 3; i++) {
+      _participants[i].syncStart();
+    }
+    // Add resource: TestDB_ANY of 20 partitions
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB + "_OnlyANY",
+        20, STATE_MODEL, RebalanceMode.FULL_AUTO.name());
+    // Act the rebalance process
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB + "_OnlyANY",
+        _replica);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // overwrite the cluster level throttle configuration
+    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
+    StateTransitionThrottleConfig anyTypeClusterThrottle =
+        new 
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1);
+    
clusterConfig.setStateTransitionThrottleConfigs(ImmutableList.of(anyTypeClusterThrottle));
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    DelayedTransition.setDelay(20);
+    DelayedTransition.enableThrottleRecord();
+
+    List<MockParticipantManager> newNodes =
+        Arrays.asList(_participants).subList(NODE_NR - 3, NODE_NR);
+    newNodes.forEach(MockParticipantManager::syncStart);
+    newNodes.forEach(node -> {
+      try {
+        Assert.assertTrue(TestHelper.verify(() -> 
getMaxParallelTransitionCount(
+            DelayedTransition.getInstancePatitionTransitionTimes(), 
node.getInstanceName()) <= 1,
+            1000 * 2));
+      } catch (Exception e) {
+        e.printStackTrace();
+        assert false;
+      }
+    });
+
+    ClusterLiveNodesVerifier liveNodesVerifier =
+        new ClusterLiveNodesVerifier(_gZkClient, CLUSTER_NAME,
+            Lists.transform(Arrays.asList(_participants), 
MockParticipantManager::getInstanceName));
+    Assert.assertTrue(liveNodesVerifier.verifyByZkCallback(1000));
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].syncStop();
     }
   }
 
@@ -225,11 +289,19 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
     _dbs.clear();
     Thread.sleep(50);
 
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, new 
ZkBaseDataAccessor<>(_gZkClient));
     for (int i = 0; i < _participants.length; i++) {
       _participants[i].syncStop();
       _participants[i] =
           new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, 
_participants[i].getInstanceName());
     }
+    try {
+      Assert.assertTrue(TestHelper.verify(() -> 
dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances()).isEmpty(),
 1000));
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.out.println("There're live instances not cleaned up yet");
+      assert false;
+    }
     DelayedTransition.clearThrottleRecord();
   }
 
@@ -279,13 +351,16 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _dbs) {
-      validateThrottle(DelayedTransition.getResourcePatitionTransitionTimes(), 
db, 2);
+      int maxInParallel =
+          
getMaxParallelTransitionCount(DelayedTransition.getResourcePatitionTransitionTimes(),
 db);
+      System.out.println("MaxInParallel: " + maxInParallel + " 
maxPendingTransition: " + 2);
+      Assert.assertTrue(maxInParallel <= 2, "Throttle condition does not meet 
for " + db);
     }
   }
 
-  private void validateThrottle(
+  private int getMaxParallelTransitionCount(
       Map<String, List<PartitionTransitionTime>> partitionTransitionTimesMap,
-      String throttledItemName, int maxPendingTransition) {
+      String throttledItemName) {
     List<PartitionTransitionTime> pTimeList = 
partitionTransitionTimesMap.get(throttledItemName);
 
     Map<Long, List<PartitionTransitionTime>> startMap = new HashMap<>();
@@ -294,7 +369,7 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
 
     if (pTimeList == null) {
       System.out.println("no throttle result for :" + throttledItemName);
-      return;
+      return -1;
     }
     pTimeList.sort((o1, o2) -> (int) (o1.start - o2.start));
 
@@ -330,10 +405,9 @@ public class TestPartitionMovementThrottle extends 
ZkStandAloneCMTestBase {
       }
     }
 
-    System.out.println(
-        "MaxInParallel: " + maxInParallel + " maxPendingTransition: " + 
maxPendingTransition);
-    Assert.assertTrue(maxInParallel <= maxPendingTransition,
-        "Throttle condition does not meet for " + throttledItemName);
+    System.out
+        .println("Max number of ST in parallel: " + maxInParallel + " for " + 
throttledItemName);
+    return maxInParallel;
   }
 
   private int size(List<PartitionTransitionTime> timeList) {

Reply via email to