Refactor monitor framework to simplify interfaces.

Target is minimize children classes' responsibility and support major methods 
in the DynamicMBeanProvider as much as we can.
Migrate ClusterEventMonitor and ResourceMonitor to new framework.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b5f46388
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b5f46388
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b5f46388

Branch: refs/heads/master
Commit: b5f46388dac099685f36a82ff3ddc9a312c89a92
Parents: d6f4943
Author: Jiajun Wang <jjw...@linkedin.com>
Authored: Wed Aug 30 13:34:04 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Mon Nov 6 17:07:46 2017 -0800

----------------------------------------------------------------------
 .../monitoring/ParticipantStatusMonitor.java    |  24 +-
 .../monitoring/mbeans/ClusterEventMonitor.java  |  67 ++---
 .../mbeans/ClusterEventMonitorMBean.java        |  28 ---
 .../monitoring/mbeans/ClusterStatusMonitor.java | 161 +++++-------
 .../monitoring/mbeans/HelixCallbackMonitor.java |  51 +---
 .../helix/monitoring/mbeans/MBeanRegistrar.java |  52 ++--
 .../mbeans/MessageLatencyMonitor.java           |  59 ++---
 .../mbeans/MessageLatencyMonitorMBean.java      |  43 ----
 .../monitoring/mbeans/ResourceMonitor.java      | 247 ++++++++++---------
 .../monitoring/mbeans/ZkClientPathMonitor.java  |  82 ++----
 .../dynamicMBeans/DynamicMBeanProvider.java     | 109 ++++++--
 .../mbeans/TestHelixCallbackMonitor.java        |  15 +-
 .../monitoring/mbeans/TestResourceMonitor.java  |   7 +-
 .../monitoring/mbeans/TestZkClientMonitor.java  |  19 +-
 14 files changed, 434 insertions(+), 530 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
