This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch replica_level_throttle in repository https://gitbox.apache.org/repos/asf/helix.git
commit 22f32b9744c425fdd343361ba76dc28e5243b82f Author: Junkai Xue <[email protected]> AuthorDate: Fri Apr 23 14:58:48 2021 -0700 [Replica Level Throttle] Add per replica rebalance type compute logic (#1703) * Add per replica rebalance type compute logic Three functions added: 1) rebalance type computation required state. 2) rebalance type per message 3) message sorting rules and comparators to determine which message to apply first. --- .../stages/IntermediateStateCalcStage.java | 82 +++++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index c0defa9..a840d5e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; @@ -43,6 +45,7 @@ import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.MaintenanceSignal; +import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.StateModelDefinition; @@ -796,6 +799,54 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } /** + * Determine the rebalance type of a message from the required states and the derived current states.
+ * @param desiredStates Required number of replicas per state (state -> count) for the partition to be healthy + * @param message The message whose rebalance type is being determined + * @param derivedCurrentStates Current states after applying the previous, non-throttled messages; only the states (values) are read + * @return RECOVERY_BALANCE if the message's target state is still short of its required count, otherwise LOAD_BALANCE + */ + private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> desiredStates, Message message, + Map<String, String> derivedCurrentStates) { + Map<String, Integer> desiredStatesSnapshot = new HashMap<>(desiredStates); + // Consume one required slot for every existing replica whose state is still required. + for (String state : derivedCurrentStates.values()) { + if (desiredStatesSnapshot.containsKey(state)) { + if (desiredStatesSnapshot.get(state) == 1) { + desiredStatesSnapshot.remove(state); + } else { + desiredStatesSnapshot.put(state, desiredStatesSnapshot.get(state) - 1); + } + } + } + + // If the message contains any "required" state changes, then it is considered recovery rebalance. + // Otherwise, it is load balance. + return desiredStatesSnapshot.containsKey(message.getToState()) ? RebalanceType.RECOVERY_BALANCE + : RebalanceType.LOAD_BALANCE; + } + + private Map<String, Integer> getRequiredStates(String resourceName, + ResourceControllerDataProvider resourceControllerDataProvider, List<String> preferenceList) { + // Returns state -> number of replicas required in that state for the given preference list. + // Prepare required inputs: 1) Priority State List 2) required number of replica + IdealState idealState = resourceControllerDataProvider.getIdealState(resourceName); + StateModelDefinition stateModelDefinition = + resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef()); + int requiredNumReplica = idealState.getMinActiveReplicas() == -1 + ?
idealState.getReplicaCount(preferenceList.size()) + : idealState.getMinActiveReplicas(); + + // Generate a state mapping, state -> required numbers based on the live and enabled instances for this partition + // preference list. Fetch the enabled live instances once instead of once per preference list entry. + Set<String> enabledLiveInstances = resourceControllerDataProvider.getEnabledLiveInstances(); + int enabledLivePreferenceCount = + (int) preferenceList.stream().filter(enabledLiveInstances::contains).count(); + + // StateModelDefinition's counts + return stateModelDefinition.getStateCountMap(enabledLivePreferenceCount, requiredNumReplica); + } + + /** * Log rebalancer metadata for debugging purposes. * @param resource * @param allPartitions @@ -874,7 +925,36 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } } - // Compare partitions according following standard: + private static class MessagePriorityComparator implements Comparator<Message> { + private final Map<String, Integer> _preferenceInstanceMap; + private final Map<String, Integer> _statePriorityMap; + + MessagePriorityComparator(List<String> preferenceList, Map<String, Integer> statePriorityMap) { + // Get instance -> preference list index map; if an instance repeats, keep its first index. + _preferenceInstanceMap = IntStream.range(0, preferenceList.size()) + .boxed() + .collect(Collectors.toMap(preferenceList::get, index -> index, (first, second) -> first)); + _statePriorityMap = statePriorityMap; + } + + @Override + public int compare(Message m1, Message m2) { + // Compare rules, applied as one lexicographic chain so the ordering stays transitive: + // 1. Target state priority: lower value in the state priority map first; unknown states last. + // 2. If target state is same, follow preference list order; instances outside the list last. + // 3. Sort by the name of targeted instances just for deterministic ordering.
+ int result = Integer.compare(_statePriorityMap.getOrDefault(m1.getToState(), Integer.MAX_VALUE), + _statePriorityMap.getOrDefault(m2.getToState(), Integer.MAX_VALUE)); + if (result == 0) { + result = Integer.compare(_preferenceInstanceMap.getOrDefault(m1.getTgtName(), Integer.MAX_VALUE), + _preferenceInstanceMap.getOrDefault(m2.getTgtName(), Integer.MAX_VALUE)); + } + // Fall back to names only on a full tie; mixing orderings per pair breaks the Comparator contract. + return result != 0 ? result : m1.getTgtName().compareTo(m2.getTgtName()); + } + } + + // Compare partitions according following standard: // 1) Partition without top state always is the highest priority. // 2) For partition with top-state, the more number of active replica it has, the less priority. private class PartitionPriorityComparator implements Comparator<Partition> {
