This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch replica_level_throttle in repository https://gitbox.apache.org/repos/asf/helix.git
commit ad02d8d650065ea895919d3adc8611ae68ead82b Author: Junkai Xue <[email protected]> AuthorDate: Wed Apr 28 13:20:09 2021 -0700 Change throttling logic to per message (#1714) Apply the logic for throttling with per message quota charge. --- .../stages/IntermediateStateCalcStage.java | 115 ++++++--------------- 1 file changed, 33 insertions(+), 82 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index a840d5e..c46a6ce 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -631,104 +631,55 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } /** - * Check the status on throttling at every level (cluster, resource, instance) and set - * intermediatePartitionStateMap accordingly per partition. - * @param throttleController - * @param resourceName - * @param partition - * @param currentStateOutput - * @param bestPossiblePartitionStateMap - * @param partitionsThrottled - * @param intermediatePartitionStateMap - * @param rebalanceType - * @param cache + * Check the status for a single message on throttling at every level (cluster, resource, replica) and set + * intermediatePartitionStateMap accordingly for that replica. + * @param throttleController throttle controller object for throttling quota + * @param resourceName the resource for throttling check + * @param partition the partition for throttling check + * @param messageToThrottle the message to be throttled + * @param messagesThrottled the cumulative set of messages that have been throttled already. These + * messages represent the replicas of this partition that have been throttled. 
+ * @param intermediatePartitionStateMap the cumulative partition-state mapping as a result of the throttling step + * of IntermediateStateCalcStage + * @param rebalanceType the rebalance type to charge quota + * @param cache cached cluster metadata required by the throttle controller */ - private void throttleStateTransitionsForPartition( - StateTransitionThrottleController throttleController, String resourceName, - Partition partition, CurrentStateOutput currentStateOutput, - PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled, + private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController, + String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled, PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType, ResourceControllerDataProvider cache) { - - Map<String, String> currentStateMap = - currentStateOutput.getCurrentStateMap(resourceName, partition); - Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition); - Set<String> allInstances = new HashSet<>(currentStateMap.keySet()); - allInstances.addAll(bestPossibleMap.keySet()); - Map<String, String> intermediateMap = new HashMap<>(); - boolean hasReachedThrottlingLimit = false; if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) { hasReachedThrottlingLimit = true; if (logger.isDebugEnabled()) { - LogUtil.logDebug(logger, _eventId, - String.format("Throttled on partition: %s in resource: %s", - partition.getPartitionName(), resourceName)); + LogUtil.logDebug(logger, _eventId, String.format( + "Throttled because of cluster/resource quota is full for message {%s} on partition {%s} in resource {%s}", + messageToThrottle.getId(), partition.getPartitionName(), resourceName)); } } else { - // throttle if any of the instances are not able to accept state transitions - for (String instance : allInstances) { - String 
currentState = currentStateMap.get(instance); - String bestPossibleState = bestPossibleMap.get(instance); - if (bestPossibleState != null && !bestPossibleState.equals(currentState) - && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName()) - .contains(instance)) { - if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) { - hasReachedThrottlingLimit = true; - if (logger.isDebugEnabled()) { - LogUtil.logDebug(logger, _eventId, - String.format( - "Throttled because of instance: %s for partition: %s in resource: %s", - instance, partition.getPartitionName(), resourceName)); - } - break; + // Since message already generated, we can assume the current state is not null and target state is not null + if (!cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName()) + .contains(messageToThrottle.getTgtName())) { + if (throttleController.shouldThrottleForInstance(rebalanceType, messageToThrottle.getTgtName())) { + hasReachedThrottlingLimit = true; + if (logger.isDebugEnabled()) { + LogUtil.logDebug(logger, _eventId, String.format( + "Throttled because of instance level quota is full on instance {%s} for message {%s} of partition {%s} in resource {%s}", + messageToThrottle.getTgtName(), messageToThrottle.getId(), partition.getPartitionName(), resourceName)); } } } } + // If there is still room for this replica, proceed to charge at the cluster and resource level and set the + // intermediate partition-state mapping so that the state transition message can move forward. if (!hasReachedThrottlingLimit) { - // This implies that there is room for more state transitions.
- // Find instances with a replica whose current state is different from BestPossibleState and - // "charge" for it, and bestPossibleStates will become intermediate states - intermediateMap.putAll(bestPossibleMap); - boolean shouldChargeForPartition = false; - for (String instance : allInstances) { - String currentState = currentStateMap.get(instance); - String bestPossibleState = bestPossibleMap.get(instance); - if (bestPossibleState != null && !bestPossibleState.equals(currentState) - && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName()) - .contains(instance)) { - throttleController.chargeInstance(rebalanceType, instance); - shouldChargeForPartition = true; - } - } - if (shouldChargeForPartition) { - throttleController.chargeCluster(rebalanceType); - throttleController.chargeResource(rebalanceType, resourceName); - } + throttleController.chargeCluster(rebalanceType); + throttleController.chargeResource(rebalanceType, resourceName); + intermediatePartitionStateMap.setState(partition, messageToThrottle.getTgtName(), messageToThrottle.getToState()); } else { - // No more room for more state transitions; current states will just become intermediate - // states unless the partition is disabled - // Add this partition to a set of throttled partitions - for (String instance : allInstances) { - String currentState = currentStateMap.get(instance); - String bestPossibleState = bestPossibleMap.get(instance); - if (bestPossibleState != null && !bestPossibleState.equals(currentState) - && cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName()) - .contains(instance)) { - // Because this partition is disabled, we allow assignment - intermediateMap.put(instance, bestPossibleState); - } else { - // This partition is not disabled, so it must be throttled by just passing on the current - // state - if (currentState != null) { - intermediateMap.put(instance, currentState); - } - partitionsThrottled.add(partition); - } - } + 
// Intermediate Map is based on current state + messagesThrottled.add(messageToThrottle); } - intermediatePartitionStateMap.setState(partition, intermediateMap); } /**