index 6192c6d..ee15aec 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
@@ -19,23 +19,19 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
+import org.apache.helix.model.Message;
+import org.apache.helix.monitoring.mbeans.*;
+import org.apache.log4j.Logger;
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.helix.model.Message;
-import org.apache.helix.monitoring.mbeans.MessageLatencyMonitor;
-import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
-import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
-import org.apache.helix.monitoring.mbeans.StateTransitionStatMonitor;
-import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
-import org.apache.log4j.Logger;
 
 public class ParticipantStatusMonitor {
   private final ConcurrentHashMap<StateTransitionContext, 
StateTransitionStatMonitor> _monitorMap =
@@ -54,10 +50,10 @@ public class ParticipantStatusMonitor {
       _beanServer = ManagementFactory.getPlatformMBeanServer();
       if (isParticipant) {
         _messageMonitor = new ParticipantMessageMonitor(instanceName);
-        _messageLatencyMonitor = new MessageLatencyMonitor(instanceName);
+        _messageLatencyMonitor =
+            new 
MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(), 
instanceName);
         _executorMonitors = new ConcurrentHashMap<>();
         register(_messageMonitor, 
getObjectName(_messageMonitor.getParticipantBeanName()));
-        
_messageLatencyMonitor.register(MonitorDomainNames.CLMParticipantReport.name());
       }
     } catch (Exception e) {
       LOG.warn(e);

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
index 63559e4..b6853cb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
@@ -19,8 +19,18 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-public class ClusterEventMonitor implements ClusterEventMonitorMBean {
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClusterEventMonitor extends DynamicMBeanProvider {
   public enum PhaseName {
     Callback,
     InQueue,
@@ -32,13 +42,17 @@ public class ClusterEventMonitor implements 
ClusterEventMonitorMBean {
   private static final String EVENT_DN_KEY = "eventName";
   private static final String PHASE_DN_KEY = "phaseName";
 
-  private String _phaseName;
-  private long _totalDuration;
-  private long _maxDuration;
-  private long _count;
+  private final String _phaseName;
 
-  private long _lastResetTime;
+  private SimpleDynamicMetric<Long> _totalDuration =
+      new SimpleDynamicMetric("TotalDurationCounter", 0l);
+  private SimpleDynamicMetric<Long> _maxDuration =
+      new SimpleDynamicMetric("MaxSingleDurationGauge", 0l);
+  private SimpleDynamicMetric<Long> _count = new 
SimpleDynamicMetric("EventCounter", 0l);
+  private HistogramDynamicMetric _duration = new 
HistogramDynamicMetric("DurationGauge",
+      _metricRegistry.histogram(getMetricRegistryNamePrefix() + 
"DurationGauge"));
 
+  private long _lastResetTime;
   private ClusterStatusMonitor _clusterStatusMonitor;
 
   public ClusterEventMonitor(ClusterStatusMonitor clusterStatusMonitor, String 
phaseName) {
@@ -46,28 +60,13 @@ public class ClusterEventMonitor implements 
ClusterEventMonitorMBean {
     _clusterStatusMonitor = clusterStatusMonitor;
   }
 
-  @Override
-  public long getTotalDurationCounter() {
-    return _totalDuration;
-  }
-
-  @Override
-  public long getMaxSingleDurationGauge() {
-    return _maxDuration;
-  }
-
-  @Override
-  public long getEventCounter() {
-    return _count;
-  }
-
   public void reportDuration(long duration) {
-    _totalDuration += duration;
-    _count++;
-
+    _totalDuration.updateValue(_totalDuration.getValue() + duration);
+    _count.updateValue(_count.getValue() + 1);
+    _duration.updateValue(duration);
     if (_lastResetTime + RESET_INTERVAL <= System.currentTimeMillis() ||
-        duration > _maxDuration) {
-      _maxDuration = duration;
+        duration > _maxDuration.getValue()) {
+      _maxDuration.updateValue(duration);
       _lastResetTime = System.currentTimeMillis();
     }
   }
@@ -78,13 +77,17 @@ public class ClusterEventMonitor implements 
ClusterEventMonitorMBean {
         ClusterStatusMonitor.DEFAULT_TAG, _phaseName);
   }
 
-  /**
-   * get clusterEvent bean name
-   *
-   * @return clusterEvent bean name
-   */
-  public String getBeanName() {
+  private String getBeanName() {
     return String.format("%s,%s=%s,%s=%s", 
_clusterStatusMonitor.clusterBeanName(), EVENT_DN_KEY,
         "ClusterEvent", PHASE_DN_KEY, _phaseName);
   }
+
+  public void register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_totalDuration);
+    attributeList.add(_maxDuration);
+    attributeList.add(_count);
+    attributeList.add(_duration);
+    register(attributeList, 
_clusterStatusMonitor.getObjectName(getBeanName()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitorMBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitorMBean.java
deleted file mode 100644
index 647bafe..0000000
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitorMBean.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-public interface ClusterEventMonitorMBean extends SensorNameProvider {
-  long getTotalDurationCounter();
-  long getMaxSingleDurationGauge();
-  long getEventCounter();
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index e91f503..536865b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -19,6 +19,8 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collection;
@@ -29,10 +31,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -48,9 +50,6 @@ import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private static final Logger LOG = 
Logger.getLogger(ClusterStatusMonitor.class);
@@ -79,15 +78,12 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   private Map<String, List<String>> _oldDisabledPartitions = 
Collections.emptyMap();
   private Map<String, Long> _instanceMsgQueueSizes = Maps.newConcurrentMap();
 
-  private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap =
-      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = 
new ConcurrentHashMap<>();
 
-  private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap =
-      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = 
new ConcurrentHashMap<>();
 
   // phaseName -> eventMonitor
-  private final ConcurrentHashMap<String, ClusterEventMonitor> 
_clusterEventMbeanMap =
-      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ClusterEventMonitor> 
_clusterEventMbeanMap = new ConcurrentHashMap<>();
 
   /**
    * PerInstanceResource bean map: beanName->bean
@@ -95,11 +91,9 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   private final Map<PerInstanceResourceMonitor.BeanName, 
PerInstanceResourceMonitor> _perInstanceResourceMap =
       new ConcurrentHashMap<>();
 
-  private final Map<String, WorkflowMonitor> _perTypeWorkflowMonitorMap =
-      new ConcurrentHashMap<>();
+  private final Map<String, WorkflowMonitor> _perTypeWorkflowMonitorMap = new 
ConcurrentHashMap<>();
 
-  private final Map<String, JobMonitor> _perTypeJobMonitorMap =
-      new ConcurrentHashMap<>();
+  private final Map<String, JobMonitor> _perTypeJobMonitorMap = new 
ConcurrentHashMap<>();
 
   public ClusterStatusMonitor(String clusterName) {
     _clusterName = clusterName;
@@ -140,7 +134,8 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     return _disabledInstances.size();
   }
 
-  @Override public long getDisabledPartitionsGauge() {
+  @Override
+  public long getDisabledPartitionsGauge() {
     int numDisabled = 0;
     for (Map<String, List<String>> perInstance : _disabledPartitions.values()) 
{
       for (List<String> partitions : perInstance.values()) {
@@ -216,9 +211,8 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
    * @param disabledPartitions a map of instance name to the set of partitions 
disabled on it
    * @param tags a map of instance name to the set of tags on it
    */
-  public synchronized void setClusterInstanceStatus(Set<String> 
liveInstanceSet,
-      Set<String> instanceSet, Set<String> disabledInstanceSet,
-      Map<String, Map<String, List<String>>> disabledPartitions,
+  public synchronized void setClusterInstanceStatus(Set<String> 
liveInstanceSet, Set<String> instanceSet,
+      Set<String> disabledInstanceSet, Map<String, Map<String, List<String>>> 
disabledPartitions,
       Map<String, List<String>> oldDisabledPartitions, Map<String, 
Set<String>> tags) {
     // Unregister beans for instances that are no longer configured
     Set<String> toUnregister = Sets.newHashSet(_instanceMbeanMap.keySet());
@@ -287,15 +281,14 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     if (!_clusterEventMbeanMap.containsKey(phase)) {
       ClusterEventMonitor monitor = new ClusterEventMonitor(this, phase);
       try {
-        String beanName = monitor.getBeanName();
-        register(monitor, getObjectName(beanName));
         ClusterEventMonitor prevEventMbean = _clusterEventMbeanMap.put(phase, 
monitor);
         if (prevEventMbean != null) {
-          unregister(getObjectName(monitor.getBeanName()));
+          prevEventMbean.unregister();
         }
-      } catch (MalformedObjectNameException e) {
-        LOG.error("Failed to register ClusterEventMonitorMbean for cluster " + 
_clusterName
-            + " and phase type: " + phase, e);
+        monitor.register();
+      } catch (JMException e) {
+        LOG.error(
+            "Failed to register ClusterEventMonitorMbean for cluster " + 
_clusterName + " and phase type: " + phase, e);
         return;
       }
     }
@@ -309,8 +302,8 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
    * @param messages a list of messages
    */
   public void increaseMessageReceived(List<Message> messages) {
-    Map<String, Long> messageCountPerInstance = new HashMap<String, Long>();
-    Map<String, Long> messageCountPerResource = new HashMap<String, Long>();
+    Map<String, Long> messageCountPerInstance = new HashMap<>();
+    Map<String, Long> messageCountPerResource = new HashMap<>();
 
     // Aggregate messages
     for (Message message : messages) {
@@ -357,7 +350,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
 
     // Convert to perInstanceResource beanName->partition->state
     Map<PerInstanceResourceMonitor.BeanName, Map<Partition, String>> beanMap =
-        new HashMap<PerInstanceResourceMonitor.BeanName, Map<Partition, 
String>>();
+        new HashMap<>();
     Set<String> resourceSet = new HashSet<>(bestPossibleStates.resourceSet());
     for (String resource : resourceSet) {
       Map<Partition, Map<String, String>> partitionStateMap =
@@ -366,8 +359,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
         Map<String, String> instanceStateMap = 
partitionStateMap.get(partition);
         for (String instance : instanceStateMap.keySet()) {
           String state = instanceStateMap.get(instance);
-          PerInstanceResourceMonitor.BeanName beanName =
-              new PerInstanceResourceMonitor.BeanName(instance, resource);
+          PerInstanceResourceMonitor.BeanName beanName = new 
PerInstanceResourceMonitor.BeanName(instance, resource);
           if (!beanMap.containsKey(beanName)) {
             beanMap.put(beanName, new HashMap<Partition, String>());
           }
@@ -377,8 +369,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     }
     synchronized (_perInstanceResourceMap) {
       // Unregister beans for per-instance resources that no longer exist
-      Set<PerInstanceResourceMonitor.BeanName> toUnregister =
-          Sets.newHashSet(_perInstanceResourceMap.keySet());
+      Set<PerInstanceResourceMonitor.BeanName> toUnregister = 
Sets.newHashSet(_perInstanceResourceMap.keySet());
       toUnregister.removeAll(beanMap.keySet());
       try {
         unregisterPerInstanceResources(toUnregister);
@@ -391,12 +382,10 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
       Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
       for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
         PerInstanceResourceMonitor bean =
-            new PerInstanceResourceMonitor(_clusterName, 
beanName.instanceName(),
-                beanName.resourceName());
+            new PerInstanceResourceMonitor(_clusterName, 
beanName.instanceName(), beanName.resourceName());
         String stateModelDefName = 
resourceMap.get(beanName.resourceName()).getStateModelDefRef();
         InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
-        bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
-            stateModelDefMap.get(stateModelDefName));
+        bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()), 
stateModelDefMap.get(stateModelDefName));
         monitorsToRegister.add(bean);
       }
       try {
@@ -409,8 +398,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
         PerInstanceResourceMonitor bean = 
_perInstanceResourceMap.get(beanName);
         String stateModelDefName = 
resourceMap.get(beanName.resourceName()).getStateModelDefRef();
         InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
-        bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
-            stateModelDefMap.get(stateModelDefName));
+        bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()), 
stateModelDefMap.get(stateModelDefName));
       }
     }
   }
@@ -433,39 +421,23 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     }
   }
 
-  public void setResourceStatus(ExternalView externalView, IdealState 
idealState,
-      StateModelDefinition stateModelDef) {
+  public void setResourceStatus(ExternalView externalView, IdealState 
idealState, StateModelDefinition stateModelDef) {
     try {
-      String resourceName = externalView.getId();
-      if (!_resourceMbeanMap.containsKey(resourceName)) {
-        synchronized (this) {
-          if (!_resourceMbeanMap.containsKey(resourceName)) {
-            ResourceMonitor bean = new ResourceMonitor(_clusterName, 
resourceName);
-            bean.updateResource(externalView, idealState, stateModelDef);
-            registerResources(Arrays.asList(bean));
-          }
-        }
-      }
-      ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
-      String oldSensorName = bean.getSensorName();
-      bean.updateResource(externalView, idealState, stateModelDef);
-      String newSensorName = bean.getSensorName();
-      if (!oldSensorName.equals(newSensorName)) {
-        unregisterResources(Arrays.asList(resourceName));
-        registerResources(Arrays.asList(bean));
+      ResourceMonitor resourceMonitor = 
getOrCreateResourceMonitor(externalView.getId());
+
+      if (resourceMonitor != null) {
+        resourceMonitor.updateResource(externalView, idealState, 
stateModelDef);
       }
     } catch (Exception e) {
       LOG.error("Fail to set resource status, resource: " + 
idealState.getResourceName(), e);
     }
   }
 
-  public synchronized void updateMissingTopStateDurationStats(String 
resourceName, long duration,
-      boolean succeeded) {
+  public synchronized void updateMissingTopStateDurationStats(String 
resourceName, long duration, boolean succeeded) {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
-      resourceMonitor
-          .updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, 
duration, succeeded);
+      
resourceMonitor.updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, 
duration, succeeded);
     }
   }
 
@@ -475,9 +447,8 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
-      
resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions,
-          numPendingLoadRebalancePartitions, 
numRecoveryRebalanceThrottledPartitions,
-          numLoadRebalanceThrottledPartitions);
+      
resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions, 
numPendingLoadRebalancePartitions,
+          numRecoveryRebalanceThrottledPartitions, 
numLoadRebalanceThrottledPartitions);
     }
   }
 
