This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit ee7dcdac1c0757c80f7b7accac576ab830de4952 Author: Jiajun Wang <[email protected]> AuthorDate: Tue Oct 22 15:08:02 2019 -0700 The WAGED rebalancer returns the previously calculated assignment on calculation failure (#514) * The WAGED rebalancer returns the previously calculated assignment on calculation failure. This protects the cluster assignment when the rebalancing algorithm fails, for example because the cluster is out of capacity. In this case, the rebalancer will keep using the previously calculated mapping. Also, refine the new metric interface, and add the RebalanceFailureCount metric for recording the failures. Modify the test cases so that DBs from different test cases have different names. This prevents records from earlier test cases from being returned by the rebalancer on a calculation error. --- .../rebalancer/waged/WagedRebalancer.java | 167 ++++++++++++--------- .../mbeans/dynamicMBeans/SimpleDynamicMetric.java | 2 +- .../metrics/WagedRebalancerMetricCollector.java | 33 ++-- .../implementation/RebalanceFailureCount.java | 19 +++ .../implementation/RebalanceLatencyGauge.java | 26 +--- .../monitoring/metrics/model/CountMetric.java | 40 ++++- .../monitoring/metrics/model/LatencyMetric.java | 17 +++ .../helix/monitoring/metrics/model/Metric.java | 5 - .../rebalancer/waged/TestWagedRebalancer.java | 50 ++++-- .../WagedRebalancer/TestDelayedWagedRebalance.java | 7 +- ...tDelayedWagedRebalanceWithDisabledInstance.java | 7 +- .../TestDelayedWagedRebalanceWithRackaware.java | 7 +- .../WagedRebalancer/TestWagedRebalance.java | 48 +++--- .../TestWagedRebalanceFaultZone.java | 10 +- 14 files changed, 274 insertions(+), 164 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index f39d3cb..9a01688 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -30,7 +30,6 @@ import java.util.stream.Collectors; import com.google.common.collect.ImmutableSet; import org.apache.helix.HelixConstants; -import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -51,6 +50,7 @@ import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.metrics.MetricCollector; import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,9 +174,36 @@ public class WagedRebalancer { LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString()); validateInput(clusterData, resourceMap); - // Calculate the target assignment based on the current cluster status. - Map<String, IdealState> newIdealStates = - computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); + Map<String, IdealState> newIdealStates; + try { + // Calculate the target assignment based on the current cluster status. + newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); + } catch (HelixRebalanceException ex) { + LOG.error("Failed to calculate the new assignments.", ex); + // Record the failure in metrics. 
+ CountMetric rebalanceFailureCount = _metricCollector.getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), + CountMetric.class); + rebalanceFailureCount.increaseCount(1L); + + HelixRebalanceException.Type failureType = ex.getFailureType(); + if (failureType.equals(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS) || failureType + .equals(HelixRebalanceException.Type.UNKNOWN_FAILURE)) { + // If the failure is unknown or because of assignment store access failure, throw the + // rebalance exception. + throw ex; + } else { // return the previously calculated assignment. + LOG.warn( + "Returning the last known-good best possible assignment from metadata store due to " + + "rebalance failure of type: {}", failureType); + // Note that don't return an assignment based on the current state if there is no previously + // calculated result in this fallback logic. + Map<String, ResourceAssignment> assignmentRecord = + getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(), + resourceMap.keySet()); + newIdealStates = convertResourceAssignment(clusterData, assignmentRecord); + } + } // Construct the new best possible states according to the current state and target assignment. // Note that the new ideal state might be an intermediate state between the current state and @@ -203,7 +230,7 @@ public class WagedRebalancer { } // Coordinate baseline recalculation and partial rebalance according to the cluster changes. 
- private Map<String, IdealState> computeBestPossibleStates( + protected Map<String, IdealState> computeBestPossibleStates( ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { getChangeDetector().updateSnapshots(clusterData); @@ -243,36 +270,15 @@ public class WagedRebalancer { Map<String, ResourceAssignment> newAssignment = partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput); - // <ResourceName, <State, Priority>> - Map<String, Map<String, Integer>> resourceStatePriorityMap = new HashMap<>(); - // Convert the assignments into IdealState for the following state mapping calculation. - Map<String, IdealState> finalIdealStateMap = new HashMap<>(); - for (String resourceName : newAssignment.keySet()) { - IdealState newIdealState; - try { - IdealState currentIdealState = clusterData.getIdealState(resourceName); - Map<String, Integer> statePriorityMap = clusterData - .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap(); - // Keep the priority map for the rebalance overwrite logic later. - resourceStatePriorityMap.put(resourceName, statePriorityMap); - // Create a new IdealState instance contains the new calculated assignment in the preference - // list. - newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState, - newAssignment.get(resourceName), statePriorityMap); - } catch (Exception ex) { - throw new HelixRebalanceException( - "Fail to calculate the new IdealState for resource: " + resourceName, - HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); - } - finalIdealStateMap.put(resourceName, newIdealState); - } + Map<String, IdealState> finalIdealStateMap = + convertResourceAssignment(clusterData, newAssignment); // The additional rebalance overwrite is required since the calculated mapping may contains // some delayed rebalanced assignments. 
if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) { applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges, - resourceStatePriorityMap, getBaselineAssignment(_assignmentMetadataStore, - currentStateOutput, resourceMap.keySet())); + getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet())); } // Replace the assignment if user-defined preference list is configured. // Note the user-defined list is intentionally applied to the final mapping after calculation. @@ -285,6 +291,40 @@ public class WagedRebalancer { return finalIdealStateMap; } + /** + * Convert the resource assignment map into an IdealState map. + */ + private Map<String, IdealState> convertResourceAssignment( + ResourceControllerDataProvider clusterData, Map<String, ResourceAssignment> assignments) + throws HelixRebalanceException { + // Convert the assignments into IdealState for the following state mapping calculation. + Map<String, IdealState> finalIdealStateMap = new HashMap<>(); + for (String resourceName : assignments.keySet()) { + try { + IdealState currentIdealState = clusterData.getIdealState(resourceName); + Map<String, Integer> statePriorityMap = + clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()) + .getStatePriorityMap(); + // Create a new IdealState instance which contains the new calculated assignment in the + // preference list. + IdealState newIdealState = new IdealState(resourceName); + // Copy the simple fields + newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); + // Sort the preference list according to state priority. + newIdealState.setPreferenceLists( + getPreferenceLists(assignments.get(resourceName), statePriorityMap)); + // Note the state mapping in the new assignment won't directly propagate to the map fields. + // The rebalancer will calculate for the final state mapping considering the current states. 
+ finalIdealStateMap.put(resourceName, newIdealState); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Failed to calculate the new IdealState for resource: " + resourceName, + HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex); + } + } + return finalIdealStateMap; + } + // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline private void refreshBaseline(ResourceControllerDataProvider clusterData, Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap, @@ -414,23 +454,6 @@ public class WagedRebalancer { return CHANGE_DETECTOR_THREAD_LOCAL.get(); } - // Generate a new IdealState based on the input newAssignment. - // The assignment will be propagate to the preference lists. - // Note that we will recalculate the states based on the current state, so there is no need to - // update the mapping fields in the IdealState output. - private IdealState generateIdealStateWithAssignment(String resourceName, - IdealState currentIdealState, ResourceAssignment newAssignment, - Map<String, Integer> statePriorityMap) { - IdealState newIdealState = new IdealState(resourceName); - // Copy the simple fields - newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); - // Sort the preference list according to state priority. - newIdealState.setPreferenceLists(getPreferenceLists(newAssignment, statePriorityMap)); - // Note the state mapping in the new assignment won't be directly propagate to the map fields. - // The rebalancer will calculate for the final state mapping considering the current states. - return newIdealState; - } - // Generate the preference lists from the state mapping based on state priority. 
private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment, Map<String, Integer> statePriorityMap) { @@ -488,9 +511,6 @@ public class WagedRebalancer { stateReadLatency.startMeasuringLatency(); currentBaseline = assignmentMetadataStore.getBaseline(); stateReadLatency.endMeasuringLatency(); - } catch (HelixException ex) { - // Report error. and use empty mapping instead. - LOG.error("Failed to get the current baseline assignment.", ex); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current baseline assignment because of unexpected error.", @@ -501,6 +521,7 @@ public class WagedRebalancer { LOG.warn("The current baseline assignment record is empty. Use the current states instead."); currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); } + currentBaseline.keySet().retainAll(resources); return currentBaseline; } @@ -524,9 +545,6 @@ public class WagedRebalancer { stateReadLatency.startMeasuringLatency(); currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment(); stateReadLatency.endMeasuringLatency(); - } catch (HelixException ex) { - // Report error. and use empty mapping instead. - LOG.error("Failed to get the current best possible assignment.", ex); } catch (Exception ex) { throw new HelixRebalanceException( "Failed to get the current best possible assignment because of unexpected error.", @@ -538,6 +556,7 @@ public class WagedRebalancer { "The current best possible assignment record is empty. Use the current states instead."); currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources); } + currentBestAssignment.keySet().retainAll(resources); return currentBestAssignment; } @@ -593,39 +612,39 @@ public class WagedRebalancer { * @param clusterData the cluster data cache. * @param resourceMap the rebalanaced resource map. * @param clusterChanges the detected cluster changes that triggeres the rebalance. 
- * @param resourceStatePriorityMap the state priority map for each resource. * @param baseline the baseline assignment */ private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap, ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap, Map<HelixConstants.ChangeType, Set<String>> clusterChanges, - Map<String, Map<String, Integer>> resourceStatePriorityMap, Map<String, ResourceAssignment> baseline) throws HelixRebalanceException { Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances(); // Note that the calculation used the baseline as the input only. This is for minimizing // unnecessary partition movement. - Map<String, ResourceAssignment> activeAssignment = calculateAssignment(clusterData, - clusterChanges, resourceMap, enabledLiveInstances, Collections.emptyMap(), baseline); + Map<String, IdealState> activeIdealStates = convertResourceAssignment(clusterData, + calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances, + Collections.emptyMap(), baseline)); for (String resourceName : idealStateMap.keySet()) { - IdealState is = idealStateMap.get(resourceName); - if (!activeAssignment.containsKey(resourceName)) { + // The new calculated ideal state before overwrite + IdealState newIdealState = idealStateMap.get(resourceName); + if (!activeIdealStates.containsKey(resourceName)) { throw new HelixRebalanceException( "Failed to calculate the complete partition assignment with all active nodes. 
Cannot find the resource assignment for " - + resourceName, - HelixRebalanceException.Type.FAILED_TO_CALCULATE); + + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE); } + // The ideal state that is calculated based on the real alive/enabled instances list + IdealState newActiveIdealState = activeIdealStates.get(resourceName); + // The current ideal state that exists in the IdealState znode IdealState currentIdealState = clusterData.getIdealState(resourceName); - IdealState newActiveIdealState = - generateIdealStateWithAssignment(resourceName, currentIdealState, - activeAssignment.get(resourceName), resourceStatePriorityMap.get(resourceName)); - - int numReplia = currentIdealState.getReplicaCount(enabledLiveInstances.size()); - int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia); - Map<String, List<String>> finalPreferenceLists = - DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), - is.getPreferenceLists(), enabledLiveInstances, Math.min(minActiveReplica, numReplia)); - - is.setPreferenceLists(finalPreferenceLists); + int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size()); + int minActiveReplica = + DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica); + Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil + .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(), + newIdealState.getPreferenceLists(), enabledLiveInstances, + Math.min(minActiveReplica, numReplica)); + + newIdealState.setPreferenceLists(finalPreferenceLists); } } @@ -641,4 +660,8 @@ public class WagedRebalancer { } } } + + protected MetricCollector getMetricCollector() { + return _metricCollector; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java index 1be6a21..2b0f1db 100644 
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java @@ -25,7 +25,7 @@ package org.apache.helix.monitoring.mbeans.dynamicMBeans; * @param <T> the type of the metric value */ public class SimpleDynamicMetric<T> extends DynamicMetric<T, T> { - private final String _metricName; + protected final String _metricName; /** * Instantiates a new Simple dynamic metric. diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java index 04d804d..e9494ff 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java @@ -20,8 +20,11 @@ package org.apache.helix.monitoring.metrics; */ import javax.management.JMException; + import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.monitoring.metrics.implementation.RebalanceFailureCount; import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge; +import org.apache.helix.monitoring.metrics.model.CountMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; public class WagedRebalancerMetricCollector extends MetricCollector { @@ -38,7 +41,12 @@ public class WagedRebalancerMetricCollector extends MetricCollector { // The following latency metrics are related to AssignmentMetadataStore StateReadLatencyGauge, - StateWriteLatencyGauge + StateWriteLatencyGauge, + + // Count of any rebalance compute failure. + // Note the rebalancer may still be able to return the last known-good assignment on a rebalance + // compute failure. And this fallback logic won't impact this counting. 
+ RebalanceFailureCounter } public WagedRebalancerMetricCollector(String clusterName) throws JMException { @@ -62,19 +70,26 @@ public class WagedRebalancerMetricCollector extends MetricCollector { */ private void createMetrics() { // Define all metrics - LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.StateReadLatencyGauge.name(), getResetIntervalInMs()); - LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge( - WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), getResetIntervalInMs()); + LatencyMetric globalBaselineCalcLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric partialRebalanceLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric stateReadLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(), + getResetIntervalInMs()); + LatencyMetric stateWriteLatencyGauge = + new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), + getResetIntervalInMs()); + CountMetric calcFailureCount = + new RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCounter.name()); // Add metrics to WagedRebalancerMetricCollector addMetric(globalBaselineCalcLatencyGauge); addMetric(partialRebalanceLatencyGauge); addMetric(stateReadLatencyGauge); addMetric(stateWriteLatencyGauge); + addMetric(calcFailureCount); } } diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java new file mode 100644 index 0000000..3764645 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java @@ -0,0 +1,19 @@ +package org.apache.helix.monitoring.metrics.implementation; + +import org.apache.helix.monitoring.metrics.model.CountMetric; + +public class RebalanceFailureCount extends CountMetric { + /** + * Instantiates a new Simple dynamic metric. + * + * @param metricName the metric name + */ + public RebalanceFailureCount(String metricName) { + super(metricName, 0L); + } + + @Override + public void increaseCount(long count) { + updateValue(getValue() + count); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java index e96a589..b6e58b4 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java @@ -22,7 +22,6 @@ package org.apache.helix.monitoring.metrics.implementation; import com.codahale.metrics.Histogram; import com.codahale.metrics.SlidingTimeWindowArrayReservoir; import java.util.concurrent.TimeUnit; -import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.metrics.model.LatencyMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,22 +71,6 @@ public class RebalanceLatencyGauge extends LatencyMetric { reset(); } - @Override - public String getMetricName() { - return _metricName; - } - - @Override - public void reset() { - _startTime = VALUE_NOT_SET; - _endTime = VALUE_NOT_SET; - } - 
- @Override - public String toString() { - return String.format("Metric %s's latency is %d", _metricName, getLastEmittedMetricValue()); - } - /** * Returns the most recently emitted metric value at the time of the call. * @return @@ -97,8 +80,11 @@ public class RebalanceLatencyGauge extends LatencyMetric { return _lastEmittedMetricValue; } - @Override - public DynamicMetric getDynamicMetric() { - return this; + /** + * Resets the internal state of this metric. + */ + private void reset() { + _startTime = VALUE_NOT_SET; + _endTime = VALUE_NOT_SET; } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java index 5a7f0ca..424ac9e 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java @@ -19,23 +19,49 @@ package org.apache.helix.monitoring.metrics.model; * under the License. */ +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; /** * Represents a count metric and defines methods to help with calculation. A count metric gives a * gauge value of a certain property. */ -public abstract class CountMetric<V> extends SimpleDynamicMetric<V> implements Metric { - protected V _count; +public abstract class CountMetric extends SimpleDynamicMetric<Long> implements Metric { /** - * Instantiates a new Simple dynamic metric. + * Instantiates a new count metric. 
+ * * @param metricName the metric name - * @param metricObject the metric object + * @param initCount the initial count */ - public CountMetric(String metricName, V metricObject) { - super(metricName, metricObject); + public CountMetric(String metricName, long initCount) { + super(metricName, initCount); } - public abstract void setCount(Object count); + /** + * Increment the metric by the input count. + * + * @param count + */ + public abstract void increaseCount(long count); + + @Override + public String getMetricName() { + return _metricName; + } + + @Override + public String toString() { + return String.format("Metric %s's count is %d", getMetricName(), getValue()); + } + + @Override + public long getLastEmittedMetricValue() { + return getValue(); + } + + @Override + public DynamicMetric getDynamicMetric() { + return this; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java index c8ba5ae..d60f245 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java @@ -20,6 +20,7 @@ package org.apache.helix.monitoring.metrics.model; */ import com.codahale.metrics.Histogram; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; /** @@ -38,6 +39,7 @@ public abstract class LatencyMetric extends HistogramDynamicMetric implements Me */ public LatencyMetric(String metricName, Histogram metricObject) { super(metricName, metricObject); + _metricName = metricName; } /** @@ -49,4 +51,19 @@ public abstract class LatencyMetric extends HistogramDynamicMetric implements Me * Ends measuring the latency. 
*/ public abstract void endMeasuringLatency(); + + @Override + public String getMetricName() { + return _metricName; + } + + @Override + public String toString() { + return String.format("Metric %s's latency is %d", getMetricName(), getLastEmittedMetricValue()); + } + + @Override + public DynamicMetric getDynamicMetric() { + return this; + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java index ba59b4f..22378dc 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java @@ -32,11 +32,6 @@ public interface Metric { String getMetricName(); /** - * Resets the internal state of this metric. - */ - void reset(); - - /** * Prints the metric along with its name. */ String toString(); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index df368cb..dd0cc8c 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -41,6 +42,8 @@ import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector; +import org.apache.helix.monitoring.metrics.model.CountMetric; 
import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.testng.Assert; @@ -241,7 +244,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { // TODO test with invalid capacity configuration which will fail the cluster model constructing. @Test(dependsOnMethods = "testRebalance") - public void testInvalidClusterStatus() throws IOException { + public void testInvalidClusterStatus() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); @@ -254,13 +257,19 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect( Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); try { - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput()); Assert.fail("Rebalance shall fail."); } catch (HelixRebalanceException ex) { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS); Assert.assertEquals(ex.getMessage(), "Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS"); } + + // The rebalance will be done with empty mapping result since there is no previously calculated + // assignment. 
+ Assert.assertTrue( + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()) + .isEmpty()); } @Test(dependsOnMethods = "testRebalance") @@ -289,24 +298,45 @@ public class TestWagedRebalancer extends AbstractTestClusterModel { @Test(dependsOnMethods = "testRebalance") public void testAlgorithmException() throws IOException, HelixRebalanceException { - RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); - when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", - HelixRebalanceException.Type.FAILED_TO_CALCULATE)); - _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = - new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer()); + new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer()); ResourceControllerDataProvider clusterData = setupClusterDataCache(); - Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect( - Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName))); + Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> { + Resource resource = new Resource(entry.getKey()); + entry.getValue().getPartitionSet().stream() + .forEach(partition -> resource.addPartition(partition)); + return resource; + })); + // Rebalance with normal configuration. So the assignment will be persisted in the metadata store. + Map<String, IdealState> result = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + + // Recreate a rebalance with the same metadata store but bad algorithm instance. 
+ RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class); + when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.", + HelixRebalanceException.Type.FAILED_TO_CALCULATE)); + rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer()); + + // Calculation will fail try { - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput()); Assert.fail("Rebalance shall fail."); } catch (HelixRebalanceException ex) { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE); Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE"); } + // But if call with the public method computeNewIdealStates(), the rebalance will return with + // the previous rebalance result. + Map<String, IdealState> newResult = + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); + Assert.assertEquals(newResult, result); + // Ensure failure has been recorded + Assert.assertEquals(rebalancer.getMetricCollector().getMetric( + WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(), + CountMetric.class).getValue().longValue(), 1l); } @Test(dependsOnMethods = "testRebalance") diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java index 713c095..e49cc19 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import 
org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -39,7 +40,7 @@ public class TestDelayedWagedRebalance extends TestDelayedAutoRebalance { Set<String> dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -47,10 +48,10 @@ public class TestDelayedWagedRebalance extends TestDelayedAutoRebalance { // create test DBs, wait it converged and return externalviews protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException { - Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); + Map<String, ExternalView> externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java index bcb2260..3d4bd6a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java @@ -24,6 +24,7 @@ import java.util.HashSet; import 
java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -40,7 +41,7 @@ public class TestDelayedWagedRebalanceWithDisabledInstance Set<String> dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -48,10 +49,10 @@ public class TestDelayedWagedRebalanceWithDisabledInstance // create test DBs, wait it converged and return externalviews protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException { - Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); + Map<String, ExternalView> externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java index e0adf72..bb7c11a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java @@ -24,6 +24,7 @@ import java.util.HashSet; import 
java.util.Map; import java.util.Set; +import org.apache.helix.TestHelper; import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware; import org.apache.helix.model.ExternalView; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; @@ -39,7 +40,7 @@ public class TestDelayedWagedRebalanceWithRackaware extends TestDelayedAutoRebal Set<String> dbNames = new HashSet<>(); int i = 0; for (String stateModel : TestStateModels) { - dbNames.add("Test-DB-" + i++); + dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++); } return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames) .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build(); @@ -47,10 +48,10 @@ public class TestDelayedWagedRebalanceWithRackaware extends TestDelayedAutoRebal // create test DBs, wait it converged and return externalviews protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException { - Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); + Map<String, ExternalView> externalViews = new HashMap<>(); int i = 0; for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _minActiveReplica); _testDBs.add(db); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index 4920414..9790b92 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -108,7 +108,7 @@ public class TestWagedRebalance extends ZkTestBase { public void test() 
throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); @@ -148,7 +148,7 @@ public class TestWagedRebalance extends ZkTestBase { Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); int i = 3; for (String tag : tags) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = @@ -164,7 +164,7 @@ public class TestWagedRebalance extends ZkTestBase { @Test(dependsOnMethods = "test") public void testChangeIdealState() throws InterruptedException { - String dbName = "Test-DB"; + String dbName = "Test-DB-" + TestHelper.getTestMethodName(); createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -198,7 +198,7 @@ public class TestWagedRebalance extends ZkTestBase { @Test(dependsOnMethods = "test") public void testDisableInstance() throws InterruptedException { - String dbName = "Test-DB"; + String dbName = "Test-DB-" + TestHelper.getTestMethodName(); createResourceWithWagedRebalance(CLUSTER_NAME, dbName, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); @@ -254,7 +254,7 @@ public class TestWagedRebalance extends ZkTestBase { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, 
_replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -293,7 +293,7 @@ public class TestWagedRebalance extends ZkTestBase { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -324,7 +324,7 @@ public class TestWagedRebalance extends ZkTestBase { public void testMixedRebalancerUsage() throws InterruptedException { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; if (i == 0) { _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel, IdealState.RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); @@ -354,12 +354,14 @@ public class TestWagedRebalance extends ZkTestBase { String limitedResourceName = null; int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-" + TestHelper.getTestMethodName() + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); if (i == 1) { - // The limited resource has additional limitation, so even the other resources can be assigned - // later, this resource will still be blocked by the max partition limitation. + // The limited resource has additional limitation. + // The other resources could have been assigned in theory if the WAGED rebalancer were + // not used. + // However, with the WAGED rebalancer, this restricted resource will block the other ones. 
limitedResourceName = db; IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); @@ -371,8 +373,9 @@ public class TestWagedRebalance extends ZkTestBase { } Thread.sleep(300); - // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. - Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { + // Since the WAGED rebalancer need to finish rebalancing every resources, the initial + // assignment won't show. + Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); return ev != null && !ev.getPartitionSet().isEmpty(); @@ -383,20 +386,13 @@ public class TestWagedRebalance extends ZkTestBase { configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); Thread.sleep(300); - // wait until any of the resources is rebalanced - TestHelper.verify(() -> { - for (String db : _allDBs) { - ExternalView ev = - _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - if (ev != null && !ev.getPartitionSet().isEmpty()) { - return true; - } - } - return false; - }, 3000); - ExternalView ev = _gSetupTool.getClusterManagementTool() - .getResourceExternalView(CLUSTER_NAME, limitedResourceName); - Assert.assertFalse(ev != null && !ev.getPartitionSet().isEmpty()); + // Since the WAGED rebalancer need to finish rebalancing every resources, the assignment won't + // show even removed cluster level restriction + Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + return ev != null && !ev.getPartitionSet().isEmpty(); + }), 2000)); // Remove the resource level limitation IdealState idealState = _gSetupTool.getClusterManagementTool() diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java index 0a4c232..831f77f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java @@ -110,7 +110,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { public void testZoneIsolation() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testZoneIsolation" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -126,7 +126,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); int i = 0; for (String tag : tags) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testZoneIsolationWithInstanceTag" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica); IdealState is = @@ -154,7 +154,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughLiveRacks" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -196,7 +196,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + j++; + String db = "Test-DB-testLackEnoughRacks" + j++; createResourceWithWagedRebalance(CLUSTER_NAME, 
db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -228,7 +228,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase { public void testAddZone() throws Exception { int i = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + i++; + String db = "Test-DB-testAddZone" + i++; createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica); _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
