Emit per-resource rebalance status to surface possible calculation failures.

The status is exposed in the MBean as a string, for debugging purposes only.
The resource rebalance state attribute will be in one of the following states:
1. NORMAL
2. BEST_POSSIBLE_STATE_CAL_FAILED: calculation failed or no possible allocation 
found.
3. INTERMEDIATE_STATE_CAL_FAILED: Intermediate state calculation failed (does not
include the throttled case).
4. UNKNOWN: the resource is not rebalanced or newly created.

Additional related changes:
1. Fix a cluster-level metric bug so that the right metrics data is generated.
2. Fix a resource monitoring bug where DISABLE_MONITORING was not working.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2f39f381
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2f39f381
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2f39f381

Branch: refs/heads/master
Commit: 2f39f381b0981503d7c204aabbeaa09153292e15
Parents: b549cda
Author: Jiajun Wang <jjw...@linkedin.com>
Authored: Fri Oct 5 16:26:11 2018 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Thu Nov 1 14:38:36 2018 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |  52 ++++----
 .../stages/ExternalViewComputeStage.java        |   4 +-
 .../stages/IntermediateStateCalcStage.java      |  19 +++
 .../monitoring/mbeans/ClusterStatusMonitor.java |  66 ++++++----
 .../monitoring/mbeans/ResourceMonitor.java      |  35 ++++--
 .../dynamicMBeans/DynamicMBeanProvider.java     |  18 +--
 .../TestAlertingRebalancerFailure.java          | 123 +++++++++++++------
 ...ceModeWhenReachingOfflineInstancesLimit.java |  61 ++++++---
 .../mbeans/TestClusterStatusMonitor.java        |  10 +-
 .../mbeans/TestDisableResourceMbean.java        |  17 ++-
 .../monitoring/mbeans/TestResourceMonitor.java  |  21 +++-
 11 files changed, 291 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 1bbd6a0..b0e453d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -43,6 +43,7 @@ import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskRebalancer;
 import org.apache.helix.util.HelixUtil;
@@ -75,11 +76,6 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     // Reset current INIT/RUNNING tasks on participants for throttling
     cache.resetActiveTaskCount(currentStateOutput);
 
-    // Check whether the offline/disabled instance count in the cluster 
reaches the set limit,
-    // if yes, pause the rebalancer.
-    validateOfflineInstancesLimit(cache,
-        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()), 
clusterStatusMonitor);
-
     final BestPossibleStateOutput bestPossibleStateOutput =
         compute(event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
@@ -112,6 +108,13 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     BestPossibleStateOutput output = new BestPossibleStateOutput();
 
     HelixManager helixManager = 
event.getAttribute(AttributeName.helixmanager.name());
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+
+    // Check whether the offline/disabled instance count in the cluster 
reaches the set limit,
+    // if yes, pause the rebalancer.
+    boolean isValid = validateOfflineInstancesLimit(cache,
+        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()));
 
     final List<String> failureResources = new ArrayList<>();
     Iterator<Resource> itr = resourceMap.values().iterator();
@@ -125,6 +128,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
         LogUtil.logError(logger, _eventId,
             "Exception when calculating best possible states for " + 
resource.getResourceName(),
             ex);
+
       }
       if (!result) {
         failureResources.add(resource.getResourceName());
@@ -134,31 +138,34 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     }
 
     // Check and report if resource rebalance has failure
-    ClusterStatusMonitor clusterStatusMonitor =
-        event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, 
clusterStatusMonitor,
+    updateRebalanceStatus(!isValid || !failureResources.isEmpty(), 
failureResources, helixManager,
+        cache, clusterStatusMonitor,
         "Failed to calculate best possible states for " + 
failureResources.size() + " resources.");
 
     return output;
   }
 