@@ -486,12 +457,13 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
       if (!_resourceMbeanMap.containsKey(resourceName)) {
         synchronized (this) {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
-            ResourceMonitor bean = new ResourceMonitor(_clusterName, 
resourceName);
-            registerResources(Arrays.asList(bean));
+            String beanName = getResourceBeanName(resourceName);
+            ResourceMonitor bean = new ResourceMonitor(_clusterName, 
resourceName, getObjectName(beanName));
+            _resourceMbeanMap.put(resourceName, bean);
           }
         }
       }
-    } catch (MalformedObjectNameException ex) {
+    } catch (JMException ex) {
       LOG.error("Fail to register resource mbean, resource: " + resourceName);
     }
 
@@ -540,8 +512,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
         continue;
       }
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
-      TaskState currentState =
-          workflowContext == null ? TaskState.NOT_STARTED : 
workflowContext.getWorkflowState();
+      TaskState currentState = workflowContext == null ? TaskState.NOT_STARTED 
: workflowContext.getWorkflowState();
       updateWorkflowGauges(workflowConfigMap.get(workflow), currentState);
     }
   }
@@ -587,10 +558,8 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
       Set<String> allJobs = workflowConfig.getJobDag().getAllNodes();
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
       for (String job : allJobs) {
-        TaskState currentState =
-            workflowContext == null ? TaskState.NOT_STARTED : 
workflowContext.getJobState(job);
-        updateJobGauges(
-            workflowConfig.getJobTypes() == null ? null : 
workflowConfig.getJobTypes().get(job),
+        TaskState currentState = workflowContext == null ? 
TaskState.NOT_STARTED : workflowContext.getJobState(job);
+        updateJobGauges(workflowConfig.getJobTypes() == null ? null : 
workflowConfig.getJobTypes().get(job),
             currentState);
       }
     }
@@ -636,8 +605,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     }
   }
 
-  private synchronized void unregisterInstances(Collection<String> instances)
-      throws MalformedObjectNameException {
+  private synchronized void unregisterInstances(Collection<String> instances) 
throws MalformedObjectNameException {
     for (String instanceName : instances) {
       String beanName = getInstanceBeanName(instanceName);
       unregister(getObjectName(beanName));
@@ -645,21 +613,12 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     _instanceMbeanMap.keySet().removeAll(instances);
   }
 
-  private synchronized void registerResources(Collection<ResourceMonitor> 
resources)
-      throws MalformedObjectNameException {
-    for (ResourceMonitor monitor : resources) {
-      String resourceName = monitor.getResourceName();
-      String beanName = getResourceBeanName(resourceName);
-      register(monitor, getObjectName(beanName));
-      _resourceMbeanMap.put(resourceName, monitor);
-    }
-  }
-
-  private synchronized void unregisterResources(Collection<String> resources)
-      throws MalformedObjectNameException {
+  private synchronized void unregisterResources(Collection<String> resources) 
throws MalformedObjectNameException {
     for (String resourceName : resources) {
-      String beanName = getResourceBeanName(resourceName);
-      unregister(getObjectName(beanName));
+      ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
+      if (monitor != null) {
+        monitor.unregister();
+      }
     }
     _resourceMbeanMap.keySet().removeAll(resources);
   }
@@ -667,36 +626,31 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   private synchronized void 
unregisterEventMonitors(Collection<ClusterEventMonitor> monitors)
       throws MalformedObjectNameException {
     for (ClusterEventMonitor monitor : monitors) {
-      String beanName = monitor.getBeanName();
-      unregister(getObjectName(beanName));
+      monitor.unregister();
     }
     _resourceMbeanMap.keySet().removeAll(monitors);
   }
 
-  private synchronized void registerPerInstanceResources(
-      Collection<PerInstanceResourceMonitor> monitors) throws 
MalformedObjectNameException {
+  private synchronized void 
registerPerInstanceResources(Collection<PerInstanceResourceMonitor> monitors)
+      throws MalformedObjectNameException {
     for (PerInstanceResourceMonitor monitor : monitors) {
       String instanceName = monitor.getInstanceName();
       String resourceName = monitor.getResourceName();
       String beanName = getPerInstanceResourceBeanName(instanceName, 
resourceName);
       register(monitor, getObjectName(beanName));
-      _perInstanceResourceMap.put(
-          new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), 
monitor);
+      _perInstanceResourceMap.put(new 
PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
     }
   }
 
-  private synchronized void unregisterPerInstanceResources(
-      Collection<PerInstanceResourceMonitor.BeanName> beanNames)
+  private synchronized void 
unregisterPerInstanceResources(Collection<PerInstanceResourceMonitor.BeanName> 
beanNames)
       throws MalformedObjectNameException {
     for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
-      unregister(getObjectName(
-          getPerInstanceResourceBeanName(beanName.instanceName(), 
beanName.resourceName())));
+      
unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(),
 beanName.resourceName())));
     }
     _perInstanceResourceMap.keySet().removeAll(beanNames);
   }
 
-  private synchronized void registerWorkflow(WorkflowMonitor workflowMonitor)
-      throws MalformedObjectNameException {
+  private synchronized void registerWorkflow(WorkflowMonitor workflowMonitor) 
throws MalformedObjectNameException {
     String workflowBeanName = 
getWorkflowBeanName(workflowMonitor.getWorkflowType());
     register(workflowMonitor, getObjectName(workflowBeanName));
   }
@@ -715,8 +669,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     register(jobMonitor, getObjectName(jobBeanName));
   }
 
