This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ee7dcdac1c0757c80f7b7accac576ab830de4952
Author: Jiajun Wang <[email protected]>
AuthorDate: Tue Oct 22 15:08:02 2019 -0700

    The WAGED rebalancer returns the previously calculated assignment on 
calculation failure (#514)
    
    * The WAGED rebalancer returns the previously calculated assignment on 
calculation failure.
    
    This protects the cluster assignment when the rebalancing algorithm fails, for example, because the cluster is out of capacity. In this case, the rebalancer keeps using the previously calculated mapping.
    Also, refine the new metric interface and add the RebalanceFailureCount metric to record rebalance failures.
    
    Modify the test cases so that DBs from different test cases have different names. This prevents records left by previous test cases from being returned by the rebalancer on a calculation error.
---
 .../rebalancer/waged/WagedRebalancer.java          | 167 ++++++++++++---------
 .../mbeans/dynamicMBeans/SimpleDynamicMetric.java  |   2 +-
 .../metrics/WagedRebalancerMetricCollector.java    |  33 ++--
 .../implementation/RebalanceFailureCount.java      |  19 +++
 .../implementation/RebalanceLatencyGauge.java      |  26 +---
 .../monitoring/metrics/model/CountMetric.java      |  40 ++++-
 .../monitoring/metrics/model/LatencyMetric.java    |  17 +++
 .../helix/monitoring/metrics/model/Metric.java     |   5 -
 .../rebalancer/waged/TestWagedRebalancer.java      |  50 ++++--
 .../WagedRebalancer/TestDelayedWagedRebalance.java |   7 +-
 ...tDelayedWagedRebalanceWithDisabledInstance.java |   7 +-
 .../TestDelayedWagedRebalanceWithRackaware.java    |   7 +-
 .../WagedRebalancer/TestWagedRebalance.java        |  48 +++---
 .../TestWagedRebalanceFaultZone.java               |  10 +-
 14 files changed, 274 insertions(+), 164 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index f39d3cb..9a01688 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -30,7 +30,6 @@ import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixConstants;
-import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -51,6 +50,7 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.metrics.MetricCollector;
 import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.monitoring.metrics.model.LatencyMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -174,9 +174,36 @@ public class WagedRebalancer {
     LOG.info("Start computing new ideal states for resources: {}", 
resourceMap.keySet().toString());
     validateInput(clusterData, resourceMap);
 
-    // Calculate the target assignment based on the current cluster status.
-    Map<String, IdealState> newIdealStates =
-        computeBestPossibleStates(clusterData, resourceMap, 
currentStateOutput);
+    Map<String, IdealState> newIdealStates;
+    try {
+      // Calculate the target assignment based on the current cluster status.
+      newIdealStates = computeBestPossibleStates(clusterData, resourceMap, 
currentStateOutput);
+    } catch (HelixRebalanceException ex) {
+      LOG.error("Failed to calculate the new assignments.", ex);
+      // Record the failure in metrics.
+      CountMetric rebalanceFailureCount = _metricCollector.getMetric(
+          
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
+          CountMetric.class);
+      rebalanceFailureCount.increaseCount(1L);
+
+      HelixRebalanceException.Type failureType = ex.getFailureType();
+      if 
(failureType.equals(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS) || 
failureType
+          .equals(HelixRebalanceException.Type.UNKNOWN_FAILURE)) {
+        // If the failure is unknown or because of assignment store access 
failure, throw the
+        // rebalance exception.
+        throw ex;
+      } else { // return the previously calculated assignment.
+        LOG.warn(
+            "Returning the last known-good best possible assignment from 
metadata store due to "
+                + "rebalance failure of type: {}", failureType);
+        // Note that don't return an assignment based on the current state if 
there is no previously
+        // calculated result in this fallback logic.
+        Map<String, ResourceAssignment> assignmentRecord =
+            getBestPossibleAssignment(_assignmentMetadataStore, new 
CurrentStateOutput(),
+                resourceMap.keySet());
+        newIdealStates = convertResourceAssignment(clusterData, 
assignmentRecord);
+      }
+    }
 
     // Construct the new best possible states according to the current state 
and target assignment.
     // Note that the new ideal state might be an intermediate state between 
the current state and
@@ -203,7 +230,7 @@ public class WagedRebalancer {
   }
 
   // Coordinate baseline recalculation and partial rebalance according to the 
cluster changes.
-  private Map<String, IdealState> computeBestPossibleStates(
+  protected Map<String, IdealState> computeBestPossibleStates(
       ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
       final CurrentStateOutput currentStateOutput) throws 
HelixRebalanceException {
     getChangeDetector().updateSnapshots(clusterData);
@@ -243,36 +270,15 @@ public class WagedRebalancer {
     Map<String, ResourceAssignment> newAssignment =
         partialRebalance(clusterData, clusterChanges, resourceMap, 
activeNodes, currentStateOutput);
 
-    // <ResourceName, <State, Priority>>
-    Map<String, Map<String, Integer>> resourceStatePriorityMap = new 
HashMap<>();
-    // Convert the assignments into IdealState for the following state mapping 
calculation.
-    Map<String, IdealState> finalIdealStateMap = new HashMap<>();
-    for (String resourceName : newAssignment.keySet()) {
-      IdealState newIdealState;
-      try {
-        IdealState currentIdealState = clusterData.getIdealState(resourceName);
-        Map<String, Integer> statePriorityMap = clusterData
-            
.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
-        // Keep the priority map for the rebalance overwrite logic later.
-        resourceStatePriorityMap.put(resourceName, statePriorityMap);
-        // Create a new IdealState instance contains the new calculated 
assignment in the preference
-        // list.
-        newIdealState = generateIdealStateWithAssignment(resourceName, 
currentIdealState,
-            newAssignment.get(resourceName), statePriorityMap);
-      } catch (Exception ex) {
-        throw new HelixRebalanceException(
-            "Fail to calculate the new IdealState for resource: " + 
resourceName,
-            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
-      }
-      finalIdealStateMap.put(resourceName, newIdealState);
-    }
+    Map<String, IdealState> finalIdealStateMap =
+        convertResourceAssignment(clusterData, newAssignment);
 
     // The additional rebalance overwrite is required since the calculated 
mapping may contains
     // some delayed rebalanced assignments.
     if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
       applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, 
clusterChanges,
-          resourceStatePriorityMap, 
getBaselineAssignment(_assignmentMetadataStore,
-              currentStateOutput, resourceMap.keySet()));
+          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
+              resourceMap.keySet()));
     }
     // Replace the assignment if user-defined preference list is configured.
     // Note the user-defined list is intentionally applied to the final 
mapping after calculation.
@@ -285,6 +291,40 @@ public class WagedRebalancer {
     return finalIdealStateMap;
   }
 
+  /**
+   * Convert the resource assignment map into an IdealState map.
+   */
+  private Map<String, IdealState> convertResourceAssignment(
+      ResourceControllerDataProvider clusterData, Map<String, 
ResourceAssignment> assignments)
+      throws HelixRebalanceException {
+    // Convert the assignments into IdealState for the following state mapping 
calculation.
+    Map<String, IdealState> finalIdealStateMap = new HashMap<>();
+    for (String resourceName : assignments.keySet()) {
+      try {
+        IdealState currentIdealState = clusterData.getIdealState(resourceName);
+        Map<String, Integer> statePriorityMap =
+            
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
+                .getStatePriorityMap();
+        // Create a new IdealState instance which contains the new calculated 
assignment in the
+        // preference list.
+        IdealState newIdealState = new IdealState(resourceName);
+        // Copy the simple fields
+        
newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+        // Sort the preference list according to state priority.
+        newIdealState.setPreferenceLists(
+            getPreferenceLists(assignments.get(resourceName), 
statePriorityMap));
+        // Note the state mapping in the new assignment won't directly 
propagate to the map fields.
+        // The rebalancer will calculate for the final state mapping 
considering the current states.
+        finalIdealStateMap.put(resourceName, newIdealState);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Failed to calculate the new IdealState for resource: " + 
resourceName,
+            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+      }
+    }
+    return finalIdealStateMap;
+  }
+
   // TODO make the Baseline calculation async if complicated algorithm is used 
for the Baseline
   private void refreshBaseline(ResourceControllerDataProvider clusterData,
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, 
Resource> resourceMap,
@@ -414,23 +454,6 @@ public class WagedRebalancer {
     return CHANGE_DETECTOR_THREAD_LOCAL.get();
   }
 
-  // Generate a new IdealState based on the input newAssignment.
-  // The assignment will be propagate to the preference lists.
-  // Note that we will recalculate the states based on the current state, so 
there is no need to
-  // update the mapping fields in the IdealState output.
-  private IdealState generateIdealStateWithAssignment(String resourceName,
-      IdealState currentIdealState, ResourceAssignment newAssignment,
-      Map<String, Integer> statePriorityMap) {
-    IdealState newIdealState = new IdealState(resourceName);
-    // Copy the simple fields
-    
newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
-    // Sort the preference list according to state priority.
-    newIdealState.setPreferenceLists(getPreferenceLists(newAssignment, 
statePriorityMap));
-    // Note the state mapping in the new assignment won't be directly 
propagate to the map fields.
-    // The rebalancer will calculate for the final state mapping considering 
the current states.
-    return newIdealState;
-  }
-
   // Generate the preference lists from the state mapping based on state 
priority.
   private Map<String, List<String>> getPreferenceLists(ResourceAssignment 
newAssignment,
       Map<String, Integer> statePriorityMap) {
@@ -488,9 +511,6 @@ public class WagedRebalancer {
         stateReadLatency.startMeasuringLatency();
         currentBaseline = assignmentMetadataStore.getBaseline();
         stateReadLatency.endMeasuringLatency();
-      } catch (HelixException ex) {
-        // Report error. and use empty mapping instead.
-        LOG.error("Failed to get the current baseline assignment.", ex);
       } catch (Exception ex) {
         throw new HelixRebalanceException(
             "Failed to get the current baseline assignment because of 
unexpected error.",
@@ -501,6 +521,7 @@ public class WagedRebalancer {
       LOG.warn("The current baseline assignment record is empty. Use the 
current states instead.");
       currentBaseline = getCurrentStateAssingment(currentStateOutput, 
resources);
     }
+    currentBaseline.keySet().retainAll(resources);
     return currentBaseline;
   }
 
@@ -524,9 +545,6 @@ public class WagedRebalancer {
         stateReadLatency.startMeasuringLatency();
         currentBestAssignment = 
assignmentMetadataStore.getBestPossibleAssignment();
         stateReadLatency.endMeasuringLatency();
-      } catch (HelixException ex) {
-        // Report error. and use empty mapping instead.
-        LOG.error("Failed to get the current best possible assignment.", ex);
       } catch (Exception ex) {
         throw new HelixRebalanceException(
             "Failed to get the current best possible assignment because of 
unexpected error.",
@@ -538,6 +556,7 @@ public class WagedRebalancer {
           "The current best possible assignment record is empty. Use the 
current states instead.");
       currentBestAssignment = getCurrentStateAssingment(currentStateOutput, 
resources);
     }
+    currentBestAssignment.keySet().retainAll(resources);
     return currentBestAssignment;
   }
 
@@ -593,39 +612,39 @@ public class WagedRebalancer {
    * @param clusterData the cluster data cache.
    * @param resourceMap the rebalanaced resource map.
    * @param clusterChanges the detected cluster changes that triggeres the 
rebalance.
-   * @param resourceStatePriorityMap the state priority map for each resource.
    * @param baseline the baseline assignment
    */
   private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
       ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
-      Map<String, Map<String, Integer>> resourceStatePriorityMap,
       Map<String, ResourceAssignment> baseline) throws HelixRebalanceException 
{
     Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
     // Note that the calculation used the baseline as the input only. This is 
for minimizing
     // unnecessary partition movement.
-    Map<String, ResourceAssignment> activeAssignment = 
calculateAssignment(clusterData,
-        clusterChanges, resourceMap, enabledLiveInstances, 
Collections.emptyMap(), baseline);
+    Map<String, IdealState> activeIdealStates = 
convertResourceAssignment(clusterData,
+        calculateAssignment(clusterData, clusterChanges, resourceMap, 
enabledLiveInstances,
+            Collections.emptyMap(), baseline));
     for (String resourceName : idealStateMap.keySet()) {
-      IdealState is = idealStateMap.get(resourceName);
-      if (!activeAssignment.containsKey(resourceName)) {
+      // The new calculated ideal state before overwrite
+      IdealState newIdealState = idealStateMap.get(resourceName);
+      if (!activeIdealStates.containsKey(resourceName)) {
         throw new HelixRebalanceException(
             "Failed to calculate the complete partition assignment with all 
active nodes. Cannot find the resource assignment for "
-                + resourceName,
-            HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+                + resourceName, 
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
       }
+      // The ideal state that is calculated based on the real alive/enabled 
instances list
+      IdealState newActiveIdealState = activeIdealStates.get(resourceName);
+      // The current ideal state that exists in the IdealState znode
       IdealState currentIdealState = clusterData.getIdealState(resourceName);
-      IdealState newActiveIdealState =
-          generateIdealStateWithAssignment(resourceName, currentIdealState,
-              activeAssignment.get(resourceName), 
resourceStatePriorityMap.get(resourceName));
-
-      int numReplia = 
currentIdealState.getReplicaCount(enabledLiveInstances.size());
-      int minActiveReplica = 
DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplia);
-      Map<String, List<String>> finalPreferenceLists =
-          
DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
-              is.getPreferenceLists(), enabledLiveInstances, 
Math.min(minActiveReplica, numReplia));
-
-      is.setPreferenceLists(finalPreferenceLists);
+      int numReplica = 
currentIdealState.getReplicaCount(enabledLiveInstances.size());
+      int minActiveReplica =
+          DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, 
numReplica);
+      Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil
+          .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
+              newIdealState.getPreferenceLists(), enabledLiveInstances,
+              Math.min(minActiveReplica, numReplica));
+
+      newIdealState.setPreferenceLists(finalPreferenceLists);
     }
   }
 
@@ -641,4 +660,8 @@ public class WagedRebalancer {
       }
     }
   }
+
+  protected MetricCollector getMetricCollector() {
+    return _metricCollector;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java
index 1be6a21..2b0f1db 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/SimpleDynamicMetric.java
@@ -25,7 +25,7 @@ package org.apache.helix.monitoring.mbeans.dynamicMBeans;
  * @param <T> the type of the metric value
  */
 public class SimpleDynamicMetric<T> extends DynamicMetric<T, T> {
-  private final String _metricName;
+  protected final String _metricName;
 
   /**
    * Instantiates a new Simple dynamic metric.
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index 04d804d..e9494ff 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -20,8 +20,11 @@ package org.apache.helix.monitoring.metrics;
  */
 
 import javax.management.JMException;
+
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import 
org.apache.helix.monitoring.metrics.implementation.RebalanceFailureCount;
 import 
org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.monitoring.metrics.model.LatencyMetric;
 
 public class WagedRebalancerMetricCollector extends MetricCollector {
@@ -38,7 +41,12 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
 
     // The following latency metrics are related to AssignmentMetadataStore
     StateReadLatencyGauge,
-    StateWriteLatencyGauge
+    StateWriteLatencyGauge,
+
+    // Count of any rebalance compute failure.
+    // Note the rebalancer may still be able to return the last known-good 
assignment on a rebalance
+    // compute failure. And this fallback logic won't impact this counting.
+    RebalanceFailureCounter
   }
 
   public WagedRebalancerMetricCollector(String clusterName) throws JMException 
{
@@ -62,19 +70,26 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
    */
   private void createMetrics() {
     // Define all metrics
-    LatencyMetric globalBaselineCalcLatencyGauge = new RebalanceLatencyGauge(
-        WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(), 
getResetIntervalInMs());
-    LatencyMetric partialRebalanceLatencyGauge = new RebalanceLatencyGauge(
-        WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(), 
getResetIntervalInMs());
-    LatencyMetric stateReadLatencyGauge = new RebalanceLatencyGauge(
-        WagedRebalancerMetricNames.StateReadLatencyGauge.name(), 
getResetIntervalInMs());
-    LatencyMetric stateWriteLatencyGauge = new RebalanceLatencyGauge(
-        WagedRebalancerMetricNames.StateWriteLatencyGauge.name(), 
getResetIntervalInMs());
+    LatencyMetric globalBaselineCalcLatencyGauge =
+        new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(),
+            getResetIntervalInMs());
+    LatencyMetric partialRebalanceLatencyGauge =
+        new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
+            getResetIntervalInMs());
+    LatencyMetric stateReadLatencyGauge =
+        new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
+            getResetIntervalInMs());
+    LatencyMetric stateWriteLatencyGauge =
+        new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
+            getResetIntervalInMs());
+    CountMetric calcFailureCount =
+        new 
RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCounter.name());
 
     // Add metrics to WagedRebalancerMetricCollector
     addMetric(globalBaselineCalcLatencyGauge);
     addMetric(partialRebalanceLatencyGauge);
     addMetric(stateReadLatencyGauge);
     addMetric(stateWriteLatencyGauge);
+    addMetric(calcFailureCount);
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java
new file mode 100644
index 0000000..3764645
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceFailureCount.java
@@ -0,0 +1,46 @@
+package org.apache.helix.monitoring.metrics.implementation;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+
+/**
+ * A count metric that accumulates the number of rebalance calculation failures.
+ */
+public class RebalanceFailureCount extends CountMetric {
+  /**
+   * Instantiates a new rebalance failure count metric with an initial count of 0.
+   *
+   * @param metricName the metric name
+   */
+  public RebalanceFailureCount(String metricName) {
+    super(metricName, 0L);
+  }
+
+  /**
+   * Adds the given amount to the current failure count.
+   *
+   * @param count the number of failures to add
+   */
+  @Override
+  public void increaseCount(long count) {
+    updateValue(getValue() + count);
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
index e96a589..b6e58b4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
@@ -22,7 +22,6 @@ package org.apache.helix.monitoring.metrics.implementation;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
 import java.util.concurrent.TimeUnit;
-import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.metrics.model.LatencyMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,22 +71,6 @@ public class RebalanceLatencyGauge extends LatencyMetric {
     reset();
   }
 
-  @Override
-  public String getMetricName() {
-    return _metricName;
-  }
-
-  @Override
-  public void reset() {
-    _startTime = VALUE_NOT_SET;
-    _endTime = VALUE_NOT_SET;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("Metric %s's latency is %d", _metricName, 
getLastEmittedMetricValue());
-  }
-
   /**
    * Returns the most recently emitted metric value at the time of the call.
    * @return
@@ -97,8 +80,11 @@ public class RebalanceLatencyGauge extends LatencyMetric {
     return _lastEmittedMetricValue;
   }
 
-  @Override
-  public DynamicMetric getDynamicMetric() {
-    return this;
+  /**
+   * Resets the internal state of this metric.
+   */
+  private void reset() {
+    _startTime = VALUE_NOT_SET;
+    _endTime = VALUE_NOT_SET;
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
index 5a7f0ca..424ac9e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
@@ -19,23 +19,49 @@ package org.apache.helix.monitoring.metrics.model;
  * under the License.
  */
 
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
 /**
  * Represents a count metric and defines methods to help with calculation. A 
count metric gives a
  * gauge value of a certain property.
  */
-public abstract class CountMetric<V> extends SimpleDynamicMetric<V> implements 
Metric {
-  protected V _count;
+public abstract class CountMetric extends SimpleDynamicMetric<Long> implements 
Metric {
 
   /**
-   * Instantiates a new Simple dynamic metric.
+   * Instantiates a new count metric.
+   *
    * @param metricName the metric name
-   * @param metricObject the metric object
+   * @param initCount the initial count
    */
-  public CountMetric(String metricName, V metricObject) {
-    super(metricName, metricObject);
+  public CountMetric(String metricName, long initCount) {
+    super(metricName, initCount);
   }
 
-  public abstract void setCount(Object count);
+  /**
+   * Increment the metric by the input count.
+   *
+   * @param count
+   */
+  public abstract void increaseCount(long count);
+
+  @Override
+  public String getMetricName() {
+    return _metricName;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Metric %s's count is %d", getMetricName(), 
getValue());
+  }
+
+  @Override
+  public long getLastEmittedMetricValue() {
+    return getValue();
+  }
+
+  @Override
+  public DynamicMetric getDynamicMetric() {
+    return this;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
index c8ba5ae..d60f245 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
@@ -20,6 +20,7 @@ package org.apache.helix.monitoring.metrics.model;
  */
 
 import com.codahale.metrics.Histogram;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
 
 /**
@@ -38,6 +39,7 @@ public abstract class LatencyMetric extends 
HistogramDynamicMetric implements Me
    */
   public LatencyMetric(String metricName, Histogram metricObject) {
     super(metricName, metricObject);
+    _metricName = metricName;
   }
 
   /**
@@ -49,4 +51,19 @@ public abstract class LatencyMetric extends 
HistogramDynamicMetric implements Me
    * Ends measuring the latency.
    */
   public abstract void endMeasuringLatency();
+
+  @Override
+  public String getMetricName() {
+    return _metricName;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Metric %s's latency is %d", getMetricName(), 
getLastEmittedMetricValue());
+  }
+
+  @Override
+  public DynamicMetric getDynamicMetric() {
+    return this;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
index ba59b4f..22378dc 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
@@ -32,11 +32,6 @@ public interface Metric {
   String getMetricName();
 
   /**
-   * Resets the internal state of this metric.
-   */
-  void reset();
-
-  /**
    * Prints the metric along with its name.
    */
   String toString();
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index df368cb..dd0cc8c 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -41,6 +42,8 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
@@ -241,7 +244,7 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
 
   // TODO test with invalid capacity configuration which will fail the cluster 
model constructing.
   @Test(dependsOnMethods = "testRebalance")
-  public void testInvalidClusterStatus() throws IOException {
+  public void testInvalidClusterStatus() throws IOException, 
HelixRebalanceException {
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer =
         new WagedRebalancer(_metadataStore, _algorithm, new 
DelayedAutoRebalancer());
@@ -254,13 +257,19 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
     Map<String, Resource> resourceMap = 
clusterData.getIdealStates().keySet().stream().collect(
         Collectors.toMap(resourceName -> resourceName, resourceName -> new 
Resource(resourceName)));
     try {
-      rebalancer.computeNewIdealStates(clusterData, resourceMap, new 
CurrentStateOutput());
+      rebalancer.computeBestPossibleStates(clusterData, resourceMap, new 
CurrentStateOutput());
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), 
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
       Assert.assertEquals(ex.getMessage(),
           "Failed to generate cluster model. Failure Type: 
INVALID_CLUSTER_STATUS");
     }
+
+    // The rebalance will be done with empty mapping result since there is no 
previously calculated
+    // assignment.
+    Assert.assertTrue(
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new 
CurrentStateOutput())
+            .isEmpty());
   }
 
   @Test(dependsOnMethods = "testRebalance")
@@ -289,24 +298,45 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
 
   @Test(dependsOnMethods = "testRebalance")
   public void testAlgorithmException() throws IOException, HelixRebalanceException {
-    RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class);
-    when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
-        HelixRebalanceException.Type.FAILED_TO_CALCULATE));
-
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer =
-        new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer());
+        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
-    Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
-        Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet()
+              .forEach(resource::addPartition);
+          return resource;
+        }));
+    // Rebalance with a working algorithm so the assignment is persisted in the metadata store.
+    Map<String, IdealState> result =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+
+    // Recreate the rebalancer with the same metadata store but a failing algorithm instance.
+    RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class);
+    when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
+        HelixRebalanceException.Type.FAILED_TO_CALCULATE));
+    rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer());
+
+    // The internal calculation is expected to fail and surface the algorithm's exception.
     try {
-      rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+      rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
       Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE");
     }