-  private void updateRebalanceStatus(final boolean hasFailure, final 
HelixManager helixManager,
-      final ClusterDataCache cache, final ClusterStatusMonitor 
clusterStatusMonitor,
-      final String errorMessage) {
+  private void updateRebalanceStatus(final boolean hasFailure, final 
List<String> failedResources,
+      final HelixManager helixManager, final ClusterDataCache cache,
+      final ClusterStatusMonitor clusterStatusMonitor, final String 
errorMessage) {
     asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() {
       @Override
       public Object call() {
         try {
-          // TODO re-enable logging error after ticket HELIX-631 is resolved
-          /*
-          if (hasFailure && _statusUpdateUtil != null) {
-            _statusUpdateUtil
-                .logError(StatusUpdateUtil.ErrorType.RebalanceResourceFailure, 
this.getClass(),
-                    errorMessage, helixManager);
+          if (hasFailure) {
+            /* TODO Enable this update when we resolve ZK server load issue. 
This will cause extra write to ZK.
+            if (_statusUpdateUtil != null) {
+              _statusUpdateUtil
+                  
.logError(StatusUpdateUtil.ErrorType.RebalanceResourceFailure, this.getClass(),
+                      errorMessage, helixManager);
+            }
+            */
+            LogUtil.logWarn(logger, _eventId, errorMessage);
           }
-          */
           if (clusterStatusMonitor != null) {
             clusterStatusMonitor.setRebalanceFailureGauge(hasFailure);
+            clusterStatusMonitor.setResourceRebalanceStates(failedResources,
+                
ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
           }
         } catch (Exception e) {
           LogUtil.logError(logger, _eventId, "Could not update cluster 
status!", e);
@@ -170,8 +177,8 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
 
   // Check whether the offline/disabled instance count in the cluster reaches 
the set limit,
   // if yes, pause the rebalancer, and throw exception to terminate rebalance 
cycle.
-  private void validateOfflineInstancesLimit(final ClusterDataCache cache,
-      final HelixManager manager, final ClusterStatusMonitor 
clusterStatusMonitor) {
+  private boolean validateOfflineInstancesLimit(final ClusterDataCache cache,
+      final HelixManager manager) {
     int maxOfflineInstancesAllowed = 
cache.getClusterConfig().getMaxOfflineInstancesAllowed();
     if (maxOfflineInstancesAllowed >= 0) {
       int offlineCount = cache.getAllInstances().size() - 
cache.getEnabledLiveInstances().size();
@@ -190,11 +197,10 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
           LogUtil.logError(logger, _eventId, "Failed to put cluster " + 
cache.getClusterName()
               + " into maintenance mode, HelixManager is not set!");
         }
-        if (!cache.isTaskCache()) {
-          updateRebalanceStatus(true, manager, cache, clusterStatusMonitor, 
errMsg);
-        }
+        return false;
       }
     }
+    return true;
   }
 
   private boolean computeResourceBestPossibleState(ClusterEvent event, 
ClusterDataCache cache,

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index e3a504b..667b254 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -187,9 +187,7 @@ public class ExternalViewComputeStage extends 
AbstractAsyncBaseStage {
               cache.getStateModelDef(idealState.getStateModelDefRef());
           clusterStatusMonitor
               .setResourceStatus(view, 
cache.getIdealState(view.getResourceName()),
-                  stateModelDef);
-          clusterStatusMonitor
-              .updatePendingMessages(resource.getResourceName(), 
totalPendingMessageCount);
+                  stateModelDef, totalPendingMessageCount);
           monitoringResources.add(resourceName);
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c4d11d6..915a90f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -45,6 +45,7 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +138,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
 
     ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    List<String> failedResources = new ArrayList<>();
 
     // Priority is applied in assignment computation because higher priority 
by looping in order of
     // decreasing priority
@@ -170,8 +172,17 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       } catch (HelixException ex) {
         LogUtil.logInfo(logger, _eventId,
             "Failed to calculate intermediate partition states for resource " 
+ resourceName, ex);
+        failedResources.add(resourceName);
       }
     }
+
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.setResourceRebalanceStates(failedResources,
+          ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+      clusterStatusMonitor
+          .setResourceRebalanceStates(output.resourceSet(), 
ResourceMonitor.RebalanceStatus.NORMAL);
+    }
+
     return output;
   }
 
@@ -237,6 +248,14 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
                       + " mode due to an instance being assigned more 
replicas/partitions than "
                       + "the limit.");
             }
+
+            ClusterStatusMonitor clusterStatusMonitor =
+                event.getAttribute(AttributeName.clusterStatusMonitor.name());
+            if (clusterStatusMonitor != null) {
+              
clusterStatusMonitor.setResourceRebalanceStates(Collections.singletonList(resource),
+                  
ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+            }
+
             throw new HelixException(errMsg);
           }
           instancePartitionCounts.put(instance, partitionCount);

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index f870ddc..803bd3c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -19,9 +19,10 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collection;
@@ -33,10 +34,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -61,7 +62,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   static final String RESOURCE_STATUS_KEY = "ResourceStatus";
   public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
   public static final String CLUSTER_DN_KEY = "cluster";
-  static final String RESOURCE_DN_KEY = "resourceName";
+  public static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
   static final String MESSAGE_QUEUE_DN_KEY = "messageQueue";
   static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
@@ -160,6 +161,16 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     this._rebalanceFailure = isFailure;
   }
 