-  private synchronized void unregisterJobs(Collection<String> jobMonitors)
-      throws MalformedObjectNameException {
+  private synchronized void unregisterJobs(Collection<String> jobMonitors) 
throws MalformedObjectNameException {
     for (String jobMonitor : jobMonitors) {
       String jobBeanName = getJobBeanName(jobMonitor);
       unregister(getObjectName(jobBeanName));
@@ -758,8 +711,8 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
    * @return per-instance resource bean name
    */
   public String getPerInstanceResourceBeanName(String instanceName, String 
resourceName) {
-    return String.format("%s,%s", clusterBeanName(), new 
PerInstanceResourceMonitor.BeanName(
-        instanceName, resourceName).toString());
+    return String.format("%s,%s", clusterBeanName(),
+        new PerInstanceResourceMonitor.BeanName(instanceName, 
resourceName).toString());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
index 45c4259..d23c405 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
@@ -19,9 +19,6 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.InstanceType;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
@@ -30,22 +27,16 @@ import 
org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
 import javax.management.JMException;
-import javax.management.ObjectName;
 import java.util.ArrayList;
 import java.util.List;
 
-public class HelixCallbackMonitor {
+public class HelixCallbackMonitor extends DynamicMBeanProvider {
   public static final String MONITOR_TYPE = "Type";
   public static final String MONITOR_KEY = "Key";
   public static final String MONITOR_CHANGE_TYPE = "Change";
-  private static final String MBEAN_DESCRIPTION = "Helix Callback Monitor";
-
-  private static final MetricRegistry _metricRegistry = new MetricRegistry();
 
-  private DynamicMBeanProvider _dynamicMBeanProvider;
-  private ObjectName _objectName;
-  private final InstanceType _instanceType;
-  private final String _key;
+  private static final String MBEAN_DESCRIPTION = "Helix Callback Monitor";
+  private final String _sensorName;
   private final HelixConstants.ChangeType _changeType;
 
   private SimpleDynamicMetric<Long> _counter = new 
SimpleDynamicMetric("Counter", 0l);
@@ -55,32 +46,27 @@ public class HelixCallbackMonitor {
       new SimpleDynamicMetric("LatencyCounter", 0l);
 
   private HistogramDynamicMetric _latencyGauge = new 
HistogramDynamicMetric("LatencyGauge",
-      _metricRegistry.histogram(toString() + "LatencyGauge"));
+      _metricRegistry.histogram(getMetricRegistryNamePrefix() + 
"LatencyGauge"));
 
   public HelixCallbackMonitor(InstanceType type, String key, 
HelixConstants.ChangeType changeType)
       throws JMException {
-    _instanceType = type;
-    _key = key;
     _changeType = changeType;
+    _sensorName = String
+        .format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), 
type.name(), key,
+            changeType.name());
 
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
     attributeList.add(_counter);
     attributeList.add(_unbatchedCounter);
     attributeList.add(_totalLatencyCounter);
     attributeList.add(_latencyGauge);
-
-    _dynamicMBeanProvider = new DynamicMBeanProvider(String
-        .format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), 
_instanceType.name(), _key,
-            _changeType.name()), MBEAN_DESCRIPTION, attributeList);
-
-    register(type, key, changeType);
+    register(attributeList, MBEAN_DESCRIPTION, 
MonitorDomainNames.HelixCallback.name(),
+        MONITOR_TYPE, type.name(), MONITOR_KEY, key, MONITOR_CHANGE_TYPE, 
changeType.name());
   }
 
-  private void register(InstanceType type, String key, 
HelixConstants.ChangeType changeType)
-      throws JMException {
-    _objectName = MBeanRegistrar
-        .register(_dynamicMBeanProvider, 
MonitorDomainNames.HelixCallback.name(), MONITOR_TYPE,
-            type.name(), MONITOR_KEY, key, MONITOR_CHANGE_TYPE, 
changeType.name());
+  @Override
+  public String getSensorName() {
+    return _sensorName;
   }
 
   public HelixConstants.ChangeType getChangeType() {
@@ -96,17 +82,4 @@ public class HelixCallbackMonitor {
   public void increaseCallbackUnbatchedCounters() {
     _unbatchedCounter.updateValue(_unbatchedCounter.getValue() + 1);
   }
-
-  /**
-   * After unregistered, the MBean can't be registered again, a new monitor 
has be to created.
-   */
-  public void unregister() {
-    MBeanRegistrar.unregister(_objectName);
-    _metricRegistry.removeMatching(new MetricFilter() {
-      @Override
-      public boolean matches(String name, Metric metric) {
-        return name.startsWith(toString());
-      }
-    });
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
index 6d43575..6856728 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
@@ -19,14 +19,11 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.lang.management.ManagementFactory;
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
 import org.apache.log4j.Logger;
 
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+
 public class MBeanRegistrar {
   private static Logger LOG = Logger.getLogger(MBeanRegistrar.class);
 
@@ -36,34 +33,49 @@ public class MBeanRegistrar {
   private static MBeanServer _beanServer = 
ManagementFactory.getPlatformMBeanServer();
 
   /**
-   * This method registers an object with specified domain and properties.
+   * This method registers an object with specified ObjectName.
    * If the same ObjectName is already registered, this method tags a 
"Duplicate" property
    * with an incremental number and tries to register the object again until 
the object is
    * successfully registered.
    */
-  public static ObjectName register(Object object, String domain, String... 
keyValuePairs)
-      throws JMException {
+  public static ObjectName register(Object object, ObjectName objectName) 
throws JMException {
     int num = 0;
     while (num < MAX_NUM_DUPLICATED_MONITORS) {
-      ObjectName newObjectName = buildObjectName(num, domain, keyValuePairs);
+      ObjectName newObjectName;
+      if (num > 0) {
+        newObjectName = new ObjectName(
+            String.format("%s,%s=%s", objectName.toString(), DUPLICATE, 
String.valueOf(num)));
+      } else {
+        newObjectName = objectName;
+      }
       num++;
       try {
         _beanServer.registerMBean(object, newObjectName);
       } catch (InstanceAlreadyExistsException e) {
         continue;
       } catch (JMException e) {
-        LOG.error(String.format("Error in registering: %s",
-            buildObjectName(domain, keyValuePairs).getCanonicalName()), e);
+        LOG.error(String.format("Error in registering: %s", 
objectName.getCanonicalName()), e);
         return null;
       }
       return newObjectName;
     }
-    LOG.error(String.format(
-        "There're already %d %s, no more will be registered.", 
MAX_NUM_DUPLICATED_MONITORS,
-        buildObjectName(domain, keyValuePairs).getCanonicalName()));
+    LOG.error(String
+        .format("There're already %d %s, no more will be registered.", 
MAX_NUM_DUPLICATED_MONITORS,
+            objectName.getCanonicalName()));
     return null;
   }
 
+  /**
+   * This method registers an object with specified domain and properties.
+   * If the same ObjectName is already registered, this method tags a 
"Duplicate" property
+   * with an incremental number and tries to register the object again until 
the object is
+   * successfully registered.
+   */
+  public static ObjectName register(Object object, String domain, String... 
keyValuePairs)
+      throws JMException {
+    return register(object, buildObjectName(domain, keyValuePairs));
+  }
+
   public static void unregister(ObjectName objectName) {
     if (objectName != null && _beanServer.isRegistered(objectName)) {
       try {
@@ -75,13 +87,7 @@ public class MBeanRegistrar {
   }
 
   public static ObjectName buildObjectName(String domain, String... 
keyValuePairs)
-      throws MalformedObjectNameException{
-    return buildObjectName(0, domain, keyValuePairs);
-  }
-
-  public static ObjectName buildObjectName(int num, String domain, String... 
keyValuePairs)
       throws MalformedObjectNameException {
-
     if (keyValuePairs.length < 2 || keyValuePairs.length % 2 != 0) {
       throw new IllegalArgumentException("key-value pairs for ObjectName must 
contain even "
           + "number of String and at least 2 String");
@@ -92,9 +98,7 @@ public class MBeanRegistrar {
       objectNameStr.append(
           String.format(i == 0 ? "%s=%s" : ",%s=%s", keyValuePairs[i], 
keyValuePairs[i + 1]));
     }
-    if (num > 0) {
-      objectNameStr.append(String.format(",%s=%s", DUPLICATE, 
String.valueOf(num)));
-    }
+
     return new ObjectName(String.format("%s:%s", domain, 
objectNameStr.toString()));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
index 35fe8b8..76ab42e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
@@ -19,33 +19,22 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.helix.HelixException;
 import org.apache.helix.model.Message;
 import org.apache.helix.monitoring.ParticipantStatusMonitor;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
-import org.apache.log4j.Logger;
 
 import javax.management.JMException;
-import javax.management.ObjectName;
 import java.util.ArrayList;
 import java.util.List;
 
-public class MessageLatencyMonitor {
-  private static final Logger logger = 
Logger.getLogger(MessageLatencyMonitor.class.getName());
-  private static final String MBEAN_DESCRIPTION = "Helix Message Latency 
Monitor";
+public class MessageLatencyMonitor extends DynamicMBeanProvider {
   public static String MONITOR_TYPE_KW = "MonitorType";
 
-  private static final MetricRegistry _metricRegistry = new MetricRegistry();
-  private String _participantName;
-  private ObjectName _objectName;
-
-  private DynamicMBeanProvider _dynamicMBeanProvider;
+  private static final String MBEAN_DESCRIPTION = "Helix Message Latency 
Monitor";
+  private final String _sensorName;
 
   private SimpleDynamicMetric<Long> _totalMessageCount =
       new SimpleDynamicMetric("TotalMessageCount", 0l);
@@ -53,48 +42,32 @@ public class MessageLatencyMonitor {
       new SimpleDynamicMetric("TotalMessageLatency", 0l);
   private HistogramDynamicMetric _messageLatencyGauge =
       new HistogramDynamicMetric("MessageLatencyGauge",
-          _metricRegistry.histogram(toString() + "MessageLatencyGauge"));
+          _metricRegistry.histogram(getMetricRegistryNamePrefix() + 
"MessageLatencyGauge"));
+
+  public MessageLatencyMonitor(String domainName, String participantName) 
throws JMException {
+    _sensorName = String
+        .format("%s.%s.%s.%s", 
ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY, participantName,
+            MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName());
 
-  public MessageLatencyMonitor(String participantName) {
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
     attributeList.add(_totalMessageCount);
     attributeList.add(_totalMessageLatency);
     attributeList.add(_messageLatencyGauge);
+    register(attributeList, MBEAN_DESCRIPTION, domainName, 
ParticipantStatusMonitor.PARTICIPANT_KEY,
+        participantName, MONITOR_TYPE_KW, 
MessageLatencyMonitor.class.getSimpleName());
+  }
 
-    _participantName = participantName;
-    _dynamicMBeanProvider = new DynamicMBeanProvider(String
-        .format("%s.%s.%s.%s", 
ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY, participantName,
-            MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName()), 
MBEAN_DESCRIPTION,
-        attributeList);
+  @Override
+  public String getSensorName() {
+    return _sensorName;
   }
 
   public void updateLatency(Message message) {
     long latency = System.currentTimeMillis() - message.getCreateTimeStamp();
-    logger.info(String.format("The latency of message %s is %d ms", 
message.getMsgId(), latency));
+    _logger.info(String.format("The latency of message %s is %d ms", 
message.getMsgId(), latency));
 
     _totalMessageCount.updateValue(_totalMessageCount.getValue() + 1);
     _totalMessageLatency.updateValue(_totalMessageLatency.getValue() + 
latency);
     _messageLatencyGauge.updateValue(latency);
   }
-
-  public void register(String domainName) throws JMException {
-    if (_objectName != null) {
-      throw new HelixException("Monitor has already been registed: " + 
_objectName.toString());
-    }
-    _objectName = MBeanRegistrar
-        .register(_dynamicMBeanProvider, domainName, 
ParticipantStatusMonitor.PARTICIPANT_KEY,
-            _participantName, MONITOR_TYPE_KW, 
MessageLatencyMonitor.class.getSimpleName());
-  }
-
-  public void unregister() {
-    if (_objectName != null) {
-      MBeanRegistrar.unregister(_objectName);
-    }
-    _metricRegistry.removeMatching(new MetricFilter() {
-      @Override
-      public boolean matches(String name, Metric metric) {
-        return name.startsWith(toString());
-      }
-    });
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitorMBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitorMBean.java
deleted file mode 100644
index b8a50c6..0000000
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitorMBean.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-public interface MessageLatencyMonitorMBean extends SensorNameProvider{
-
-  /**
-   * Mesure the total message latency from ZK
-   * @return
-   */
-  public long getTotalMessageLatencyCounter();
-
-  /**
-   * The total count of received NEW message
-   * @return
-   */
-  public long getTotalMessageCounter();
-
-  /**
-   * The maximal message latency for single message. It will be cleared in one 
hour
-   * @return
-   */
-  public long getMaxSingleMessageLatencyGauge();
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 41c73ed..6382359 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -19,122 +19,156 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.helix.HelixDefinedState;
-import org.apache.helix.model.*;
-import org.apache.log4j.Logger;
-
-
-public class ResourceMonitor implements ResourceMonitorMBean {
-  private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
+import javax.management.JMException;
+import javax.management.ObjectName;
+import java.util.*;
+
+public class ResourceMonitor extends DynamicMBeanProvider {
   private static final long RESET_TIME_RANGE = 1000 * 60 * 60; // 1 hour
 
   // Gauges
-  private int _numOfPartitions;
-  private int _numOfPartitionsInExternalView;
-  private int _numOfErrorPartitions;
-  private int _numNonTopStatePartitions;
-  private long _numLessMinActiveReplicaPartitions;
-  private long _numLessReplicaPartitions;
-  private long _numPendingRecoveryRebalancePartitions;
-  private long _numPendingLoadRebalancePartitions;
-  private long _numRecoveryRebalanceThrottledPartitions;
-  private long _numLoadRebalanceThrottledPartitions;
-  private int _externalViewIdealStateDiff;
+  private SimpleDynamicMetric<Integer> _numOfPartitions =
+      new SimpleDynamicMetric("PartitionGauge", 0);
+  private SimpleDynamicMetric<Integer> _numOfPartitionsInExternalView =
+      new SimpleDynamicMetric("ExternalViewPartitionGauge", 0);
+  private SimpleDynamicMetric<Integer> _numOfErrorPartitions =
+      new SimpleDynamicMetric("ErrorPartitionGauge", 0);
+  private SimpleDynamicMetric<Integer> _numNonTopStatePartitions =
+      new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0);
+  private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions =
+      new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l);
+  private SimpleDynamicMetric<Long> _numLessReplicaPartitions =
+      new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l);
+  private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions =
+      new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l);
+  private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions =
+      new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l);
+  private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions =
+      new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l);
+  private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions =
+      new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l);
+  private SimpleDynamicMetric<Integer> _externalViewIdealStateDiff =
+      new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0);
 
   // Counters
-  private long _successfulTopStateHandoffDurationCounter;
-  private long _successTopStateHandoffCounter;
-  private long _failedTopStateHandoffCounter;
-  private long _maxSinglePartitionTopStateHandoffDuration;
+  private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter =
+      new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l);
+  private SimpleDynamicMetric<Long> _successTopStateHandoffCounter =
+      new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l);
+  private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter =
+      new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l);
+  private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration 
=
+      new 
SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l);
+  private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge =
+      new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", 
_metricRegistry
+          .histogram(getMetricRegistryNamePrefix() + 
"PartitionTopStateHandoffDurationGauge"));
+  private SimpleDynamicMetric<Long> _totalMessageReceived =
+      new SimpleDynamicMetric("TotalMessageReceived", 0l);
 
-  private long _lastResetTime;
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
+  private long _lastResetTime;
   private String _resourceName;
   private String _clusterName;
 
-  private long _totalMessageReceived;
-
   public enum MonitorState {
     TOP_STATE
   }
 
-  public ResourceMonitor(String clusterName, String resourceName) {
+  public ResourceMonitor(String clusterName, String resourceName, ObjectName 
objectName)
+      throws JMException {
     _clusterName = clusterName;
     _resourceName = resourceName;
-    _successfulTopStateHandoffDurationCounter = 0L;
-    _successTopStateHandoffCounter = 0L;
-    _failedTopStateHandoffCounter = 0L;
-    _lastResetTime = System.currentTimeMillis();
-    _totalMessageReceived = 0L;
+
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_numOfPartitions);
+    attributeList.add(_numOfPartitionsInExternalView);
+    attributeList.add(_numOfErrorPartitions);
+    attributeList.add(_numNonTopStatePartitions);
+    attributeList.add(_numLessMinActiveReplicaPartitions);
+    attributeList.add(_numLessReplicaPartitions);
+    attributeList.add(_numPendingRecoveryRebalancePartitions);
+    attributeList.add(_numPendingLoadRebalancePartitions);
+    attributeList.add(_numRecoveryRebalanceThrottledPartitions);
+    attributeList.add(_numLoadRebalanceThrottledPartitions);
+    attributeList.add(_externalViewIdealStateDiff);
+    attributeList.add(_successfulTopStateHandoffDurationCounter);
+    attributeList.add(_successTopStateHandoffCounter);
+    attributeList.add(_failedTopStateHandoffCounter);
+    attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
+    attributeList.add(_partitionTopStateHandoffDurationGauge);
+    attributeList.add(_totalMessageReceived);
+
+    register(attributeList, objectName);
   }
 
   @Override
+  public String getSensorName() {
+    return String
+        .format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, 
_clusterName, _tag,
+            _resourceName);
+  }
+
   public long getPartitionGauge() {
-    return _numOfPartitions;
+    return _numOfPartitions.getValue();
   }
 
-  @Override
   public long getErrorPartitionGauge() {
-    return _numOfErrorPartitions;
+    return _numOfErrorPartitions.getValue();
   }
 
-  @Override
   public long getMissingTopStatePartitionGauge() {
-    return _numNonTopStatePartitions;
+    return _numNonTopStatePartitions.getValue();
   }
 
-  @Override
   public long getDifferenceWithIdealStateGauge() {
-    return _externalViewIdealStateDiff;
+    return _externalViewIdealStateDiff.getValue();
   }
 
-  @Override
   public long getSuccessfulTopStateHandoffDurationCounter() {
-    return _successfulTopStateHandoffDurationCounter;
+    return _successfulTopStateHandoffDurationCounter.getValue();
   }
 
-  @Override
   public long getSucceededTopStateHandoffCounter() {
-    return _successTopStateHandoffCounter;
+    return _successTopStateHandoffCounter.getValue();
   }
 
-  @Override
   public long getMaxSinglePartitionTopStateHandoffDurationGauge() {
-    return _maxSinglePartitionTopStateHandoffDuration;
+    return _maxSinglePartitionTopStateHandoffDuration.getValue();
   }
 
-  @Override
   public long getFailedTopStateHandoffCounter() {
-    return _failedTopStateHandoffCounter;
+    return _failedTopStateHandoffCounter.getValue();
   }
 
-  @Override
   public long getTotalMessageReceived() {
-    return _totalMessageReceived;
+    return _totalMessageReceived.getValue();
   }
 
   public synchronized void increaseMessageCount(long messageReceived) {
-    _totalMessageReceived += messageReceived;
-  }
-
-  @Override
-  public String getSensorName() {
-    return String.format("%s.%s.%s.%s", 
ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
-        _tag, _resourceName);
+    _totalMessageReceived.updateValue(_totalMessageReceived.getValue() + 
messageReceived);
   }
 
   public String getResourceName() {
     return _resourceName;
   }
 
-  public void updateResource(ExternalView externalView, IdealState idealState, 
StateModelDefinition stateModelDef) {
+  public String getBeanName() {
+    return _clusterName + " " + _resourceName;
+  }
+
+  public void updateResource(ExternalView externalView, IdealState idealState,
+      StateModelDefinition stateModelDef) {
     if (externalView == null) {
-      LOG.warn("External view is null");
+      _logger.warn("External view is null");
       return;
     }
 
@@ -148,7 +182,7 @@ public class ResourceMonitor implements 
ResourceMonitorMBean {
 
     resetGauges();
     if (idealState == null) {
-      LOG.warn("ideal state is null for " + _resourceName);
+      _logger.warn("ideal state is null for " + _resourceName);
       return;
     }
 
@@ -161,8 +195,8 @@ public class ResourceMonitor implements 
ResourceMonitorMBean {
 
     Set<String> partitions = idealState.getPartitionSet();
 
-    if (_numOfPartitions == 0) {
-      _numOfPartitions = partitions.size();
+    if (_numOfPartitions.getValue() == 0) {
+      _numOfPartitions.updateValue(partitions.size());
     }
 
     int replica = -1;
@@ -198,23 +232,25 @@ public class ResourceMonitor implements 
ResourceMonitorMBean {
           numOfPartitionWithTopState++;
         }
 
-        Map<String, Integer> stateCount = 
stateModelDef.getStateCountMap(idealRecord.size(), replica);
+        Map<String, Integer> stateCount =
+            stateModelDef.getStateCountMap(idealRecord.size(), replica);
         Set<String> activeStates = stateCount.keySet();
         if (currentState != null && activeStates.contains(currentState)) {
           activeReplicaCount++;
         }
       }
       if (replica > 0 && activeReplicaCount < replica) {
-        _numLessReplicaPartitions ++;
+        
_numLessReplicaPartitions.updateValue(_numLessReplicaPartitions.getValue() + 1);
       }
       if (minActiveReplica >= 0 && activeReplicaCount < minActiveReplica) {
-        _numLessMinActiveReplicaPartitions ++;
+        _numLessMinActiveReplicaPartitions
+            .updateValue(_numLessMinActiveReplicaPartitions.getValue() + 1);
       }
     }
-    _numOfErrorPartitions = numOfErrorPartitions;
-    _externalViewIdealStateDiff = numOfDiff;
-    _numOfPartitionsInExternalView = externalView.getPartitionSet().size();
-    _numNonTopStatePartitions = _numOfPartitions - numOfPartitionWithTopState;
+    _numOfErrorPartitions.updateValue(numOfErrorPartitions);
+    _externalViewIdealStateDiff.updateValue(numOfDiff);
+    
_numOfPartitionsInExternalView.updateValue(externalView.getPartitionSet().size());
+    _numNonTopStatePartitions.updateValue(_numOfPartitions.getValue() - 
numOfPartitionWithTopState);
 
     String tag = idealState.getInstanceGroupTag();
     if (tag == null || tag.equals("") || tag.equals("null")) {
@@ -225,34 +261,36 @@ public class ResourceMonitor implements 
ResourceMonitorMBean {
   }
 
   private void resetGauges() {
-    _numOfErrorPartitions = 0;
-    _numNonTopStatePartitions = 0;
-    _externalViewIdealStateDiff = 0;
-    _numOfPartitionsInExternalView = 0;
-    _numLessMinActiveReplicaPartitions = 0;
-    _numLessReplicaPartitions = 0;
-    _numPendingRecoveryRebalancePartitions = 0;
-    _numPendingLoadRebalancePartitions = 0;
-    _numRecoveryRebalanceThrottledPartitions = 0;
-    _numLoadRebalanceThrottledPartitions = 0;
+    _numOfErrorPartitions.updateValue(0);
+    _numNonTopStatePartitions.updateValue(0);
+    _externalViewIdealStateDiff.updateValue(0);
+    _numOfPartitionsInExternalView.updateValue(0);
+    _numLessMinActiveReplicaPartitions.updateValue(0l);
+    _numLessReplicaPartitions.updateValue(0l);
+    _numPendingRecoveryRebalancePartitions.updateValue(0l);
+    _numPendingLoadRebalancePartitions.updateValue(0l);
+    _numRecoveryRebalanceThrottledPartitions.updateValue(0l);
+    _numLoadRebalanceThrottledPartitions.updateValue(0l);
   }
 
   public void updateStateHandoffStats(MonitorState monitorState, long 
duration, boolean succeeded) {
     switch (monitorState) {
     case TOP_STATE:
       if (succeeded) {
-        _successTopStateHandoffCounter++;
-        _successfulTopStateHandoffDurationCounter += duration;
-        if (duration > _maxSinglePartitionTopStateHandoffDuration) {
-          _maxSinglePartitionTopStateHandoffDuration = duration;
+        
_successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue()
 + 1);
+        _successfulTopStateHandoffDurationCounter
+            .updateValue(_successfulTopStateHandoffDurationCounter.getValue() 
+ duration);
+        _partitionTopStateHandoffDurationGauge.updateValue(duration);
+        if (duration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
+          _maxSinglePartitionTopStateHandoffDuration.updateValue(duration);
           _lastResetTime = System.currentTimeMillis();
         }
       } else {
-        _failedTopStateHandoffCounter++;
+        
_failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue()
 + 1);
       }
       break;
     default:
-      LOG.warn(
+      _logger.warn(
           String.format("Wrong monitor state \"%s\" that not supported ", 
monitorState.name()));
     }
   }
@@ -260,54 +298,43 @@ public class ResourceMonitor implements 
ResourceMonitorMBean {
   public void updateRebalancerStat(long numPendingRecoveryRebalancePartitions,
       long numPendingLoadRebalancePartitions, long 
numRecoveryRebalanceThrottledPartitions,
       long numLoadRebalanceThrottledPartitions) {
-    _numPendingRecoveryRebalancePartitions = 
numPendingRecoveryRebalancePartitions;
-    _numPendingLoadRebalancePartitions = numPendingLoadRebalancePartitions;
-    _numRecoveryRebalanceThrottledPartitions = 
numRecoveryRebalanceThrottledPartitions;
-    _numLoadRebalanceThrottledPartitions = numLoadRebalanceThrottledPartitions;
+    
_numPendingRecoveryRebalancePartitions.updateValue(numPendingRecoveryRebalancePartitions);
+    
_numPendingLoadRebalancePartitions.updateValue(numPendingLoadRebalancePartitions);
+    
_numRecoveryRebalanceThrottledPartitions.updateValue(numRecoveryRebalanceThrottledPartitions);
+    
_numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
   }
 
-  @Override
   public long getExternalViewPartitionGauge() {
-    return _numOfPartitionsInExternalView;
+    return _numOfPartitionsInExternalView.getValue();
   }
 
-  @Override
   public long getMissingMinActiveReplicaPartitionGauge() {
-    return _numLessMinActiveReplicaPartitions;
+    return _numLessMinActiveReplicaPartitions.getValue();
   }
 
-  @Override
   public long getMissingReplicaPartitionGauge() {
-    return _numLessReplicaPartitions;
+    return _numLessReplicaPartitions.getValue();
   }
 
-  @Override
   public long getPendingRecoveryRebalancePartitionGauge() {
-    return _numPendingRecoveryRebalancePartitions;
+    return _numPendingRecoveryRebalancePartitions.getValue();
   }
 
-  @Override
   public long getPendingLoadRebalancePartitionGauge() {
-    return _numPendingLoadRebalancePartitions;
+    return _numPendingLoadRebalancePartitions.getValue();
   }
 
-  @Override
   public long getRecoveryRebalanceThrottledPartitionGauge() {
-    return _numRecoveryRebalanceThrottledPartitions;
+    return _numRecoveryRebalanceThrottledPartitions.getValue();
   }
 
-  @Override
   public long getLoadRebalanceThrottledPartitionGauge() {
-    return _numLoadRebalanceThrottledPartitions;
-  }
-
-  public String getBeanName() {
-    return _clusterName + " " + _resourceName;
+    return _numLoadRebalanceThrottledPartitions.getValue();
   }
 
   public void resetMaxTopStateHandoffGauge() {
     if (_lastResetTime + RESET_TIME_RANGE <= System.currentTimeMillis()) {
-      _maxSinglePartitionTopStateHandoffDuration = 0L;
+      _maxSinglePartitionTopStateHandoffDuration.updateValue(0l);
       _lastResetTime = System.currentTimeMillis();
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
index d10f66c..08eb2bc 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
@@ -19,31 +19,20 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import javax.management.JMException;
-import javax.management.ObjectName;
-
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.helix.HelixException;
-import org.apache.helix.monitoring.mbeans.dynamicMBeans.*;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
+import javax.management.JMException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
-public class ZkClientPathMonitor {
+public class ZkClientPathMonitor extends DynamicMBeanProvider {
   public static final String MONITOR_PATH = "PATH";
-  private static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client 
Monitor";
-
-  private static final MetricRegistry _metricRegistry = new MetricRegistry();
 
-  private DynamicMBeanProvider _dynamicMBeanProvider;
-  private ObjectName _objectName;
-  private String _monitorType;
-  private String _monitorKey;
-  private String _path;
+  private static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client 
Monitor";
+  private final String _sensorName;
 
   protected enum PredefinedPath {
     IdealStates(".*/IDEALSTATES/.*"),
@@ -84,25 +73,25 @@ public class ZkClientPathMonitor {
       new SimpleDynamicMetric("WriteTotalLatencyCounter", 0l);
 
   private HistogramDynamicMetric _readLatencyGauge = new 
HistogramDynamicMetric("ReadLatencyGauge",
-      _metricRegistry.histogram(toString() + "ReadLatencyGauge"));
+      _metricRegistry.histogram(getMetricRegistryNamePrefix() + 
"ReadLatencyGauge"));
   private HistogramDynamicMetric _writeLatencyGauge =
       new HistogramDynamicMetric("WriteLatencyGauge",
-          _metricRegistry.histogram(toString() + "WriteLatencyGauge"));
+          _metricRegistry.histogram(getMetricRegistryNamePrefix() + 
"WriteLatencyGauge"));
   private HistogramDynamicMetric _readBytesGauge = new 
HistogramDynamicMetric("ReadBytesGauge",
-      _metricRegistry.histogram(toString() + "ReadBytesGauge"));
+      _metricRegistry.histogram(getMetricRegistryNamePrefix() + 
"ReadBytesGauge"));
   private HistogramDynamicMetric _writeBytesGauge = new 
HistogramDynamicMetric("WriteBytesGauge",
-      _metricRegistry.histogram(toString() + "WriteBytesGauge"));
+      _metricRegistry.histogram(getMetricRegistryNamePrefix() + 
"WriteBytesGauge"));
 
-  protected ZkClientPathMonitor(String path, String monitorType, String 
monitorKey)
-      throws JMException {
-    if (monitorKey == null || monitorKey.isEmpty() || monitorType == null || 
monitorType
-        .isEmpty()) {
-      throw new HelixException("Cannot create ZkClientMonitor without monitor 
key and type.");
-    }
+  @Override
+  public String getSensorName() {
+    return _sensorName;
+  }
 
-    _monitorType = monitorType;
-    _monitorKey = monitorKey;
-    _path = path;
+  public ZkClientPathMonitor(String path, String monitorType, String 
monitorKey)
+      throws JMException {
+    _sensorName = String
+        .format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), 
monitorType, monitorKey,
+            path);
 
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
     attributeList.add(_readCounter);
@@ -117,32 +106,9 @@ public class ZkClientPathMonitor {
     attributeList.add(_writeLatencyGauge);
     attributeList.add(_readBytesGauge);
     attributeList.add(_writeBytesGauge);
-
-    _dynamicMBeanProvider = new DynamicMBeanProvider(String
-        .format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), 
_monitorType, _monitorKey,
-            _path), MBEAN_DESCRIPTION, attributeList);
-
-    regitster(path, monitorType, monitorKey);
-  }
-
-  private void regitster(String path, String monitorType, String monitorKey) 
throws JMException {
-    _objectName = MBeanRegistrar
-        .register(_dynamicMBeanProvider, 
MonitorDomainNames.HelixZkClient.name(),
-            ZkClientMonitor.MONITOR_TYPE, monitorType, 
ZkClientMonitor.MONITOR_KEY, monitorKey,
-            MONITOR_PATH, path);
-  }
-
-  /**
-   * After unregistered, the MBean can't be registered again, a new monitor 
has be to created.
-   */
-  protected void unregister() {
-    MBeanRegistrar.unregister(_objectName);
-    _metricRegistry.removeMatching(new MetricFilter() {
-      @Override
-      public boolean matches(String name, Metric metric) {
-        return name.startsWith(toString());
-      }
-    });
+    register(attributeList, MBEAN_DESCRIPTION, 
MonitorDomainNames.HelixZkClient.name(),
+        ZkClientMonitor.MONITOR_TYPE, monitorType, 
ZkClientMonitor.MONITOR_KEY, monitorKey,
+        MONITOR_PATH, path);
   }
 
   protected void record(int bytes, long latencyMilliSec, boolean isFailure, 
boolean isRead) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index 4d94e28..a2d2662 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -19,7 +19,12 @@ package org.apache.helix.monitoring.mbeans.dynamicMBeans;
  * under the License.
  */
 
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.helix.HelixException;
 import org.apache.helix.monitoring.SensorNameProvider;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.log4j.Logger;
 
 import javax.management.*;
@@ -28,36 +33,97 @@ import java.util.*;
 /**
  * Dynamic MBean provider that reporting DynamicMetric attributes
  */
-public class DynamicMBeanProvider implements DynamicMBean, SensorNameProvider {
+public abstract class DynamicMBeanProvider implements DynamicMBean, 
SensorNameProvider {
+  protected final Logger _logger = Logger.getLogger(getClass());
+  protected static final MetricRegistry _metricRegistry = new MetricRegistry();
   private static String SENSOR_NAME_TAG = "SensorName";
-  private static final Logger _logger = 
Logger.getLogger(DynamicMBeanProvider.class);
+  private static String DEFAULT_DESCRIPTION =
+      "Information on the management interface of the MBean";
 
   // Attribute name to the DynamicMetric object mapping
-  private Map<String, DynamicMetric> _attributeMap = new HashMap<>();
-  private MBeanInfo _mBeanInfo = null;
-  private String _sensorName;
+  private final Map<String, DynamicMetric> _attributeMap = new HashMap<>();
+  private ObjectName _objectName;
+  private MBeanInfo _mBeanInfo;
 
   /**
    * Instantiates a new Dynamic MBean provider.
    *
-   * @param sensorName     the sensor name
+   * @param dynamicMetrics Dynamic Metrics that are exposed by this provider
    * @param description    the MBean description
+   * @param domain         the MBean domain name
+   * @param keyValuePairs  the MBean object name components
+   */
+  protected synchronized void register(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+      String description, String domain, String... keyValuePairs) throws 
JMException {
+    if (_objectName != null) {
+      throw new HelixException(
+          "Mbean has been registered before. Please create new object for new 
registration.");
+    }
+    updateAttributtInfos(dynamicMetrics, description);
+    _objectName = MBeanRegistrar.register(this, domain, keyValuePairs);
+  }
+
+  /**
+   * Instantiates a new Dynamic MBean provider.
+   *
+   * @param dynamicMetrics Dynamic Metrics that are exposed by this provider
+   * @param description    the MBean description
+   * @param objectName     the proposed MBean ObjectName
+   */
+  protected synchronized void register(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+      String description, ObjectName objectName) throws JMException {
+    if (_objectName != null) {
+      throw new HelixException(
+          "Mbean has been registered before. Please create new object for new 
registration.");
+    }
+    updateAttributtInfos(dynamicMetrics, description);
+    _objectName = MBeanRegistrar.register(this, objectName);
+  }
+
+  protected synchronized void register(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+      ObjectName objectName) throws JMException {
+    register(dynamicMetrics, null, objectName);
+  }
+
+  /**
+   * After unregistered, the MBean can't be registered again, a new monitor 
has be to created.
+   */
+  public synchronized void unregister() {
+    _metricRegistry.removeMatching(new MetricFilter() {
+      @Override
+      public boolean matches(String name, Metric metric) {
+        return name.startsWith(getMetricRegistryNamePrefix());
+      }
+    });
+    MBeanRegistrar.unregister(_objectName);
+  }
+
+  protected String getMetricRegistryNamePrefix() {
+    return String.format("%s-%s-", getClass().getSimpleName(), 
Integer.toHexString(hashCode()));
+  }
+
+  /**
+   * Update the Dynamic MBean provider with new metric list.
+   *
+   * @param description    description of the MBean
    * @param dynamicMetrics the DynamicMetrics
    */
-  public DynamicMBeanProvider(String sensorName, String description,
-      Collection<DynamicMetric<?, ?>> dynamicMetrics) {
-    _sensorName = sensorName;
+  private void updateAttributtInfos(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+      String description) {
+    _attributeMap.clear();
 
     // get all attributes that can be emit by the dynamicMetrics.
     List<MBeanAttributeInfo> attributeInfoList = new ArrayList<>();
-    for (DynamicMetric dynamicMetric : dynamicMetrics) {
-      Iterator<MBeanAttributeInfo> iter = 
dynamicMetric.getAttributeInfos().iterator();
-      while (iter.hasNext()) {
-        MBeanAttributeInfo attributeInfo = iter.next();
-        // Info list to create MBean info
-        attributeInfoList.add(attributeInfo);
-        // Attribute mapping for getting attribute value when getAttribute() 
is called
-        _attributeMap.put(attributeInfo.getName(), dynamicMetric);
+    if (dynamicMetrics != null) {
+      for (DynamicMetric dynamicMetric : dynamicMetrics) {
+        Iterator<MBeanAttributeInfo> iter = 
dynamicMetric.getAttributeInfos().iterator();
+        while (iter.hasNext()) {
+          MBeanAttributeInfo attributeInfo = iter.next();
+          // Info list to create MBean info
+          attributeInfoList.add(attributeInfo);
+          // Attribute mapping for getting attribute value when getAttribute() 
is called
+          _attributeMap.put(attributeInfo.getName(), dynamicMetric);
+        }
       }
     }
 
@@ -72,6 +138,10 @@ public class DynamicMBeanProvider implements DynamicMBean, 
SensorNameProvider {
     MBeanAttributeInfo[] attributeInfos = new 
MBeanAttributeInfo[attributeInfoList.size()];
     attributeInfos = attributeInfoList.toArray(attributeInfos);
 
+    if (description == null) {
+      description = DEFAULT_DESCRIPTION;
+    }
+
     _mBeanInfo = new MBeanInfo(getClass().getName(), description, 
attributeInfos,
         new MBeanConstructorInfo[] { constructorInfo }, new 
MBeanOperationInfo[0],
         new MBeanNotificationInfo[0]);
@@ -130,9 +200,4 @@ public class DynamicMBeanProvider implements DynamicMBean, 
SensorNameProvider {
     // No operation supported
     return null;
   }
-
-  @Override
-  public String getSensorName() {
-    return _sensorName;
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java
index 0f41ca4..e183052 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java
@@ -22,14 +22,21 @@ public class TestHelixCallbackMonitor {
 
   private ObjectName buildObjectName(InstanceType type, String cluster,
       HelixConstants.ChangeType changeType) throws 
MalformedObjectNameException {
-    return buildObjectName(type, cluster, changeType, 0);
+    return 
MBeanRegistrar.buildObjectName(MonitorDomainNames.HelixCallback.name(),
+        HelixCallbackMonitor.MONITOR_TYPE, type.name(), 
HelixCallbackMonitor.MONITOR_KEY, cluster,
+        HelixCallbackMonitor.MONITOR_CHANGE_TYPE, changeType.name());
   }
 
   private ObjectName buildObjectName(InstanceType type, String cluster,
       HelixConstants.ChangeType changeType, int num) throws 
MalformedObjectNameException {
-    return MBeanRegistrar.buildObjectName(num, 
MonitorDomainNames.HelixCallback.name(),
-        HelixCallbackMonitor.MONITOR_TYPE, type.name(), 
HelixCallbackMonitor.MONITOR_KEY, cluster,
-        HelixCallbackMonitor.MONITOR_CHANGE_TYPE, changeType.name());
+    ObjectName objectName = buildObjectName(type, cluster, changeType);
+    if (num > 0) {
+      return new ObjectName(String
+          .format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE,
+              String.valueOf(num)));
+    } else {
+      return objectName;
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index 050c29d..8b1ebe1 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -26,6 +26,9 @@ import java.util.Map;
 
 import java.util.Random;
 import java.util.TreeMap;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ExternalView;
@@ -41,9 +44,9 @@ public class TestResourceMonitor {
   int _replicas = 3;
   int _partitions = 50;
 
-  @Test() public void testReportData() {
+  @Test() public void testReportData() throws JMException {
     final int n = 5;
-    ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName);
+    ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName, new 
ObjectName("testDomain:key=value"));
 
     List<String> instances = new ArrayList<String>();
     for (int i = 0; i < n; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b5f46388/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index e227b86..c5ee212 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -19,14 +19,14 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.lang.management.ManagementFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
 import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import java.lang.management.ManagementFactory;
 
 public class TestZkClientMonitor {
 
@@ -40,9 +40,14 @@ public class TestZkClientMonitor {
 
   private ObjectName buildObjectName(String tag, String key, int num)
       throws MalformedObjectNameException {
-    return MBeanRegistrar
-        .buildObjectName(num, MonitorDomainNames.HelixZkClient.name(), 
ZkClientMonitor.MONITOR_TYPE,
-            tag, ZkClientMonitor.MONITOR_KEY, key);
+    ObjectName objectName = buildObjectName(tag, key);
+    if (num > 0) {
+      return new ObjectName(String
+          .format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE,
+              String.valueOf(num)));
+    } else {
+      return objectName;
+    }
   }
 
   private ObjectName buildPathMonitorObjectName(String tag, String key, String 
path)

Reply via email to