This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new 9ee47c7 Fix issue when client only sets ANY at cluster level throttle config 9ee47c7 is described below commit 9ee47c7d22ca57c376764590e253cbeffaaa17c8 Author: Yi Wang <i3.wan...@gmail.com> AuthorDate: Thu Aug 8 17:34:47 2019 -0700 Fix issue when client only sets ANY at cluster level throttle config fixes #332 Added unit test for StateTransitionThrottleController Added integration test for verifying case when only cluster level ANY throttle set to 1# Please enter the commit message for your changes. Lines starting --- .../stages/StateTransitionThrottleController.java | 65 +++---- .../TestStateTransitionThrottleController.java | 194 +++++++++++++++++++++ .../integration/TestPartitionMovementThrottle.java | 100 +++++++++-- 3 files changed, 302 insertions(+), 57 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java index ecfe256..e42354c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java @@ -38,15 +38,15 @@ class StateTransitionThrottleController { private static Logger logger = LoggerFactory.getLogger(StateTransitionThrottleController.class); // pending allowed transition counts in the cluster level for recovery and load balance - private Map<StateTransitionThrottleConfig.RebalanceType, Long> _pendingTransitionAllowedInCluster; + Map<StateTransitionThrottleConfig.RebalanceType, Long> _pendingTransitionAllowedInCluster; // pending allowed transition counts for each instance and resource - private Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> _pendingTransitionAllowedPerInstance; - private Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> _pendingTransitionAllowedPerResource; + Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> _pendingTransitionAllowedPerInstance; + Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> _pendingTransitionAllowedPerResource; private boolean _throttleEnabled = false; - public StateTransitionThrottleController(Set<String> resources, ClusterConfig clusterConfig, + StateTransitionThrottleController(Set<String> resources, ClusterConfig clusterConfig, Set<String> liveInstances) { super(); _pendingTransitionAllowedInCluster = new HashMap<>(); @@ -175,13 +175,7 @@ class StateTransitionThrottleController { * @param rebalanceType */ protected void chargeCluster(StateTransitionThrottleConfig.RebalanceType rebalanceType) { - if (_pendingTransitionAllowedInCluster.containsKey(rebalanceType)) { - Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType); - chargeANYType(_pendingTransitionAllowedInCluster); - if (clusterThrottle > 0) { - _pendingTransitionAllowedInCluster.put(rebalanceType, clusterThrottle - 1); - } - } + charge(rebalanceType, _pendingTransitionAllowedInCluster); } /** @@ -191,14 +185,7 @@ class StateTransitionThrottleController { */ protected void chargeResource(StateTransitionThrottleConfig.RebalanceType rebalanceType, String resource) { - if (_pendingTransitionAllowedPerResource.containsKey(resource) - && _pendingTransitionAllowedPerResource.get(resource).containsKey(rebalanceType)) { - chargeANYType(_pendingTransitionAllowedPerResource.get(resource)); - Long resourceThrottle = _pendingTransitionAllowedPerResource.get(resource).get(rebalanceType); - if (resourceThrottle > 0) { - _pendingTransitionAllowedPerResource.get(resource).put(rebalanceType, resourceThrottle - 1); - } - } + charge(rebalanceType, _pendingTransitionAllowedPerResource.getOrDefault(resource, new HashMap<>())); } /** @@ -208,13 +195,21 @@ class StateTransitionThrottleController { */ protected void chargeInstance(StateTransitionThrottleConfig.RebalanceType rebalanceType, String instance) { - if (_pendingTransitionAllowedPerInstance.containsKey(instance) - && _pendingTransitionAllowedPerInstance.get(instance).containsKey(rebalanceType)) { - chargeANYType(_pendingTransitionAllowedPerInstance.get(instance)); - Long instanceThrottle = _pendingTransitionAllowedPerInstance.get(instance).get(rebalanceType); - if (instanceThrottle > 0) { - _pendingTransitionAllowedPerInstance.get(instance).put(rebalanceType, instanceThrottle - 1); - } + charge(rebalanceType, _pendingTransitionAllowedPerInstance.getOrDefault(instance, new HashMap<>())); + } + + private void charge(StateTransitionThrottleConfig.RebalanceType rebalanceType, + Map<StateTransitionThrottleConfig.RebalanceType, Long> quota) { + if (StateTransitionThrottleConfig.RebalanceType.NONE.equals(rebalanceType)) { + logger.error("Wrong rebalance type NONE as parameter"); + return; + } + // if ANY type is present, decrement one else do nothing + quota.computeIfPresent(StateTransitionThrottleConfig.RebalanceType.ANY, + (type, quotaNumber) -> Math.max(0, quotaNumber - 1)); + if (!rebalanceType.equals(StateTransitionThrottleConfig.RebalanceType.ANY)) { + // if type is present, decrement one else do nothing + quota.computeIfPresent(rebalanceType, (type, quotaNumber) -> Math.max(0, quotaNumber - 1)); } } @@ -236,22 +231,4 @@ class StateTransitionThrottleController { } return false; } - - /** - * "Charge" for a pending state regardless of the rebalance type by subtracting one pending state - * from number of total pending state from number of total pending states allowed (set by user - * application). - * @param pendingTransitionAllowed - */ - private void chargeANYType( - Map<StateTransitionThrottleConfig.RebalanceType, Long> pendingTransitionAllowed) { - if (pendingTransitionAllowed.containsKey(StateTransitionThrottleConfig.RebalanceType.ANY)) { - Long anyTypeThrottle = - pendingTransitionAllowed.get(StateTransitionThrottleConfig.RebalanceType.ANY); - if (anyTypeThrottle > 0) { - pendingTransitionAllowed.put(StateTransitionThrottleConfig.RebalanceType.ANY, - anyTypeThrottle - 1); - } - } - } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionThrottleController.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionThrottleController.java new file mode 100644 index 0000000..182c428 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionThrottleController.java @@ -0,0 +1,194 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType.ANY; +import static org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE; +import static org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.helix.api.config.StateTransitionThrottleConfig; +import org.apache.helix.model.ClusterConfig; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +public class TestStateTransitionThrottleController { + private static final String INSTANCE = "instance0"; + private static final String RESOURCE = "db0"; + private static final List<StateTransitionThrottleConfig.RebalanceType> VALID_REBALANCE_TYPES = + ImmutableList.of(LOAD_BALANCE, RECOVERY_BALANCE, ANY); + + @Test(description = "When cluster level ANY throttle config is set") + public void testChargeClusterWhenANYClusterLevelThrottleConfig() { + int maxNumberOfST = 1; + ClusterConfig clusterConfig = new ClusterConfig("config"); + clusterConfig + .setStateTransitionThrottleConfigs(ImmutableList.of(new StateTransitionThrottleConfig(ANY, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxNumberOfST))); + + StateTransitionThrottleControllerAccessor controller = + new StateTransitionThrottleControllerAccessor(RESOURCE, INSTANCE, clusterConfig); + Assert.assertTrue(controller.isThrottleEnabled()); + + for (StateTransitionThrottleConfig.RebalanceType rebalanceType : VALID_REBALANCE_TYPES) { + controller.chargeCluster(rebalanceType); + for (StateTransitionThrottleConfig.RebalanceType type : VALID_REBALANCE_TYPES) { + Assert.assertTrue(controller.shouldThrottleForCluster(type)); + Assert.assertTrue(controller.shouldThrottleForInstance(type, INSTANCE)); + Assert.assertTrue(controller.shouldThrottleForInstance(type, RESOURCE)); + } + // reset controller + controller = new StateTransitionThrottleControllerAccessor(RESOURCE, INSTANCE, clusterConfig); + } + } + + @Test(description = "When cluster throttle is config of LOAD_BALANCE/RECOVERY_BALANCE, no ANY type") + public void testChargeCluster_OnlySetClusterSpecificType() { + int maxNumberOfST = 1; + ClusterConfig clusterConfig = new ClusterConfig("config"); + clusterConfig.setStateTransitionThrottleConfigs(ImmutableList.of( + new StateTransitionThrottleConfig(RECOVERY_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxNumberOfST), + new StateTransitionThrottleConfig(LOAD_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxNumberOfST))); + + StateTransitionThrottleControllerAccessor controller = + new StateTransitionThrottleControllerAccessor(RESOURCE, INSTANCE, clusterConfig); + + Assert.assertTrue(controller.isThrottleEnabled()); + + controller.chargeCluster(ANY); + Assert.assertEquals(controller.getClusterLevelQuota(RECOVERY_BALANCE), 1); + Assert.assertEquals(controller.getClusterLevelQuota(LOAD_BALANCE), 1); + Assert.assertEquals(controller.getClusterLevelQuota(ANY), 0); + + VALID_REBALANCE_TYPES.forEach(controller::chargeCluster); + for (StateTransitionThrottleConfig.RebalanceType rebalanceType : ImmutableList.of(LOAD_BALANCE, + RECOVERY_BALANCE)) { + Assert.assertTrue(controller.shouldThrottleForCluster(rebalanceType)); + Assert.assertTrue(controller.shouldThrottleForInstance(rebalanceType, INSTANCE)); + Assert.assertTrue(controller.shouldThrottleForResource(rebalanceType, RESOURCE)); + } + } + + @DataProvider + public static Object[][] mixedConfigurations() { + // TODO: add more mixed configuration setting when refactoring the controller logic + return new Object[][] { + { + 10, 9, 8, 7, 6, 5, 4, 3, 2 + } + }; + } + + @Test(dataProvider = "mixedConfigurations") + public void testChargeClusterWithMixedThrottleConfig(int anyClusterLevelQuota, + int loadClusterLevelQuota, int recoveryClusterLevelQuota, int anyInstanceLevelQuota, + int loadInstanceLevelQuota, int recoveryInstanceLevelQuota, int anyResourceLevelQuota, + int loadResourceLevelQuota, int recoveryResourceLevelQuota) { + List<StateTransitionThrottleConfig> stateTransitionThrottleConfigs = Arrays.asList( + new StateTransitionThrottleConfig(ANY, StateTransitionThrottleConfig.ThrottleScope.CLUSTER, + anyClusterLevelQuota), + new StateTransitionThrottleConfig(RECOVERY_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, recoveryClusterLevelQuota), + new StateTransitionThrottleConfig(LOAD_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, loadClusterLevelQuota), + new StateTransitionThrottleConfig(ANY, StateTransitionThrottleConfig.ThrottleScope.INSTANCE, + anyInstanceLevelQuota), + new StateTransitionThrottleConfig(RECOVERY_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.INSTANCE, recoveryInstanceLevelQuota), + new StateTransitionThrottleConfig(LOAD_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.INSTANCE, loadInstanceLevelQuota), + new StateTransitionThrottleConfig(ANY, StateTransitionThrottleConfig.ThrottleScope.RESOURCE, + anyResourceLevelQuota), + new StateTransitionThrottleConfig(RECOVERY_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.RESOURCE, recoveryResourceLevelQuota), + new StateTransitionThrottleConfig(LOAD_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.RESOURCE, loadResourceLevelQuota)); + ClusterConfig clusterConfig = new ClusterConfig("config"); + clusterConfig.setStateTransitionThrottleConfigs(stateTransitionThrottleConfigs); + + StateTransitionThrottleControllerAccessor controller = + new StateTransitionThrottleControllerAccessor(RESOURCE, INSTANCE, clusterConfig); + + Assert.assertTrue(controller.isThrottleEnabled()); + + // verify behavior after charging cluster + controller.chargeCluster(ANY); + Assert.assertEquals(controller.getClusterLevelQuota(ANY), anyClusterLevelQuota - 1); + controller.chargeCluster(RECOVERY_BALANCE); + Assert.assertEquals(controller.getClusterLevelQuota(RECOVERY_BALANCE), + recoveryClusterLevelQuota - 1); + controller.chargeCluster(LOAD_BALANCE); + Assert.assertEquals(controller.getClusterLevelQuota(LOAD_BALANCE), loadClusterLevelQuota - 1); + + // verify behavior after charging instance + controller.chargeInstance(ANY, INSTANCE); + Assert.assertEquals(controller.getInstanceLevelQuota(ANY, INSTANCE), anyInstanceLevelQuota - 1); + controller.chargeInstance(RECOVERY_BALANCE, INSTANCE); + Assert.assertEquals(controller.getInstanceLevelQuota(RECOVERY_BALANCE, INSTANCE), + recoveryInstanceLevelQuota - 1); + controller.chargeInstance(LOAD_BALANCE, INSTANCE); + Assert.assertEquals(controller.getInstanceLevelQuota(LOAD_BALANCE, INSTANCE), + loadInstanceLevelQuota - 1); + + // verify behavior after charging resource + controller.chargeResource(ANY, RESOURCE); + Assert.assertEquals(controller.getResourceLevelQuota(ANY, RESOURCE), anyResourceLevelQuota - 1); + controller.chargeResource(RECOVERY_BALANCE, RESOURCE); + Assert.assertEquals(controller.getResourceLevelQuota(RECOVERY_BALANCE, RESOURCE), + recoveryResourceLevelQuota - 1); + controller.chargeResource(LOAD_BALANCE, RESOURCE); + Assert.assertEquals(controller.getResourceLevelQuota(LOAD_BALANCE, RESOURCE), + loadResourceLevelQuota - 1); + } + + // The inner class just to fetch the protected fields of {@link StateTransitionThrottleController} + private static class StateTransitionThrottleControllerAccessor + extends StateTransitionThrottleController { + StateTransitionThrottleControllerAccessor(String resource, String liveInstance, + ClusterConfig clusterConfig) { + super(ImmutableSet.of(resource), clusterConfig, ImmutableSet.of(liveInstance)); + } + + long getClusterLevelQuota(StateTransitionThrottleConfig.RebalanceType rebalanceType) { + return _pendingTransitionAllowedInCluster.getOrDefault(rebalanceType, 0L); + } + + long getResourceLevelQuota(StateTransitionThrottleConfig.RebalanceType rebalanceType, + String resource) { + return _pendingTransitionAllowedPerResource.getOrDefault(resource, Collections.emptyMap()) + .getOrDefault(rebalanceType, 0L); + } + + long getInstanceLevelQuota(StateTransitionThrottleConfig.RebalanceType rebalanceType, + String instance) { + return _pendingTransitionAllowedPerInstance.getOrDefault(instance, Collections.emptyMap()) + .getOrDefault(rebalanceType, 0L); + } + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java index 5d1c8ea..36a1ee3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java @@ -29,25 +29,35 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; + import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.NotificationContext; +import org.apache.helix.TestHelper; import org.apache.helix.api.config.StateTransitionThrottleConfig; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.integration.common.ZkStandAloneCMTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.integration.task.WorkflowGenerator; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.Message; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ClusterLiveNodesVerifier; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase { + private ConfigAccessor _configAccessor; private Set<String> _dbs = new HashSet<>(); @@ -149,7 +159,8 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase { // there are recovery or error partitions present, maxPendingTransition below is adjusted from // 2 to 5 because BOTH recovery balance and load balance could happen in the same pipeline // iteration - validateThrottle(DelayedTransition.getResourcePatitionTransitionTimes(), db, 5); + Assert.assertTrue(getMaxParallelTransitionCount( + DelayedTransition.getResourcePatitionTransitionTimes(), db) <= 5); } } @@ -179,8 +190,9 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase { Thread.sleep(2000); for (int i = 0; i < NODE_NR; i++) { - validateThrottle(DelayedTransition.getInstancePatitionTransitionTimes(), - _participants[i].getInstanceName(), 2); + Assert.assertTrue( + getMaxParallelTransitionCount(DelayedTransition.getInstancePatitionTransitionTimes(), + _participants[i].getInstanceName()) <= 2); } } @@ -211,8 +223,60 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase { Thread.sleep(2000L); for (int i = 0; i < NODE_NR; i++) { - validateThrottle(DelayedTransition.getInstancePatitionTransitionTimes(), - _participants[i].getInstanceName(), 1); + Assert.assertTrue( + getMaxParallelTransitionCount(DelayedTransition.getInstancePatitionTransitionTimes(), + _participants[i].getInstanceName()) <= 1); + } + } + + @Test + public void testThrottleOnlyClusterLevelAnyType() { + // start some participants + for (int i = 0; i < NODE_NR - 3; i++) { + _participants[i].syncStart(); + } + // Add resource: TestDB_ANY of 20 partitions + _gSetupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "_OnlyANY", + 20, STATE_MODEL, RebalanceMode.FULL_AUTO.name()); + // Act the rebalance process + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "_OnlyANY", + _replica); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // overwrite the cluster level throttle configuration + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + StateTransitionThrottleConfig anyTypeClusterThrottle = + new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1); + clusterConfig.setStateTransitionThrottleConfigs(ImmutableList.of(anyTypeClusterThrottle)); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + DelayedTransition.setDelay(20); + DelayedTransition.enableThrottleRecord(); + + List<MockParticipantManager> newNodes = + Arrays.asList(_participants).subList(NODE_NR - 3, NODE_NR); + newNodes.forEach(MockParticipantManager::syncStart); + newNodes.forEach(node -> { + try { + Assert.assertTrue(TestHelper.verify(() -> getMaxParallelTransitionCount( + DelayedTransition.getInstancePatitionTransitionTimes(), node.getInstanceName()) <= 1, + 1000 * 2)); + } catch (Exception e) { + e.printStackTrace(); + assert false; + } + }); + + ClusterLiveNodesVerifier liveNodesVerifier = + new ClusterLiveNodesVerifier(_gZkClient, CLUSTER_NAME, + Lists.transform(Arrays.asList(_participants), MockParticipantManager::getInstanceName)); + Assert.assertTrue(liveNodesVerifier.verifyByZkCallback(1000)); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + for (int i = 0; i < NODE_NR; i++) { + _participants[i].syncStop(); } } @@ -225,11 +289,19 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase { _dbs.clear(); Thread.sleep(50); + HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<>(_gZkClient)); for (int i = 0; i < _participants.length; i++) { _participants[i].syncStop(); _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _participants[i].getInstanceName()); } + try { + Assert.assertTrue(TestHelper.verify(() -> dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances()).isEmpty(), 1000)); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("There're live instances not cleaned up yet"); + assert false; + } DelayedTransition.clearThrottleRecord(); } @@ -279,13 +351,16 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase { Assert.assertTrue(_clusterVerifier.verifyByPolling()); for (String db : _dbs) { - validateThrottle(DelayedTransition.getResourcePatitionTransitionTimes(), db, 2); + int maxInParallel = + getMaxParallelTransitionCount(DelayedTransition.getResourcePatitionTransitionTimes(), db); + System.out.println("MaxInParallel: " + maxInParallel + " maxPendingTransition: " + 2); + Assert.assertTrue(maxInParallel <= 2, "Throttle condition does not meet for " + db); } } - private void validateThrottle( + private int getMaxParallelTransitionCount( Map<String, List<PartitionTransitionTime>> partitionTransitionTimesMap, - String throttledItemName, int maxPendingTransition) { + String throttledItemName) { List<PartitionTransitionTime> pTimeList = partitionTransitionTimesMap.get(throttledItemName); Map<Long, List<PartitionTransitionTime>> startMap = new HashMap<>(); @@ -294,7 +369,7 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase { if (pTimeList == null) { System.out.println("no throttle result for :" + throttledItemName); - return; + return -1; } pTimeList.sort((o1, o2) -> (int) (o1.start - o2.start)); @@ -330,10 +405,9 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase { } } - System.out.println( - "MaxInParallel: " + maxInParallel + " maxPendingTransition: " + maxPendingTransition); - Assert.assertTrue(maxInParallel <= maxPendingTransition, - "Throttle condition does not meet for " + throttledItemName); + System.out + .println("Max number of ST in parallel: " + maxInParallel + " for " + throttledItemName); + return maxInParallel; } private int size(List<PartitionTransitionTime> timeList) {