This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit ddfc933e1469906d49fcccb08a66352d4b77ccd0 Author: Hunter Lee <[email protected]> AuthorDate: Thu Mar 28 12:29:16 2019 -0700 HELIX: Bypass throttling for disabled partitions This diff allows all state transitions linked to disabled instances/partitions to bypass throttling constraints. Changelist: 1. Modify logic in IntermediateStateCalcStage 2. Add more integration tests --- .../stages/IntermediateStateCalcStage.java | 80 ++++++++---- .../TestNoThrottleDisabledPartitions.java | 144 ++++++++++++++++++++- 2 files changed, 199 insertions(+), 25 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index f021f66..888bd12 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -362,13 +362,13 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } chargePendingTransition(resource, currentStateOutput, throttleController, - partitionsNeedRecovery, partitionsNeedLoadBalance); + partitionsNeedRecovery, partitionsNeedLoadBalance, cache); // Perform recovery balance Set<Partition> recoveryThrottledPartitions = recoveryRebalance(resource, bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput, - cache.getStateModelDef(resource.getStateModelDefRef()).getTopState()); + cache.getStateModelDef(resource.getStateModelDefRef()).getTopState(), cache); // Perform load balance upon checking conditions below Set<Partition> loadbalanceThrottledPartitions; @@ -398,7 +398,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput, bestPossiblePartitionStateMap, throttleController, 
intermediatePartitionStateMap, partitionsNeedLoadBalance, currentStateOutput.getCurrentStateMap(resourceName), - onlyDownwardLoadBalance, stateModelDef); + onlyDownwardLoadBalance, stateModelDef, cache); if (clusterStatusMonitor != null) { clusterStatusMonitor.updateRebalancerStats(resourceName, partitionsNeedRecovery.size(), @@ -461,7 +461,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { */ private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput, StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery, - Set<Partition> partitionsNeedLoadbalance) { + Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache) { String resourceName = resource.getResourceName(); // check and charge pending transitions @@ -481,17 +481,22 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } if (pendingMap.size() > 0) { - throttleController.chargeCluster(rebalanceType); - throttleController.chargeResource(rebalanceType, resourceName); - - // charge each instance. 
+ boolean shouldChargePartition = false; for (String instance : pendingMap.keySet()) { String currentState = currentStateMap.get(instance); String pendingState = pendingMap.get(instance); - if (pendingState != null && !pendingState.equals(currentState)) { + if (pendingState != null && !pendingState.equals(currentState) + && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName()) + .contains(instance)) { + // Only charge this instance if the partition is not disabled throttleController.chargeInstance(rebalanceType, instance); + shouldChargePartition = true; } } + if (shouldChargePartition) { + throttleController.chargeCluster(rebalanceType); + throttleController.chargeResource(rebalanceType, resourceName); + } } } } @@ -508,13 +513,15 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @param partitionsNeedRecovery * @param currentStateOutput * @param topState + * @param cache * @return a set of partitions that need recovery but did not get recovered due to throttling */ private Set<Partition> recoveryRebalance(Resource resource, PartitionStateMap bestPossiblePartitionStateMap, StateTransitionThrottleController throttleController, PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedRecovery, - CurrentStateOutput currentStateOutput, String topState) { + CurrentStateOutput currentStateOutput, String topState, + ResourceControllerDataProvider cache) { String resourceName = resource.getResourceName(); Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>(); @@ -540,7 +547,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { for (Partition partition : partitionsNeedRecoveryPrioritized) { throttleStateTransitionsForPartition(throttleController, resourceName, partition, currentStateOutput, bestPossiblePartitionStateMap, partitionRecoveryBalanceThrottled, - intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE); + intermediatePartitionStateMap, 
RebalanceType.RECOVERY_BALANCE, cache); } LogUtil.logInfo(logger, _eventId, String.format( "For resource %s: Num of partitions needing recovery: %d, Num of partitions needing recovery" @@ -563,6 +570,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @param currentStateMap * @param onlyDownwardLoadBalance true when only allowing downward transitions * @param stateModelDef for determining whether a partition's transitions are strictly downward + * @param cache * @return */ private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput currentStateOutput, @@ -570,7 +578,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { StateTransitionThrottleController throttleController, PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedLoadbalance, Map<Partition, Map<String, String>> currentStateMap, boolean onlyDownwardLoadBalance, - StateModelDefinition stateModelDef) { + StateModelDefinition stateModelDef, ResourceControllerDataProvider cache) { String resourceName = resource.getResourceName(); Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>(); @@ -608,7 +616,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } throttleStateTransitionsForPartition(throttleController, resourceName, partition, currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled, - intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE); + intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache); } LogUtil.logInfo(logger, _eventId, String.format( "For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing" @@ -628,12 +636,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @param partitionsThrottled * @param intermediatePartitionStateMap * @param rebalanceType + * @param cache */ private void throttleStateTransitionsForPartition( StateTransitionThrottleController throttleController, String 
resourceName, Partition partition, CurrentStateOutput currentStateOutput, PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled, - PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType) { + PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType, + ResourceControllerDataProvider cache) { Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition); @@ -655,7 +665,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { for (String instance : allInstances) { String currentState = currentStateMap.get(instance); String bestPossibleState = bestPossibleMap.get(instance); - if (bestPossibleState != null && !bestPossibleState.equals(currentState)) { + if (bestPossibleState != null && !bestPossibleState.equals(currentState) + && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName()) + .contains(instance)) { if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) { hasReachedThrottlingLimit = true; if (logger.isDebugEnabled()) { @@ -669,24 +681,46 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } } if (!hasReachedThrottlingLimit) { - // This implies that there is room for more pending states. Find - // instances with a replica whose current state is different from BestPossibleState and + // This implies that there is room for more state transitions. 
+ // Find instances with a replica whose current state is different from BestPossibleState and // "charge" for it, and bestPossibleStates will become intermediate states intermediateMap.putAll(bestPossibleMap); + boolean shouldChargeForPartition = false; for (String instance : allInstances) { String currentState = currentStateMap.get(instance); String bestPossibleState = bestPossibleMap.get(instance); - if (bestPossibleState != null && !bestPossibleState.equals(currentState)) { + if (bestPossibleState != null && !bestPossibleState.equals(currentState) + && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName()) + .contains(instance)) { throttleController.chargeInstance(rebalanceType, instance); + shouldChargeForPartition = true; } } - throttleController.chargeCluster(rebalanceType); - throttleController.chargeResource(rebalanceType, resourceName); + if (shouldChargeForPartition) { + throttleController.chargeCluster(rebalanceType); + throttleController.chargeResource(rebalanceType, resourceName); + } } else { - // No more room for more pending states; current states will just become intermediate states + // No more room for more state transitions; current states will just become intermediate + // states unless the partition is disabled // Add this partition to a set of throttled partitions - intermediateMap.putAll(currentStateMap); - partitionsThrottled.add(partition); + for (String instance : allInstances) { + String currentState = currentStateMap.get(instance); + String bestPossibleState = bestPossibleMap.get(instance); + if (bestPossibleState != null && !bestPossibleState.equals(currentState) + && cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName()) + .contains(instance)) { + // Because this partition is disabled, we allow assignment + intermediateMap.put(instance, bestPossibleState); + } else { + // This partition is not disabled, so it must be throttled by just passing on the current + // state + if 
(currentState != null) { + intermediateMap.put(instance, currentState); + } + partitionsThrottled.add(partition); + } + } } intermediatePartitionStateMap.setState(partition, intermediateMap); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java index 73bdcf8..fa0fe6e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java @@ -37,6 +37,7 @@ import org.apache.helix.model.CurrentState; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.builder.FullAutoModeISBuilder; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.testng.Assert; import org.testng.annotations.Test; @@ -169,7 +170,8 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { * and no throttle config set for recovery balance * and throttle config of 1 set for load balance, * Instead of disabling the instance, we disable the partition in the instance config. - * Here, we set the recovery balance config. Then we should still see the MASTER. + * Here, we set the recovery balance config to 0. But we should still see the downward transition + * regardless. 
* * instance 1 : S (M->S->Offline) * * instance 2 : M (S->M because it's in recovery) * * instance 3 : S @@ -205,7 +207,7 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap() .entrySet()) { if (partitionState.getKey().equals("TestDB0_2")) { - Assert.assertTrue(partitionState.getValue().equals("MASTER")); + Assert.assertFalse(partitionState.getValue().equals("MASTER")); } } } @@ -219,6 +221,85 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { deleteCluster(_clusterName); } + @Test + public void testNoThrottleOnDisabledInstance() throws Exception { + int participantCount = 5; + setupEnvironment(participantCount); + setThrottleConfig(); + + // Disable an instance so that it will not be subject to throttling + PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName()); + InstanceConfig instanceConfig = _accessor.getProperty(key); + instanceConfig.setInstanceEnabled(false); + _accessor.setProperty(key, instanceConfig); + + // Set the state transition delay so that transitions would be processed slowly + DelayedTransitionBase.setDelay(1000000L); + + // Resume the controller + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0"); + controller.syncStart(); + Thread.sleep(500L); + + // Check that there are more messages on this Participant despite the throttle config set at 1 + Assert.assertTrue(verifyMultipleMessages(_participants[0])); + + // clean up the cluster + controller.syncStop(); + for (int i = 0; i < participantCount; i++) { + _participants[i].syncStop(); + } + deleteCluster(_clusterName); + } + + @Test + public void testNoThrottleOnDisabledPartition() throws Exception { + int participantCount = 3; + setupEnvironment(participantCount); + setThrottleConfig(); + + // Disable a partition so that it will not be subject to throttling + String partitionName = 
_resourceName + "0_0"; + for (int i = 0; i < participantCount; i++) { + disablePartitionOnInstance(_participants[i], _resourceName + "0", partitionName); + } + + String newResource = "abc"; + IdealState idealState = new FullAutoModeISBuilder(newResource).setStateModel("MasterSlave") + .setStateModelFactoryName("DEFAULT").setNumPartitions(5).setNumReplica(3) + .setMinActiveReplica(2).setRebalancerMode(IdealState.RebalanceMode.FULL_AUTO) + .setRebalancerClass("org.apache.helix.controller.rebalancer.DelayedAutoRebalancer") + .setRebalanceStrategy( + "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy") + .build(); + + _gSetupTool.addResourceToCluster(_clusterName, newResource, idealState); + _gSetupTool.rebalanceStorageCluster(_clusterName, newResource, 3); + + // Set the state transition delay so that transitions would be processed slowly + DelayedTransitionBase.setDelay(1000000L); + + // Now Helix will try to bring this up on all instances. But the disabled partition will go to + // offline. This should allow each instance to have 2 messages despite having the throttle set + // at 1 + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0"); + controller.syncStart(); + Thread.sleep(500L); + + for (MockParticipantManager participantManager : _participants) { + Assert.assertTrue(verifyTwoMessages(participantManager)); + } + + // clean up the cluster + controller.syncStop(); + for (int i = 0; i < participantCount; i++) { + _participants[i].syncStop(); + } + deleteCluster(_clusterName); + } + /** * Set up the cluster and pause the controller. * @param participantCount @@ -253,6 +334,37 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { } /** + * Set all throttle configs at 1 so that we could test by observing the number of ongoing + * transitions. 
+ */ + private void setThrottleConfig() { + PropertyKey.Builder keyBuilder = _accessor.keyBuilder(); + + ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig()); + clusterConfig.setResourcePriorityField("Name"); + List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>(); + + // Add throttling at cluster-level + throttleConfigs.add(new StateTransitionThrottleConfig( + StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1)); + throttleConfigs.add( + new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1)); + throttleConfigs + .add(new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1)); + + // Add throttling at instance level + throttleConfigs + .add(new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY, + StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 1)); + + clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs); + _accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig); + } + + /** * Set throttle limits only for load balance so that none of them would happen. */ private void setThrottleConfigForLoadBalance() { @@ -347,4 +459,32 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { instanceConfig.setInstanceEnabledForPartition(resourceName, partitionName, false); _accessor.setProperty(key, instanceConfig); } + + /** + * Check whether there is more than one message for a given Participant.
+ * @param participant the participant whose messages are counted + * @return true if the participant has more than one message; false otherwise, including when + * no message list is returned + */ + private boolean verifyMultipleMessages(final MockParticipantManager participant) { + PropertyKey key = _accessor.keyBuilder().messages(participant.getInstanceName()); + List<String> messageNames = _accessor.getChildNames(key); + if (messageNames != null) { + return messageNames.size() > 1; + } + return false; + } + + /** + * Check whether there are exactly 2 messages for a given Participant. + * @param participant the participant whose messages are counted + * @return true if the participant has exactly two messages; false otherwise, including when + * no message list is returned + */ + private boolean verifyTwoMessages(final MockParticipantManager participant) { + PropertyKey key = _accessor.keyBuilder().messages(participant.getInstanceName()); + List<String> messageNames = _accessor.getChildNames(key); + if (messageNames != null) { + return messageNames.size() == 2; + } + return false; + } }