+  public void setResourceRebalanceStates(Collection<String> resources,
+      ResourceMonitor.RebalanceStatus state) {
+    for (String resource : resources) {
+      ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resource);
+      if (resourceMonitor != null) {
+        resourceMonitor.setRebalanceState(state);
+      }
+    }
+  }
+
   @Override
   public long getMaxMessageQueueSizeGauge() {
     long maxQueueSize = 0;
@@ -421,11 +432,19 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   public void retainResourceMonitor(Set<String> resourceNames) {
     Set<String> resourcesToRemove = new HashSet<>();
     synchronized (this) {
+      resourceNames.retainAll(_resourceMbeanMap.keySet());
       resourcesToRemove.addAll(_resourceMbeanMap.keySet());
     }
     resourcesToRemove.removeAll(resourceNames);
 
     try {
+      registerResources(resourceNames);
+    } catch (JMException e) {
+      LOG.error(String.format("Could not register beans for the following 
resources: %s",
+          Joiner.on(',').join(resourceNames)), e);
+    }
+
+    try {
       unregisterResources(resourcesToRemove);
     } catch (MalformedObjectNameException e) {
       LOG.error(String.format("Could not unregister beans for the following 
resources: %s",
@@ -433,12 +452,14 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     }
   }
 
-  public void setResourceStatus(ExternalView externalView, IdealState 
idealState, StateModelDefinition stateModelDef) {
+  public void setResourceStatus(ExternalView externalView, IdealState 
idealState,
+      StateModelDefinition stateModelDef, int messageCount) {
     try {
       ResourceMonitor resourceMonitor = 
getOrCreateResourceMonitor(externalView.getId());
 
       if (resourceMonitor != null) {
-        resourceMonitor.updateResource(externalView, idealState, 
stateModelDef);
+        resourceMonitor.updateResourceState(externalView, idealState, 
stateModelDef);
+        resourceMonitor.updatePendingStateTransitionMessages(messageCount);
       }
     } catch (Exception e) {
       LOG.error("Fail to set resource status, resource: " + 
idealState.getResourceName(), e);
@@ -461,24 +482,12 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
-      
resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions,
+      
resourceMonitor.updateRebalancerStats(numPendingRecoveryRebalancePartitions,
           numPendingLoadRebalancePartitions, 
numRecoveryRebalanceThrottledPartitions,
           numLoadRebalanceThrottledPartitions);
     }
   }
 
-  public synchronized void updatePendingMessages(String resourceName, int 
messageCount) {
-    try {
-      ResourceMonitor resourceMonitor = 
getOrCreateResourceMonitor(resourceName);
-
-      if (resourceMonitor != null) {
-        resourceMonitor.updatePendingStateTransitionMessages(messageCount);
-      }
-    } catch (Exception e) {
-      LOG.error("Fail to update resource pending messages, resource: " + 
resourceName, e);
-    }
-  }
-
   private ResourceMonitor getOrCreateResourceMonitor(String resourceName) {
     try {
       if (!_resourceMbeanMap.containsKey(resourceName)) {
@@ -487,7 +496,6 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
             String beanName = getResourceBeanName(resourceName);
             ResourceMonitor bean =
                 new ResourceMonitor(_clusterName, resourceName, 
getObjectName(beanName));
-            bean.register();
             _resourceMbeanMap.put(resourceName, bean);
           }
         }
@@ -663,6 +671,15 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     _instanceMbeanMap.keySet().removeAll(instances);
   }
 
+  private synchronized void registerResources(Collection<String> resources) 
throws JMException {
+    for (String resourceName : resources) {
+      ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
+      if (monitor != null) {
+        monitor.register();
+      }
+    }
+  }
+
   private synchronized void unregisterResources(Collection<String> resources) 
throws MalformedObjectNameException {
     for (String resourceName : resources) {
       ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
@@ -729,6 +746,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     }
   }
 
+  // For test only
   protected ResourceMonitor getResourceMonitor(String resourceName) {
     return _resourceMbeanMap.get(resourceName);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index c3dd242..fb9a779 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -41,6 +41,13 @@ import 
org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
 public class ResourceMonitor extends DynamicMBeanProvider {
 
+  public enum RebalanceStatus {
+    UNKNOWN,
+    NORMAL,
+    BEST_POSSIBLE_STATE_CAL_FAILED,
+    INTERMEDIATE_STATE_CAL_FAILED
+  }
+
   // Gauges
   private SimpleDynamicMetric<Long> _numOfPartitions;
   private SimpleDynamicMetric<Long> _numOfPartitionsInExternalView;
@@ -67,6 +74,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private HistogramDynamicMetric _partitionTopStateHandoffUserLatencyGauge;
   private HistogramDynamicMetric 
_partitionTopStateNonGracefulHandoffDurationGauge;
 
+  private SimpleDynamicMetric<String> _rebalanceState;
+
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private long _lastResetTime;
   private final String _resourceName;
@@ -96,6 +105,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
     attributeList.add(_totalMessageReceived);
     attributeList.add(_numPendingStateTransitions);
+    attributeList.add(_rebalanceState);
     doRegister(attributeList, _initObjectName);
     return this;
   }
@@ -146,6 +156,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _successTopStateHandoffCounter = new 
SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0L);
     _successfulTopStateHandoffDurationCounter =
         new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 
0L);
+
+    _rebalanceState = new SimpleDynamicMetric<>("RebalanceStatus", 
RebalanceStatus.UNKNOWN.name());
   }
 
   @Override
@@ -214,7 +226,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _clusterName + " " + _resourceName;
   }
 
-  public void updateResource(ExternalView externalView, IdealState idealState,
+  public void updateResourceState(ExternalView externalView, IdealState 
idealState,
       StateModelDefinition stateModelDef) {
     if (externalView == null) {
       _logger.warn("External view is null");
@@ -229,7 +241,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
       }
     }
 
-    resetGauges();
+    resetResourceStateGauges();
 
     if (idealState == null) {
       _logger.warn("ideal state is null for {}", _resourceName);
@@ -319,20 +331,13 @@ public class ResourceMonitor extends DynamicMBeanProvider 
{
     }
   }
 
-  private void resetGauges() {
+  private void resetResourceStateGauges() {
     _numOfErrorPartitions.updateValue(0L);
     _numNonTopStatePartitions.updateValue(0L);
     _externalViewIdealStateDiff.updateValue(0L);
     _numOfPartitionsInExternalView.updateValue(0L);
-
-    // The following gauges are computed each call to updateResource by way of 
looping so need to be reset.
     _numLessMinActiveReplicaPartitions.updateValue(0L);
     _numLessReplicaPartitions.updateValue(0L);
-    _numPendingRecoveryRebalancePartitions.updateValue(0L);
-    _numPendingLoadRebalancePartitions.updateValue(0L);
-    _numRecoveryRebalanceThrottledPartitions.updateValue(0L);
-    _numLoadRebalanceThrottledPartitions.updateValue(0L);
-    _numPendingStateTransitions.updateValue(0L);
   }
 
   public void updatePendingStateTransitionMessages(int messageCount) {
@@ -367,7 +372,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     }
   }
 
-  public void updateRebalancerStat(long numPendingRecoveryRebalancePartitions,
+  public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions,
       long numPendingLoadRebalancePartitions, long 
numRecoveryRebalanceThrottledPartitions,
       long numLoadRebalanceThrottledPartitions) {
     
_numPendingRecoveryRebalancePartitions.updateValue(numPendingRecoveryRebalancePartitions);
@@ -376,6 +381,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     
_numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
   }
 
+  public void setRebalanceState(RebalanceStatus state) {
+    _rebalanceState.updateValue(state.name());
+  }
+
   public long getExternalViewPartitionGauge() {
     return _numOfPartitionsInExternalView.getValue();
   }
@@ -408,6 +417,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _numPendingStateTransitions.getValue();
   }
 
+  public String getRebalanceState() {
+    return _rebalanceState.getValue();
+  }
+
   public void resetMaxTopStateHandoffGauge() {
     if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= 
System.currentTimeMillis()) {
       _maxSinglePartitionTopStateHandoffDuration.updateValue(0L);

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index 988ba9b..fbbb9e6 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -51,14 +51,15 @@ public abstract class DynamicMBeanProvider implements 
DynamicMBean, SensorNamePr
    * @param domain         the MBean domain name
    * @param keyValuePairs  the MBean object name components
    */
-  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+  protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
       String description, String domain, String... keyValuePairs) throws 
JMException {
     if (_objectName != null) {
-      throw new HelixException(
-          "Mbean has been registered before. Please create new object for new 
registration.");
+      _logger.warn("Mbean has been registered before. Please create new object 
for new registration.");
+      return false;
     }
     updateAttributtInfos(dynamicMetrics, description);
     _objectName = MBeanRegistrar.register(this, domain, keyValuePairs);
+    return true;
   }
 
   /**
@@ -68,19 +69,20 @@ public abstract class DynamicMBeanProvider implements 
DynamicMBean, SensorNamePr
    * @param description    the MBean description
    * @param objectName     the proposed MBean ObjectName
    */
-  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+  protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
       String description, ObjectName objectName) throws JMException {
     if (_objectName != null) {
-      throw new HelixException(
-          "Mbean has been registered before. Please create new object for new 
registration.");
+      _logger.warn("Mbean has been registered before. Please create new object 
for new registration.");
+      return false;
     }
     updateAttributtInfos(dynamicMetrics, description);
     _objectName = MBeanRegistrar.register(this, objectName);
+    return true;
   }
 
-  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+  protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
       ObjectName objectName) throws JMException {
-    doRegister(dynamicMetrics, null, objectName);
+    return doRegister(dynamicMetrics, null, objectName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
index e732c85..7defef4 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
@@ -19,18 +19,19 @@ package org.apache.helix.integration;
  * under the License.
  */
 
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Set;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -42,6 +43,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
@@ -50,6 +52,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static 
org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
+import static 
org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.RESOURCE_DN_KEY;
 import static 
org.apache.helix.util.StatusUpdateUtil.ErrorType.RebalanceResourceFailure;
 
 public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
@@ -91,6 +94,9 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
 
     accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
     errorNodeKey = 
accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
+
+    _clusterVerifier =
+        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
   }
 
   @BeforeMethod
@@ -99,8 +105,8 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     accessor.removeProperty(errorNodeKey);
   }
 
-  @Test (enabled = false)
-  public void testParticipantUnavailable() {
+  @Test
+  public void testParticipantUnavailable() throws Exception {
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), 
RebalanceMode.FULL_AUTO.name());
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
@@ -119,6 +125,7 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     // Verify there is no rebalance error logged
     Assert.assertNull(accessor.getProperty(errorNodeKey));
     checkRebalanceFailureGauge(false);
+    
checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL,
 testDb);
 
     // kill nodes, so rebalance cannot be done
     for (int i = 0; i < NODE_NR; i++) {
@@ -126,8 +133,10 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     }
 
     // Verify the rebalance error caused by no node available
-    Assert.assertNotNull(pollForError(accessor, errorNodeKey));
+    pollForError(accessor, errorNodeKey);
     checkRebalanceFailureGauge(true);
+    checkResourceBestPossibleCalFailureState(
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, 
testDb);
 
     // clean up
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
@@ -138,10 +147,20 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     }
   }
 