+    // The public computeNewIdealStates(), however, does not throw; it returns the previously
+    // persisted assignment instead.
+    Map<String, IdealState> newResult =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    Assert.assertEquals(newResult, result);
+    // Ensure the failure has been recorded exactly once by the RebalanceFailureCounter metric.
+    Assert.assertEquals(rebalancer.getMetricCollector().getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
+        CountMetric.class).getValue().longValue(), 1L);
   }
 
   @Test(dependsOnMethods = "testRebalance")
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
index 713c095..e49cc19 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.TestHelper;
 import 
org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
@@ -39,7 +40,7 @@ public class TestDelayedWagedRebalance extends 
TestDelayedAutoRebalance {
     Set<String> dbNames = new HashSet<>();
     int i = 0;
     for (String stateModel : TestStateModels) {
-      dbNames.add("Test-DB-" + i++);
+      dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++);
     }
     return new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
         .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
@@ -47,10 +48,10 @@ public class TestDelayedWagedRebalance extends 
TestDelayedAutoRebalance {
 
   // create test DBs, wait it converged and return externalviews
   protected Map<String, ExternalView> createTestDBs(long delayTime) throws 
InterruptedException {
-    Map<String, ExternalView> externalViews = new HashMap<String, 
ExternalView>();
+    Map<String, ExternalView> externalViews = new HashMap<>();
     int i = 0;
     for (String stateModel : TestStateModels) {
-      String db = "Test-DB-" + i++;
+      String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
           _minActiveReplica);
       _testDBs.add(db);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
index bcb2260..3d4bd6a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.TestHelper;
 import 
org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
@@ -40,7 +41,7 @@ public class TestDelayedWagedRebalanceWithDisabledInstance
     Set<String> dbNames = new HashSet<>();
     int i = 0;
     for (String stateModel : TestStateModels) {
-      dbNames.add("Test-DB-" + i++);
+      dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++);
     }
     return new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
         .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
@@ -48,10 +49,10 @@ public class TestDelayedWagedRebalanceWithDisabledInstance
 
   // create test DBs, wait it converged and return externalviews
   protected Map<String, ExternalView> createTestDBs(long delayTime) throws 
InterruptedException {
-    Map<String, ExternalView> externalViews = new HashMap<String, 
ExternalView>();
+    Map<String, ExternalView> externalViews = new HashMap<>();
     int i = 0;
     for (String stateModel : TestStateModels) {
-      String db = "Test-DB-" + i++;
+      String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
           _minActiveReplica);
       _testDBs.add(db);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
index e0adf72..bb7c11a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.TestHelper;
 import 
org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
@@ -39,7 +40,7 @@ public class TestDelayedWagedRebalanceWithRackaware extends 
TestDelayedAutoRebal
     Set<String> dbNames = new HashSet<>();
     int i = 0;
     for (String stateModel : TestStateModels) {
-      dbNames.add("Test-DB-" + i++);
+      dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++);
     }
     return new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
         .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
@@ -47,10 +48,10 @@ public class TestDelayedWagedRebalanceWithRackaware extends 
TestDelayedAutoRebal
 
   // create test DBs, wait it converged and return externalviews
   protected Map<String, ExternalView> createTestDBs(long delayTime) throws 
InterruptedException {
-    Map<String, ExternalView> externalViews = new HashMap<String, 
ExternalView>();
+    Map<String, ExternalView> externalViews = new HashMap<>();
     int i = 0;
     for (String stateModel : TestStateModels) {
-      String db = "Test-DB-" + i++;
+      String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
           _minActiveReplica);
       _testDBs.add(db);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 4920414..9790b92 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -108,7 +108,7 @@ public class TestWagedRebalance extends ZkTestBase {
   public void test() throws Exception {
     int i = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + i++;
+      String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica, _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
@@ -148,7 +148,7 @@ public class TestWagedRebalance extends ZkTestBase {
     Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
     int i = 3;
     for (String tag : tags) {
-      String db = "Test-DB-" + i++;
+      String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db,
           BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, 
_replica, _replica);
       IdealState is =
@@ -164,7 +164,7 @@ public class TestWagedRebalance extends ZkTestBase {
 
   @Test(dependsOnMethods = "test")
   public void testChangeIdealState() throws InterruptedException {
-    String dbName = "Test-DB";
+    String dbName = "Test-DB-" + TestHelper.getTestMethodName();
     createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
         BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, 
_replica);
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
@@ -198,7 +198,7 @@ public class TestWagedRebalance extends ZkTestBase {
 
   @Test(dependsOnMethods = "test")
   public void testDisableInstance() throws InterruptedException {
-    String dbName = "Test-DB";
+    String dbName = "Test-DB-" + TestHelper.getTestMethodName();
     createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
         BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, 
_replica);
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
@@ -254,7 +254,7 @@ public class TestWagedRebalance extends ZkTestBase {
 
     int j = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + j++;
+      String db = "Test-DB-" + TestHelper.getTestMethodName() + j++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
           _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
@@ -293,7 +293,7 @@ public class TestWagedRebalance extends ZkTestBase {
 
     int j = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + j++;
+      String db = "Test-DB-" + TestHelper.getTestMethodName() + j++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
           _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
@@ -324,7 +324,7 @@ public class TestWagedRebalance extends ZkTestBase {
   public void testMixedRebalancerUsage() throws InterruptedException {
     int i = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + i++;
+      String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
       if (i == 0) {
         _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, 
stateModel,
             IdealState.RebalanceMode.FULL_AUTO + "", 
CrushRebalanceStrategy.class.getName());
@@ -354,12 +354,14 @@ public class TestWagedRebalance extends ZkTestBase {
       String limitedResourceName = null;
       int i = 0;
       for (String stateModel : _testModels) {
-        String db = "Test-DB-" + i++;
+        String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
         createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
             _replica);
         if (i == 1) {
-          // The limited resource has additional limitation, so even the other 
resources can be assigned
-          // later, this resource will still be blocked by the max partition 
limitation.
+          // The limited resource has additional limitation.
+          // The other resources could have been assigned in theory if the 
WAGED rebalancer were
+          // not used.
+          // However, with the WAGED rebalancer, this restricted resource will 
block the other ones.
           limitedResourceName = db;
           IdealState idealState =
               
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
@@ -371,8 +373,9 @@ public class TestWagedRebalance extends ZkTestBase {
       }
       Thread.sleep(300);
 
-      // Since the WAGED rebalancer does not do partial rebalance, the initial 
assignment won't show.
-      Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db 
-> {
+      // Since the WAGED rebalancer need to finish rebalancing every 
resources, the initial
+      // assignment won't show.
+      Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db 
-> {
         ExternalView ev =
             
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
         return ev != null && !ev.getPartitionSet().isEmpty();
@@ -383,20 +386,13 @@ public class TestWagedRebalance extends ZkTestBase {
       configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
       Thread.sleep(300);
 
-      // wait until any of the resources is rebalanced
-      TestHelper.verify(() -> {
-        for (String db : _allDBs) {
-          ExternalView ev =
-              
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
-          if (ev != null && !ev.getPartitionSet().isEmpty()) {
-            return true;
-          }
-        }
-        return false;
-      }, 3000);
-      ExternalView ev = _gSetupTool.getClusterManagementTool()
-          .getResourceExternalView(CLUSTER_NAME, limitedResourceName);
-      Assert.assertFalse(ev != null && !ev.getPartitionSet().isEmpty());
+      // Since the WAGED rebalancer need to finish rebalancing every 
resources, the assignment won't
+      // show even removed cluster level restriction
+      Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().anyMatch(db 
-> {
+        ExternalView ev =
+            
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
+        return ev != null && !ev.getPartitionSet().isEmpty();
+      }), 2000));
 
       // Remove the resource level limitation
       IdealState idealState = _gSetupTool.getClusterManagementTool()
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
index 0a4c232..831f77f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
@@ -110,7 +110,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase 
{
   public void testZoneIsolation() throws Exception {
     int i = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + i++;
+      String db = "Test-DB-testZoneIsolation" + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
           _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
@@ -126,7 +126,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase 
{
     Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
     int i = 0;
     for (String tag : tags) {
-      String db = "Test-DB-" + i++;
+      String db = "Test-DB-testZoneIsolationWithInstanceTag" + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db,
           BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, 
_replica, _replica);
       IdealState is =
@@ -154,7 +154,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase 
{
 
     int j = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + j++;
+      String db = "Test-DB-testLackEnoughLiveRacks" + j++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
           _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
@@ -196,7 +196,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase 
{
 
     int j = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + j++;
+      String db = "Test-DB-testLackEnoughRacks" + j++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
           _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
@@ -228,7 +228,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase 
{
   public void testAddZone() throws Exception {
     int i = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + i++;
+      String db = "Test-DB-testAddZone" + i++;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, 
PARTITIONS, _replica,
           _replica);
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);

Reply via email to