This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer2 in repository https://gitbox.apache.org/repos/asf/helix.git
commit 4353f67192adc6c9d6fec1c718f3e1690a60ad0f Author: Hunter Lee <[email protected]> AuthorDate: Fri Oct 4 13:50:43 2019 -0700 Add latency metric components for WAGED rebalancer (#490) Add WAGED rebalancer metric framework and latency metric implementation Changelist: 1. Add WAGED rebalancer metric interface 2. Implement latency-related metrics 3. Integrate latency metrics into WAGED rebalancer 4. Add tests --- .../rebalancer/waged/WagedRebalancer.java | 192 ++++++++++++++------- .../ConstraintBasedAlgorithmFactory.java | 2 +- .../stages/BestPossibleStateCalcStage.java | 44 ++++- .../monitoring/mbeans/MonitorDomainNames.java | 3 +- .../helix/monitoring/metrics/MetricCollector.java | 100 +++++++++++ .../metrics/WagedRebalancerMetricCollector.java | 80 +++++++++ .../implementation/RebalanceLatencyGauge.java | 104 +++++++++++ .../model/CountMetric.java} | 27 ++- .../monitoring/metrics/model/LatencyMetric.java | 52 ++++++ .../model/Metric.java} | 40 ++++- .../rebalancer/waged/TestWagedRebalancer.java | 22 +-- .../waged/TestWagedRebalancerMetrics.java | 132 ++++++++++++++ .../waged/model/AbstractTestClusterModel.java | 16 +- 13 files changed, 698 insertions(+), 116 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index d211884..53c9840 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -48,6 +48,9 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.ResourceConfig; +import org.apache.helix.monitoring.metrics.MetricCollector; +import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import 
org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,10 +67,8 @@ public class WagedRebalancer { // When any of the following change happens, the rebalancer needs to do a global rebalance which // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline. private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = - ImmutableSet.of( - HelixConstants.ChangeType.RESOURCE_CONFIG, - HelixConstants.ChangeType.CLUSTER_CONFIG, - HelixConstants.ChangeType.INSTANCE_CONFIG); + ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG, + HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG); // The cluster change detector is a stateful object. // Make it static to avoid unnecessary reinitialization. private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL = @@ -76,6 +77,7 @@ public class WagedRebalancer { private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator; private final AssignmentMetadataStore _assignmentMetadataStore; private final RebalanceAlgorithm _rebalanceAlgorithm; + private MetricCollector _metricCollector; private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) { AssignmentMetadataStore assignmentMetadataStore = null; @@ -90,7 +92,8 @@ public class WagedRebalancer { } public WagedRebalancer(HelixManager helixManager, - Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) { + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences, + MetricCollector metricCollector) { this(constructAssignmentStore(helixManager), ConstraintBasedAlgorithmFactory.getInstance(preferences), // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output. 
@@ -99,16 +102,37 @@ public class WagedRebalancer { // TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer new DelayedAutoRebalancer(), // Helix Manager is required for the rebalancer scheduler - helixManager); + helixManager, metricCollector); } + /** + * This constructor will use null for HelixManager and MetricCollector. With null HelixManager, + * the rebalancer will rebalance solely based on CurrentStates. With null MetricCollector, the + * rebalancer will not emit JMX metrics. + * @param assignmentMetadataStore + * @param algorithm + * @param mappingCalculator + */ protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) { - this(assignmentMetadataStore, algorithm, mappingCalculator, null); + this(assignmentMetadataStore, algorithm, mappingCalculator, null, null); + } + + /** + * This constructor will use null for HelixManager and MetricCollector. With null HelixManager, + * the rebalancer will rebalance solely based on CurrentStates. + * @param assignmentMetadataStore + * @param algorithm + * @param metricCollector + */ + protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, + RebalanceAlgorithm algorithm, MetricCollector metricCollector) { + this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector); } private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, - RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager) { + RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager, + MetricCollector metricCollector) { if (assignmentMetadataStore == null) { LOG.warn("Assignment Metadata Store is not configured properly." 
+ " The rebalancer will not access the assignment store during the rebalance."); @@ -117,6 +141,10 @@ public class WagedRebalancer { _rebalanceAlgorithm = algorithm; _mappingCalculator = mappingCalculator; _manager = manager; + // If metricCollector is null, instantiate a version that does not register metrics in order to + // allow rebalancer to proceed + _metricCollector = + metricCollector == null ? new WagedRebalancerMetricCollector() : metricCollector; } // Release all the resources. @@ -138,8 +166,7 @@ public class WagedRebalancer { Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { if (resourceMap.isEmpty()) { - LOG.warn("There is no resource to be rebalanced by {}", - this.getClass().getSimpleName()); + LOG.warn("There is no resource to be rebalanced by {}", this.getClass().getSimpleName()); return Collections.emptyMap(); } @@ -169,7 +196,6 @@ public class WagedRebalancer { newStateMap == null ? Collections.emptyMap() : newStateMap); } } - LOG.info("Finish computing new ideal states for resources: {}", resourceMap.keySet().toString()); return newIdealStates; @@ -191,24 +217,25 @@ public class WagedRebalancer { return itemKeys; })); + // Perform Global Baseline Calculation if (clusterChanges.keySet().stream() - .anyMatch(changeType -> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) { + .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) { refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput); // Inject a cluster config change for large scale partial rebalance once the baseline changed. 
clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet()); } - Set<String> activeNodes = DelayedRebalanceUtil - .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(), - clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), - clusterData.getInstanceConfigMap(), clusterData.getClusterConfig()); + Set<String> activeNodes = DelayedRebalanceUtil.getActiveNodes(clusterData.getAllInstances(), + clusterData.getEnabledLiveInstances(), clusterData.getInstanceOfflineTimeMap(), + clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), + clusterData.getClusterConfig()); // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config. delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet()); + // Perform partial rebalance Map<String, ResourceAssignment> newAssignment = - partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, - currentStateOutput); + partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput); // <ResourceName, <State, Priority>> Map<String, Map<String, Integer>> resourceStatePriorityMap = new HashMap<>(); @@ -238,16 +265,15 @@ public class WagedRebalancer { // some delayed rebalanced assignments. if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) { applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges, - resourceStatePriorityMap, - getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, - resourceMap.keySet())); + resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore, + currentStateOutput, resourceMap.keySet())); } // Replace the assignment if user-defined preference list is configured. // Note the user-defined list is intentionally applied to the final mapping after calculation. 
// This is to avoid persisting it into the assignment store, which impacts the long term // assignment evenness and partition movements. - finalIdealStateMap.entrySet().stream().forEach( - idealStateEntry -> applyUserDefinedPreferenceList( + finalIdealStateMap.entrySet().stream() + .forEach(idealStateEntry -> applyUserDefinedPreferenceList( clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue())); return finalIdealStateMap; @@ -258,19 +284,31 @@ public class WagedRebalancer { Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { LOG.info("Start calculating the new baseline."); + LatencyMetric globalBaselineCalcLatency = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge + .name(), + LatencyMetric.class); + globalBaselineCalcLatency.startMeasuringLatency(); + // Read the baseline from metadata store Map<String, ResourceAssignment> currentBaseline = getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); + // For baseline calculation // 1. Ignore node status (disable/offline). // 2. Use the baseline as the previous best possible assignment since there is no "baseline" for // the baseline. 
- Map<String, ResourceAssignment> newBaseline = - calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(), - Collections.emptyMap(), currentBaseline); + Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterData, clusterChanges, + resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline); + // Write the new baseline to metadata store if (_assignmentMetadataStore != null) { try { + LatencyMetric writeLatency = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), + LatencyMetric.class); + writeLatency.startMeasuringLatency(); _assignmentMetadataStore.persistBaseline(newBaseline); + writeLatency.endMeasuringLatency(); } catch (Exception ex) { throw new HelixRebalanceException("Failed to persist the new baseline assignment.", HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); @@ -278,7 +316,7 @@ public class WagedRebalancer { } else { LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment."); } - + globalBaselineCalcLatency.endMeasuringLatency(); LOG.info("Finish calculating the new baseline."); } @@ -288,20 +326,34 @@ public class WagedRebalancer { Set<String> activeNodes, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { LOG.info("Start calculating the new best possible assignment."); + LatencyMetric partialRebalanceLatency = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge + .name(), + LatencyMetric.class); + partialRebalanceLatency.startMeasuringLatency(); + // TODO: Consider combining the metrics for both baseline/best possible? 
+ // Read the baseline from metadata store Map<String, ResourceAssignment> currentBaseline = getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); - Map<String, ResourceAssignment> currentBestPossibleAssignment = - getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, - resourceMap.keySet()); - Map<String, ResourceAssignment> newAssignment = - calculateAssignment(clusterData, clusterChanges, resourceMap, activeNodes, currentBaseline, - currentBestPossibleAssignment); + + // Read the best possible assignment from metadata store + Map<String, ResourceAssignment> currentBestPossibleAssignment = getBestPossibleAssignment( + _assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); + + // Compute the new assignment + Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges, + resourceMap, activeNodes, currentBaseline, currentBestPossibleAssignment); if (_assignmentMetadataStore != null) { try { + LatencyMetric writeLatency = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), + LatencyMetric.class); + writeLatency.startMeasuringLatency(); // TODO Test to confirm if persisting the final assignment (with final partition states) // would be a better option. _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment); + writeLatency.endMeasuringLatency(); } catch (Exception ex) { throw new HelixRebalanceException("Failed to persist the new best possible assignment.", HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); @@ -309,20 +361,20 @@ public class WagedRebalancer { } else { LOG.debug("Assignment Metadata Store is empty. 
Skip persist the baseline assignment."); } - + partialRebalanceLatency.endMeasuringLatency(); LOG.info("Finish calculating the new best possible assignment."); return newAssignment; } /** * Generate the cluster model based on the input and calculate the optimal assignment. - * @param clusterData the cluster data cache. - * @param clusterChanges the detected cluster changes. - * @param resourceMap the rebalancing resources. - * @param activeNodes the alive and enabled nodes. - * @param baseline the baseline assignment for the algorithm as a reference. + * @param clusterData the cluster data cache. + * @param clusterChanges the detected cluster changes. + * @param resourceMap the rebalancing resources. + * @param activeNodes the alive and enabled nodes. + * @param baseline the baseline assignment for the algorithm as a reference. * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a - * reference. + * reference. * @return the new optimal assignment for the resources. */ private Map<String, ResourceAssignment> calculateAssignment( @@ -415,7 +467,7 @@ public class WagedRebalancer { * @param currentStateOutput * @param resources * @return The current baseline assignment. If record does not exist in the - * assignmentMetadataStore, return the current state assignment. + * assignmentMetadataStore, return the current state assignment. 
* @throws HelixRebalanceException */ private Map<String, ResourceAssignment> getBaselineAssignment( @@ -424,7 +476,12 @@ public class WagedRebalancer { Map<String, ResourceAssignment> currentBaseline = Collections.emptyMap(); if (assignmentMetadataStore != null) { try { + LatencyMetric stateReadLatency = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(), + LatencyMetric.class); + stateReadLatency.startMeasuringLatency(); currentBaseline = assignmentMetadataStore.getBaseline(); + stateReadLatency.endMeasuringLatency(); } catch (HelixException ex) { // Report error. and use empty mapping instead. LOG.error("Failed to get the current baseline assignment.", ex); @@ -435,8 +492,7 @@ public class WagedRebalancer { } } if (currentBaseline.isEmpty()) { - LOG.warn( - "The current baseline assignment record is empty. Use the current states instead."); + LOG.warn("The current baseline assignment record is empty. Use the current states instead."); currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); } return currentBaseline; @@ -447,7 +503,7 @@ public class WagedRebalancer { * @param currentStateOutput * @param resources * @return The current best possible assignment. If record does not exist in the - * assignmentMetadataStore, return the current state assignment. + * assignmentMetadataStore, return the current state assignment. 
* @throws HelixRebalanceException */ private Map<String, ResourceAssignment> getBestPossibleAssignment( @@ -456,7 +512,12 @@ public class WagedRebalancer { Map<String, ResourceAssignment> currentBestAssignment = Collections.emptyMap(); if (assignmentMetadataStore != null) { try { + LatencyMetric stateReadLatency = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(), + LatencyMetric.class); + stateReadLatency.startMeasuringLatency(); currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment(); + stateReadLatency.endMeasuringLatency(); } catch (HelixException ex) { // Report error. and use empty mapping instead. LOG.error("Failed to get the current best possible assignment.", ex); @@ -483,8 +544,8 @@ public class WagedRebalancer { if (!currentStateMap.isEmpty()) { ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName); currentStateMap.entrySet().stream().forEach(currentStateEntry -> { - newResourceAssignment - .addReplicaMap(currentStateEntry.getKey(), currentStateEntry.getValue()); + newResourceAssignment.addReplicaMap(currentStateEntry.getKey(), + currentStateEntry.getValue()); }); currentStateAssignment.put(resourceName, newResourceAssignment); } @@ -507,11 +568,10 @@ public class WagedRebalancer { Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes); offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances()); for (String resource : resourceSet) { - DelayedRebalanceUtil - .setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances, - clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), - clusterData.getInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(), - clusterConfig, _manager); + DelayedRebalanceUtil.setRebalanceScheduler(resource, delayedRebalanceEnabled, + offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(), + 
clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), + clusterConfig.getRebalanceDelayTime(), clusterConfig, _manager); } } else { LOG.warn("Skip scheduling a delayed rebalancer since HelixManager is not specified."); @@ -523,30 +583,30 @@ public class WagedRebalancer { * Since the rebalancing might be done with the delayed logic, the rebalanced ideal states * might include inactive nodes. * This overwrite will adjust the final mapping, so as to ensure the result is completely valid. - * @param idealStateMap the calculated ideal states. - * @param clusterData the cluster data cache. - * @param resourceMap the rebalanaced resource map. - * @param clusterChanges the detected cluster changes that triggeres the rebalance. + * @param idealStateMap the calculated ideal states. + * @param clusterData the cluster data cache. + * @param resourceMap the rebalanaced resource map. + * @param clusterChanges the detected cluster changes that triggeres the rebalance. * @param resourceStatePriorityMap the state priority map for each resource. - * @param baseline the baseline assignment + * @param baseline the baseline assignment */ private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap, ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap, Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Map<String, Integer>> resourceStatePriorityMap, - Map<String, ResourceAssignment> baseline) - throws HelixRebalanceException { + Map<String, ResourceAssignment> baseline) throws HelixRebalanceException { Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances(); - // Note that the calculation used the baseline as the input only. This is for minimizing unnecessary partition movement. 
- Map<String, ResourceAssignment> activeAssignment = - calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances, - Collections.emptyMap(), baseline); + // Note that the calculation used the baseline as the input only. This is for minimizing + // unnecessary partition movement. + Map<String, ResourceAssignment> activeAssignment = calculateAssignment(clusterData, + clusterChanges, resourceMap, enabledLiveInstances, Collections.emptyMap(), baseline); for (String resourceName : idealStateMap.keySet()) { IdealState is = idealStateMap.get(resourceName); if (!activeAssignment.containsKey(resourceName)) { throw new HelixRebalanceException( "Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for " - + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE); + + resourceName, + HelixRebalanceException.Type.FAILED_TO_CALCULATE); } IdealState currentIdealState = clusterData.getIdealState(resourceName); IdealState newActiveIdealState = @@ -555,9 +615,9 @@ public class WagedRebalancer { int numReplia = currentIdealState.getReplicaCount(enabledLiveInstances.size()); int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia); - Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil - .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), is.getPreferenceLists(), - enabledLiveInstances, Math.min(minActiveReplica, numReplia)); + Map<String, List<String>> finalPreferenceLists = + DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), + is.getPreferenceLists(), enabledLiveInstances, Math.min(minActiveReplica, numReplia)); is.setPreferenceLists(finalPreferenceLists); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java index f70de9a..fbf8b19 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java @@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory { // enlarge the overall weight of the evenness constraints compared with the movement constraint. // TODO: Tune or make the following factor configurable. private static final int EVENNESS_PREFERENCE_NORMALIZE_FACTOR = 50; - private static final Map<String, Float> MODEL = new HashMap<>() { + private static final Map<String, Float> MODEL = new HashMap<String, Float>() { { // The default setting put(PartitionMovementConstraint.class.getSimpleName(), 1f); diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 8c082f1..6f442ea 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -26,8 +26,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; - import java.util.stream.Collectors; + +import javax.management.JMException; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; @@ -42,6 +43,8 @@ import org.apache.helix.controller.rebalancer.Rebalancer; import org.apache.helix.controller.rebalancer.SemiAutoRebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import 
org.apache.helix.monitoring.metrics.MetricCollector; +import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; @@ -64,6 +67,8 @@ import org.slf4j.LoggerFactory; public class BestPossibleStateCalcStage extends AbstractBaseStage { private static final Logger logger = LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName()); + // Create a ThreadLocal of MetricCollector. Metrics could only be updated by the controller thread only. + private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL = new ThreadLocal<>(); @Override public void process(ClusterEvent event) throws Exception { @@ -253,20 +258,41 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { Map<String, IdealState> newIdealStates = new HashMap<>(); // Init rebalancer with the rebalance preferences. - Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig() - .getGlobalRebalancePreference(); + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = + cache.getClusterConfig().getGlobalRebalancePreference(); + + if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) { + try { + // If HelixManager is null, we just pass in null for MetricCollector so that a + // non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's + // constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases - + // in this case, WagedRebalancer will not read/write to metadata store and just use + // CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for + // verifying whether the cluster has converged. + METRIC_COLLECTOR_THREAD_LOCAL.set(helixManager == null ? 
null + : new WagedRebalancerMetricCollector(helixManager.getClusterName())); + } catch (JMException e) { + LogUtil.logWarn(logger, _eventId, String.format( + "MetricCollector instantiation failed! WagedRebalancer will not emit metrics due to JMException %s", + e)); + } + } + // TODO avoid creating the rebalancer on every rebalance call for performance enhancement - WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences); + WagedRebalancer wagedRebalancer = + new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get()); try { - newIdealStates.putAll(wagedRebalancer - .computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput)); + newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap, + currentStateOutput)); } catch (HelixRebalanceException ex) { // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result. // Since it calculates for all the eligible resources globally, a partial result is invalid. // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring. 
- LogUtil.logError(logger, _eventId, String - .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s", - wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex); + LogUtil.logError(logger, _eventId, + String.format( + "Failed to calculate the new Ideal States using the rebalancer %s due to %s", + wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), + ex); } finally { wagedRebalancer.close(); } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java index 73bf057..fee9099 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java @@ -28,5 +28,6 @@ public enum MonitorDomainNames { HelixThreadPoolExecutor, HelixCallback, RoutingTableProvider, - CLMParticipantReport + CLMParticipantReport, + Rebalancer } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java new file mode 100644 index 0000000..764557a --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java @@ -0,0 +1,100 @@ +package org.apache.helix.monitoring.metrics; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import javax.management.JMException; +import javax.management.ObjectName; +import org.apache.helix.HelixException; +import org.apache.helix.monitoring.metrics.model.Metric; +import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; + +/** + * Collects and manages all metrics that implement the {@link Metric} interface. 
+ */ +public abstract class MetricCollector extends DynamicMBeanProvider { + private static final String CLUSTER_NAME_KEY = "ClusterName"; + private static final String ENTITY_NAME_KEY = "EntityName"; + private final String _monitorDomainName; + private final String _clusterName; + private final String _entityName; + private Map<String, Metric> _metricMap; + + public MetricCollector(String monitorDomainName, String clusterName, String entityName) { + _monitorDomainName = monitorDomainName; + _clusterName = clusterName; + _entityName = entityName; + _metricMap = new HashMap<>(); + } + + @Override + public DynamicMBeanProvider register() throws JMException { + // First cast all Metric objects to DynamicMetrics + Collection<DynamicMetric<?, ?>> dynamicMetrics = new HashSet<>(); + _metricMap.values().forEach(metric -> dynamicMetrics.add(metric.getDynamicMetric())); + + // Define MBeanName and ObjectName + // MBean name has two key-value pairs: + // ------ 1) ClusterName KV pair (first %s=%s) + // ------ 2) EntityName KV pair (second %s=%s) + String mbeanName = + String.format("%s=%s, %s=%s", CLUSTER_NAME_KEY, _clusterName, ENTITY_NAME_KEY, _entityName); + + // ObjectName has one key-value pair: + // ------ 1) Monitor domain name KV pair where value is the MBean name + doRegister(dynamicMetrics, + new ObjectName(String.format("%s:%s", _monitorDomainName, mbeanName))); + return this; + } + + @Override + public String getSensorName() { + return String.format("%s.%s.%s", MonitorDomainNames.Rebalancer.name(), _clusterName, + _entityName); + } + + void addMetric(Metric metric) { + if (metric instanceof DynamicMetric) { + _metricMap.putIfAbsent(metric.getMetricName(), metric); + } else { + throw new HelixException("MetricCollector only supports Metrics that are DynamicMetric!"); + } + } + + /** + * Returns a desired type of the metric. 
+ * @param metricName Name under which the metric was added (see getMetricName()) + * @param metricClass Desired type; Class#cast throws ClassCastException on a mismatch + * @param <T> Type the stored metric is cast to + * @return the metric cast to T, or null if no metric is stored under metricName + */ + public <T extends DynamicMetric> T getMetric(String metricName, Class<T> metricClass) { + return metricClass.cast(_metricMap.get(metricName)); + } + + public Map<String, Metric> getMetricMap() { + return _metricMap; // the live internal map, not a copy + } +} diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java new file mode 100644 index 0000000..04d804d --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java @@ -0,0 +1,80 @@ +package org.apache.helix.monitoring.metrics; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +import javax.management.JMException; +import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge; +import org.apache.helix.monitoring.metrics.model.LatencyMetric; + +public class WagedRebalancerMetricCollector extends MetricCollector { + private static final String WAGED_REBALANCER_ENTITY_NAME = "WagedRebalancer"; + + /** + * This enum class contains all metric names defined for WagedRebalancer. Note that all enums are + * in camel case for readability. + */ + public enum WagedRebalancerMetricNames { + // Per-stage latency metrics + GlobalBaselineCalcLatencyGauge, + PartialRebalanceLatencyGauge, + + // The following latency metrics are related to AssignmentMetadataStore + StateReadLatencyGauge, + StateWriteLatencyGauge + } + + public WagedRebalancerMetricCollector(String clusterName) throws JMException { + super(MonitorDomainNames.Rebalancer.name(), clusterName, WAGED_REBALANCER_ENTITY_NAME); + createMetrics(); + register(); + } + + /** + * This constructor will create but will not register metrics. This constructor will be used in + * case of JMException so that the rebalancer could proceed without registering and emitting + * metrics. + */ + public WagedRebalancerMetricCollector() { + super(MonitorDomainNames.Rebalancer.name(), null, null); + createMetrics(); + } + + /** + * Creates all WagedRebalancer metrics and adds them to this MetricCollector; it does not call register().
+ */ + private void createMetrics() { + // Define all metrics; getResetIntervalInMs() is passed as each gauge's sliding time window + LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge( + WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs()); + LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge( + WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs()); + LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge( + WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs()); + LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge( + WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs()); + + // Add metrics to WagedRebalancerMetricCollector + addMetric(globalBaselineCalcLatencyGauge); + addMetric(partialRebalanceLatencyGauge); + addMetric(stateReadLatencyGauge); + addMetric(stateWriteLatencyGauge); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java new file mode 100644 index 0000000..e96a589 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java @@ -0,0 +1,104 @@ +package org.apache.helix.monitoring.metrics.implementation; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowArrayReservoir; +import java.util.concurrent.TimeUnit; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; +import org.apache.helix.monitoring.metrics.model.LatencyMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RebalanceLatencyGauge extends LatencyMetric { + private static final Logger LOG = LoggerFactory.getLogger(RebalanceLatencyGauge.class); + private static final long VALUE_NOT_SET = -1; + private long _lastEmittedMetricValue = VALUE_NOT_SET; + + /** + * Creates a latency gauge whose histogram reservoir spans slidingTimeWindow milliseconds. + * @param metricName the metric name + */ + public RebalanceLatencyGauge(String metricName, long slidingTimeWindow) { + super(metricName, new Histogram( + new SlidingTimeWindowArrayReservoir(slidingTimeWindow, TimeUnit.MILLISECONDS))); + _metricName = metricName; + } + + /** + * WARNING: this method is not thread-safe. + * Calling this method multiple times would simply overwrite the previous state. This is because + * the rebalancer could fail at any point, and we want it to recover gracefully by resetting the + * internal state of this metric. + */ + @Override + public void startMeasuringLatency() { + reset(); + _startTime = System.currentTimeMillis(); + } + + /** + * WARNING: not thread-safe. Records the elapsed ms, or logs an error if no start is pending. + */ + @Override + public void endMeasuringLatency() { + if (_startTime == VALUE_NOT_SET || _endTime != VALUE_NOT_SET) { + LOG.error( + "Needs to call startMeasuringLatency first!
Ignoring and resetting the metric. Metric name: {}", + _metricName); + reset(); + return; + } + _endTime = System.currentTimeMillis(); + _lastEmittedMetricValue = _endTime - _startTime; + updateValue(_lastEmittedMetricValue); + reset(); + } + + @Override + public String getMetricName() { + return _metricName; + } + + @Override + public void reset() { + _startTime = VALUE_NOT_SET; + _endTime = VALUE_NOT_SET; + } + + @Override + public String toString() { + return String.format("Metric %s's latency is %d", _metricName, getLastEmittedMetricValue()); + } + + /** + * Returns the most recently emitted metric value at the time of the call. + * @return the last recorded latency in milliseconds, or -1 if none has been recorded yet + */ + @Override + public long getLastEmittedMetricValue() { + return _lastEmittedMetricValue; + } + + @Override + public DynamicMetric getDynamicMetric() { + return this; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java similarity index 55% copy from helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java index 73bf057..5a7f0ca 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java @@ -1,4 +1,4 @@ -package org.apache.helix.monitoring.mbeans; +package org.apache.helix.monitoring.metrics.model; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -19,14 +19,23 @@ package org.apache.helix.monitoring.mbeans; * under the License. */ +import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; + /** - * This enum defines all of domain names used with various Helix monitor mbeans. + * Represents a count metric and defines methods to help with calculation.
A count metric gives a + * gauge value of a certain property. */ -public enum MonitorDomainNames { - ClusterStatus, - HelixZkClient, - HelixThreadPoolExecutor, - HelixCallback, - RoutingTableProvider, - CLMParticipantReport +public abstract class CountMetric<V> extends SimpleDynamicMetric<V> implements Metric { + protected V _count; + + /** + * Instantiates a new Simple dynamic metric. + * @param metricName the metric name + * @param metricObject the metric object + */ + public CountMetric(String metricName, V metricObject) { + super(metricName, metricObject); + } + + public abstract void setCount(Object count); } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java new file mode 100644 index 0000000..c8ba5ae --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java @@ -0,0 +1,52 @@ +package org.apache.helix.monitoring.metrics.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.codahale.metrics.Histogram; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; + +/** + * Represents a latency metric and defines methods to help with calculation. A latency metric gives + * how long a particular stage in the logic took in milliseconds. + */ +public abstract class LatencyMetric extends HistogramDynamicMetric implements Metric { + protected long _startTime; + protected long _endTime; + protected String _metricName; + + /** + * Instantiates a new Histogram dynamic metric. + * @param metricName the metric name + * @param metricObject the metric object + */ + public LatencyMetric(String metricName, Histogram metricObject) { + super(metricName, metricObject); + } + + /** + * Starts measuring the latency. + */ + public abstract void startMeasuringLatency(); + + /** + * Ends measuring the latency. + */ + public abstract void endMeasuringLatency(); +} diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java similarity index 55% copy from helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java index 73bf057..ba59b4f 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java @@ -1,4 +1,4 @@ -package org.apache.helix.monitoring.mbeans; +package org.apache.helix.monitoring.metrics.model; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -19,14 +19,36 @@ package org.apache.helix.monitoring.mbeans; * under the License. */ +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; + /** - * This enum defines all of domain names used with various Helix monitor mbeans. + * Defines a generic metric interface. 
*/ -public enum MonitorDomainNames { - ClusterStatus, - HelixZkClient, - HelixThreadPoolExecutor, - HelixCallback, - RoutingTableProvider, - CLMParticipantReport +public interface Metric { + + /** + * Gets the name of the metric. + */ + String getMetricName(); + + /** + * Resets the internal state of this metric. + */ + void reset(); + + /** + * Prints the metric along with its name. + */ + String toString(); + + /** + * Returns the most recently emitted value for the metric at the time of the call. + * @return metric value + */ + long getLastEmittedMetricValue(); + + /** + * Returns the underlying DynamicMetric. + */ + DynamicMetric getDynamicMetric(); } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index 96b6523..df368cb 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -133,8 +133,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { } @Test(dependsOnMethods = "testRebalance") - public void testPartialRebalance() - throws IOException, HelixRebalanceException { + public void testPartialRebalance() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); @@ -184,9 +183,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { String resourceName = csEntry.getKey(); CurrentState cs = csEntry.getValue(); for (Map.Entry<String, String> partitionStateEntry : cs.getPartitionStateMap().entrySet()) { - currentStateOutput - .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()), - instanceName, partitionStateEntry.getValue()); + 
currentStateOutput.setCurrentState(resourceName, + new Partition(partitionStateEntry.getKey()), instanceName, + partitionStateEntry.getValue()); } } } @@ -220,8 +219,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { } @Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT") - public void testNonCompatibleConfiguration() - throws IOException, HelixRebalanceException { + public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); @@ -243,8 +241,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // TODO test with invalid capacity configuration which will fail the cluster model constructing. 
@Test(dependsOnMethods = "testRebalance") - public void testInvalidClusterStatus() - throws IOException { + public void testInvalidClusterStatus() throws IOException { _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); @@ -291,7 +288,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { } @Test(dependsOnMethods = "testRebalance") - public void testAlgorithmExepction() throws IOException, HelixRebalanceException { + public void testAlgorithmException() throws IOException, HelixRebalanceException { RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", HelixRebalanceException.Type.FAILED_TO_CALCULATE)); @@ -430,9 +427,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { Assert.assertTrue(newIdealStates.containsKey(resourceName)); IdealState is = newIdealStates.get(resourceName); ResourceAssignment assignment = expectedResult.get(resourceName); - Assert.assertEquals(is.getPartitionSet(), new HashSet<>( - assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName()) - .collect(Collectors.toSet()))); + Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions() + .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet()))); for (String partitionName : is.getPartitionSet()) { Assert.assertEquals(is.getInstanceStateMap(partitionName), assignment.getReplicaMap(new Partition(partitionName))); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java new file mode 100644 index 0000000..dc0c89e --- /dev/null +++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java @@ -0,0 +1,132 @@ +package org.apache.helix.controller.rebalancer.waged; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import javax.management.JMException; +import org.apache.helix.HelixRebalanceException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm; +import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Resource; +import org.apache.helix.monitoring.metrics.MetricCollector; +import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.mockito.stubbing.Answer; +import org.testng.Assert; +import 
org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +public class TestWagedRebalancerMetrics extends AbstractTestClusterModel { + private static final String TEST_STRING = "TEST"; + private MetricCollector _metricCollector; + private Set<String> _instances; + private MockRebalanceAlgorithm _algorithm; + private MockAssignmentMetadataStore _metadataStore; + + @BeforeClass + public void initialize() { + super.initialize(); + _instances = new HashSet<>(); + _instances.add(_testInstanceId); + _algorithm = new MockRebalanceAlgorithm(); + + // Initialize a mock assignment metadata store + _metadataStore = new MockAssignmentMetadataStore(); + } + + @Test + public void testMetricValuePropagation() + throws JMException, HelixRebalanceException, IOException { + _metadataStore.clearMetadataStore(); + _metricCollector = new WagedRebalancerMetricCollector(TEST_STRING); + WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, _metricCollector); + + // Generate the input for the rebalancer. 
+ ResourceControllerDataProvider clusterData = setupClusterDataCache(); + Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + Map<String, IdealState> newIdealStates = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + + // Check that there exists a non-zero value in the metrics + Assert.assertTrue(_metricCollector.getMetricMap().values().stream() + .anyMatch(metric -> metric.getLastEmittedMetricValue() > 0L)); + } + + @Override + protected ResourceControllerDataProvider setupClusterDataCache() throws IOException { + ResourceControllerDataProvider testCache = super.setupClusterDataCache(); + + // Set up mock idealstate + Map<String, IdealState> isMap = new HashMap<>(); + for (String resource : _resourceNames) { + IdealState is = new IdealState(resource); + is.setNumPartitions(_partitionNames.size()); + is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + is.setStateModelDefRef("MasterSlave"); + is.setReplicas("100"); + is.setRebalancerClassName(WagedRebalancer.class.getName()); + _partitionNames.stream() + .forEach(partition -> is.setPreferenceList(partition, Collections.emptyList())); + isMap.put(resource, is); + } + when(testCache.getIdealState(anyString())).thenAnswer( + (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0])); + when(testCache.getIdealStates()).thenReturn(isMap); + + // Set up 2 more instances + for (int i = 1; i < 3; i++) { + String instanceName = _testInstanceId + i; + _instances.add(instanceName); + // 1. Set up the default instance information with capacity configuration. 
+ InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName); + Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap(); + instanceConfigMap.put(instanceName, testInstanceConfig); + when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap); + // 2. Mock the live instance node for the default instance. + LiveInstance testLiveInstance = createMockLiveInstance(instanceName); + Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances(); + liveInstanceMap.put(instanceName, testLiveInstance); + when(testCache.getLiveInstances()).thenReturn(liveInstanceMap); + when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet()); + when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet()); + } + + return testCache; + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java index 91db076..7cb1da2 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java @@ -27,7 +27,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; @@ -116,11 +115,11 @@ public abstract class AbstractTestClusterModel { // 4. Mock two resources, each with 2 partitions on the default instance. 
// The instance will have the following partitions assigned // Resource 1: - // partition 1 - MASTER - // partition 2 - SLAVE + // -------------- partition 1 - MASTER + // -------------- partition 2 - SLAVE // Resource 2: - // partition 3 - MASTER - // partition 4 - SLAVE + // -------------- partition 3 - MASTER + // -------------- partition 4 - SLAVE CurrentState testCurrentStateResource1 = Mockito.mock(CurrentState.class); Map<String, String> partitionStateMap1 = new HashMap<>(); partitionStateMap1.put(_partitionNames.get(0), "MASTER"); @@ -179,9 +178,10 @@ public abstract class AbstractTestClusterModel { for (CurrentState cs : currentStatemap.values()) { ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName()); // Construct one AssignableReplica for each partition in the current state. - cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet.add( - new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig, entry.getKey(), - entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2))); + cs.getPartitionStateMap().entrySet().stream() + .forEach(entry -> assignmentSet + .add(new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig, + entry.getKey(), entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2))); } return assignmentSet; }