-  @Test (enabled = false)
-  public void testTagSetIncorrect() {
+  @Test (dependsOnMethods = "testParticipantUnavailable")
+  public void testTagSetIncorrect() throws Exception {
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), 
RebalanceMode.FULL_AUTO.name());
+    ZkHelixClusterVerifier verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(new 
HashSet<>(Collections.singleton(testDb))).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    // Verify there is no rebalance error logged
+    Assert.assertNull(accessor.getProperty(errorNodeKey));
+    checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
+
     // set expected instance tag
     IdealState is =
         
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
testDb);
@@ -150,15 +169,17 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
 
     // Verify there is rebalance error logged
-    Assert.assertNotNull(pollForError(accessor, errorNodeKey));
+    pollForError(accessor, errorNodeKey);
     checkRebalanceFailureGauge(true);
+    checkResourceBestPossibleCalFailureState(
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, 
testDb);
 
     // clean up
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
   }
 
-  @Test (enabled = false)
-  public void testWithDomainId() throws InterruptedException {
+  @Test (dependsOnMethods = "testTagSetIncorrect")
+  public void testWithDomainId() throws Exception {
     int replicas = 2;
     ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
     // 1. disable all participants except one node, then set domain Id
@@ -192,14 +213,17 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     // Verify there is no rebalance error logged
     Assert.assertNull(accessor.getProperty(errorNodeKey));
     checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
 
     // 2. enable the rest nodes with no domain Id
     for (int i = replicas; i < NODE_NR; i++) {
       setInstanceEnable(_participants[i].getInstanceName(), true, 
configAccessor);
     }
     // Verify there is rebalance error logged
-    Assert.assertNotNull(pollForError(accessor, errorNodeKey));
+    pollForError(accessor, errorNodeKey);
     checkRebalanceFailureGauge(true);
+    checkResourceBestPossibleCalFailureState(
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, 
testDb);
 
     // 3. reset all nodes domain Id to be correct setting
     for (int i = replicas; i < NODE_NR; i++) {
@@ -211,6 +235,7 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
 
     // Verify that rebalance error state is removed
     checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
 
     // clean up
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
@@ -223,6 +248,14 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
         String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), 
clusterBeanName));
   }
 
+  private ObjectName getResourceMbeanName(String clusterName, String 
resourceName)
+      throws MalformedObjectNameException {
+    String resourceBeanName =
+        String.format("%s=%s,%s=%s", CLUSTER_DN_KEY, clusterName, 
RESOURCE_DN_KEY, resourceName);
+    return new ObjectName(
+        String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), 
resourceBeanName));
+  }
+
   private void setDomainId(String instanceName, ConfigAccessor configAccessor) 
{
     String domain = String.format("Rack=%s, Instance=%s", instanceName, 
instanceName);
     InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
@@ -237,30 +270,50 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, 
instanceConfig);
   }
 
-  private void checkRebalanceFailureGauge(boolean expectFailure) {
-    try {
-      Long value = (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), 
"RebalanceFailureGauge");
-      Assert.assertNotNull(value);
-      Assert.assertEquals(value == 1, expectFailure);
-    } catch (Exception e) {
-      Assert.fail("Failed to get attribute!");
-    }
+  private void checkRebalanceFailureGauge(final boolean expectFailure) throws 
Exception {
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        try {
+          Long value =
+              (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), 
"RebalanceFailureGauge");
+          return value != null && (value == 1) == expectFailure;
+        } catch (Exception e) {
+          return false;
+        }
+      }
+    }, 5000); Assert.assertTrue(result);
   }
 
-  private HelixProperty pollForError(HelixDataAccessor accessor, PropertyKey 
key) {
-    final int POLL_TIMEOUT = 5000;
-    final int POLL_INTERVAL = 100;
-    HelixProperty property = accessor.getProperty(key);
-    int timeWaited = 0;
-    while (property == null && timeWaited < POLL_TIMEOUT) {
-      try {
-        Thread.sleep(POLL_INTERVAL);
-      } catch (InterruptedException e) {
-        return null;
+  private void checkResourceBestPossibleCalFailureState(
+      final ResourceMonitor.RebalanceStatus expectedState, final String 
resourceName)
+      throws Exception {
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        try {
+          String state = (String) _server
+              .getAttribute(getResourceMbeanName(CLUSTER_NAME, resourceName), 
"RebalanceStatus");
+          return state != null && state.equals(expectedState.name());
+        } catch (Exception e) {
+          return false;
+        }
       }
-      timeWaited += POLL_INTERVAL;
-      property = accessor.getProperty(key);
-    }
-    return property;
+    }, 5000);
+    Assert.assertTrue(result);
+  }
+
+  private void pollForError(final HelixDataAccessor accessor, final 
PropertyKey key)
+      throws Exception {
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        /* TODO re-enable this check when we start recording rebalance error 
again
+        return accessor.getProperty(key) != null;
+        */
+        return true;
+      }
+    }, 5000);
+    Assert.assertTrue(result);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
index c89505b..0de2fe3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
@@ -19,16 +19,17 @@ package org.apache.helix.integration.rebalancer;
  * under the License.
  */
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+import javax.management.*;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -42,10 +43,12 @@ import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static 
org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
+import static 
org.apache.helix.util.StatusUpdateUtil.ErrorType.RebalanceResourceFailure;
 
 public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     extends ZkTestBase {
@@ -105,12 +108,19 @@ public class 
TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
+  @AfterMethod
+  public void afterMethod() {
+    cleanupRebalanceError();
+  }
+
   @Test
   public void testWithDisabledInstancesLimit() throws Exception {
     MaintenanceSignal maintenanceSignal =
         _dataAccessor.getProperty(_dataAccessor.keyBuilder().maintenance());
     Assert.assertNull(maintenanceSignal);
 
+    checkForRebalanceError(false);
+
     HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
 
     // disable instance
@@ -133,6 +143,8 @@ public class 
TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     Assert.assertNotNull(maintenanceSignal);
     Assert.assertNotNull(maintenanceSignal.getReason());
 
+    checkForRebalanceError(true);
+
     for (i = 2; i < 2 + _maxOfflineInstancesAllowed + 1; i++) {
       instance = _participants.get(i).getInstanceName();
       admin.enableInstance(CLUSTER_NAME, instance, true);
@@ -146,6 +158,9 @@ public class 
TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     MaintenanceSignal maintenanceSignal =
         _dataAccessor.getProperty(_dataAccessor.keyBuilder().maintenance());
     Assert.assertNull(maintenanceSignal);
+
+    checkForRebalanceError(false);
+
     int i;
     for (i = 2; i < 2 + _maxOfflineInstancesAllowed; i++) {
       _participants.get(i).syncStop();
@@ -163,19 +178,8 @@ public class 
TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     Assert.assertNotNull(maintenanceSignal);
     Assert.assertNotNull(maintenanceSignal.getReason());
 
-    // TODO re-enable the check after HELIX-631 is fixed
-    /*
-    // Verify there is no rebalance error logged
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
-    PropertyKey errorNodeKey =
-        
accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
-    Assert.assertNotNull(accessor.getProperty(errorNodeKey));
-
-    Long value =
-        (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), 
"RebalanceFailureGauge");
-    Assert.assertNotNull(value);
-    Assert.assertTrue(value.longValue() > 0);
-    */
+    // Verify there is rebalance error logged
+    checkForRebalanceError(true);
   }
 
   @AfterClass
@@ -193,12 +197,33 @@ public class 
TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     System.out.println("END " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
   }
 
-  private ObjectName getMbeanName(String clusterName)
+  private void checkForRebalanceError(boolean expectError)
+      throws MalformedObjectNameException, AttributeNotFoundException, 
MBeanException,
+      ReflectionException, InstanceNotFoundException, IOException {
+    /* TODO re-enable this check when we start recording rebalance error again
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
+    PropertyKey errorNodeKey =
+        
accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
+    Assert.assertEquals(accessor.getProperty(errorNodeKey) != null, 
expectError);
+    */
+
+    Long value =
+        (Long) _server.getAttribute(getClusterMbeanName(CLUSTER_NAME), 
"RebalanceFailureGauge");
+    Assert.assertEquals(value != null && value.longValue() > 0, expectError);
+  }
+
+  private void cleanupRebalanceError() {
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
+    PropertyKey errorNodeKey =
+        
accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
+    accessor.removeProperty(errorNodeKey);
+  }
+
+  private ObjectName getClusterMbeanName(String clusterName)
       throws MalformedObjectNameException {
     String clusterBeanName =
         String.format("%s=%s", CLUSTER_DN_KEY, clusterName);
     return new ObjectName(
         String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), 
clusterBeanName));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 143d325..b2daba6 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -205,7 +205,7 @@ public class TestClusterStatusMonitor {
     StateModelDefinition stateModelDef =
         BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
 
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getTotalResourceGauge(), 1);
@@ -238,7 +238,7 @@ public class TestClusterStatusMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 
lessMinActiveReplica);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
@@ -266,7 +266,7 @@ public class TestClusterStatusMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 
missTopState);
@@ -291,7 +291,7 @@ public class TestClusterStatusMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
@@ -313,7 +313,7 @@ public class TestClusterStatusMonitor {
 
     // test pending state transition message report and read
     messageCount = new Random().nextInt(numPartition) + 1;
-    monitor.updatePendingMessages(testDB, messageCount);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 
messageCount);
     Assert.assertEquals(monitor.getPendingStateTransitionGuage(), 
messageCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
index fbbf4b8..0c8ebe7 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
@@ -88,9 +88,9 @@ public class TestDisableResourceMbean extends ZkUnitTestBase {
     Assert.assertTrue(clusterVerifier.verifyByPolling());
 
     // Verify the bean was created for TestDB0, but not for TestDB1.
-    Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB0", 
clusterName)));
-    Assert.assertFalse(_mbeanServer.isRegistered(getMbeanName("TestDB1", 
clusterName)));
-    Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB2", 
clusterName)));
+    pollForMBeanExistance(getMbeanName("TestDB0", clusterName), true);
+    pollForMBeanExistance(getMbeanName("TestDB1", clusterName), false);
+    pollForMBeanExistance(getMbeanName("TestDB2", clusterName), true);
 
     controller.syncStop();
     for (MockParticipantManager participant : participants) {
@@ -100,6 +100,17 @@ public class TestDisableResourceMbean extends 
ZkUnitTestBase {
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
 
+  private void pollForMBeanExistance(final ObjectName objectName, boolean 
expectation)
+      throws Exception {
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return _mbeanServer.isRegistered(objectName);
+      }
+    }, 3000);
+    Assert.assertEquals(result, expectation);
+  }
+
   private ObjectName getMbeanName(String resourceName, String clusterName)
       throws MalformedObjectNameException {
     String clusterBeanName =

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index 5310ded..713fd65 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -62,7 +62,7 @@ public class TestResourceMonitor {
     StateModelDefinition stateModelDef =
         BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -88,7 +88,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 
errorCount);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount);
@@ -119,7 +119,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 
lessMinActiveReplica);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -151,7 +151,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 
lessReplica);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -181,7 +181,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 
missTopState);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -196,6 +196,17 @@ public class TestResourceMonitor {
     int messageCount = new Random().nextInt(_partitions) + 1;
     monitor.updatePendingStateTransitionMessages(messageCount);
     Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), 
messageCount);
+
+    Assert
+        .assertEquals(monitor.getRebalanceState(), 
ResourceMonitor.RebalanceStatus.UNKNOWN.name());
+    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL);
+    Assert.assertEquals(monitor.getRebalanceState(), 
ResourceMonitor.RebalanceStatus.NORMAL.name());
+    
monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
+    Assert.assertEquals(monitor.getRebalanceState(),
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name());
+    
monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+    Assert.assertEquals(monitor.getRebalanceState(),
+        ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name());
   }
 
   /**

Reply via email to