Avoid instance name in the HelixZkClient, HelixCallback and 
ParticipantMessageMonitor metric sensor name.

The instance name is duplicate information in most cases. The monitoring system 
is already aware of this information when it collects metrics from a host. To 
make matters worse, the instance name makes the metric sensor names hard to 
consolidate. This change removes the instance-related information from 
"sensorName".

Note that for use cases where multiple applications are deployed on the same 
host, the instance name is still required to distinguish monitor items, so this 
info is still kept in the MBean name.

Additional change: Refine MessageLatencyMonitor sensor name.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ead6a729
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ead6a729
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ead6a729

Branch: refs/heads/master
Commit: ead6a729d71d12020f480eba1620f0c91c36c671
Parents: f464242
Author: Jiajun Wang <jjw...@linkedin.com>
Authored: Wed Sep 13 15:04:45 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Mon Nov 6 17:08:24 2017 -0800

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java |  13 +-
 .../org/apache/helix/manager/zk/ZkClient.java   |  36 +++-
 .../messaging/DefaultMessagingService.java      |   2 +-
 .../messaging/handling/HelixTaskExecutor.java   |   2 +-
 .../monitoring/ParticipantStatusMonitor.java    | 190 ------------------
 .../monitoring/mbeans/ClusterEventMonitor.java  |   7 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java |  54 ++----
 .../monitoring/mbeans/HelixCallbackMonitor.java |  36 ++--
 .../monitoring/mbeans/InstanceMonitor.java      |   8 -
 .../mbeans/MessageLatencyMonitor.java           |  31 +--
 .../monitoring/mbeans/MessageQueueMonitor.java  |   2 +-
 .../mbeans/ParticipantMessageMonitor.java       |   8 +-
 .../mbeans/ParticipantStatusMonitor.java        | 192 +++++++++++++++++++
 .../mbeans/PerInstanceResourceMonitor.java      |   8 +-
 .../monitoring/mbeans/ResourceMonitor.java      |  30 +--
 .../monitoring/mbeans/ResourceMonitorMBean.java |  74 -------
 .../mbeans/StateTransitionStatMonitor.java      |   5 -
 .../monitoring/mbeans/ZkClientMonitor.java      |  33 ++--
 .../monitoring/mbeans/ZkClientPathMonitor.java  |  29 ++-
 .../dynamicMBeans/DynamicMBeanProvider.java     |  39 ++--
 .../helix/store/zk/ZkHelixPropertyStore.java    |   8 +-
 .../helix/task/TaskStateModelFactory.java       |   3 +
 .../TestClusterEventStatusMonitor.java          |   6 +-
 .../monitoring/TestParticipantMonitor.java      |   4 +-
 .../mbeans/TestDisableResourceMbean.java        |   2 +-
 .../mbeans/TestDropResourceMetricsReset.java    |   2 +-
 .../mbeans/TestHelixCallbackMonitor.java        |  11 +-
 .../mbeans/TestResetClusterMetrics.java         |   2 +-
 .../monitoring/mbeans/TestResourceMonitor.java  |   1 +
 .../monitoring/mbeans/TestZkClientMonitor.java  |  47 +++--
 .../store/zk/TestZkHelixPropertyStore.java      |   2 +-
 31 files changed, 427 insertions(+), 460 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index d048ad1..db660bd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -209,8 +209,8 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     }
 
     _instanceName = instanceName;
-    _preConnectCallbacks = new ArrayList<PreConnectCallback>();
-    _handlers = new ArrayList<CallbackHandler>();
+    _preConnectCallbacks = new ArrayList<>();
+    _handlers = new ArrayList<>();
     _properties = new 
HelixManagerProperties("cluster-manager-version.properties");
     _version = _properties.getVersion();
 
@@ -219,8 +219,10 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     try {
       _callbackMonitors = new HashMap<>();
       for (ChangeType changeType : ChangeType.values()) {
-        _callbackMonitors.put(changeType, new 
HelixCallbackMonitor(instanceType,
-            String.format("%s.%s", clusterName, instanceName), changeType));
+        HelixCallbackMonitor callbackMonitor =
+            new HelixCallbackMonitor(instanceType, clusterName, instanceName, 
changeType);
+        callbackMonitor.register();
+        _callbackMonitors.put(changeType, callbackMonitor);
       }
     } catch (JMException e) {
       LOG.error("Error in creating callback monitor.", e);
@@ -603,7 +605,8 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     zkClientBuilder.setZkServer(_zkAddress).setSessionTimeout(_sessionTimeout)
         
.setConnectionTimeout(_clientConnectionTimeout).setZkSerializer(zkSerializer)
         .setMonitorType(_instanceType.name())
-        .setMonitorKey(String.format("%s.%s", _clusterName, _instanceName))
+        .setMonitorKey(_clusterName)
+        .setMonitorInstanceName(_instanceName)
         .setMonitorRootPathOnly(!_instanceType.equals(InstanceType.CONTROLLER) 
&& !_instanceType
             .equals(InstanceType.CONTROLLER_PARTICIPANT));
     _zkclient = zkClientBuilder.build();

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 7ec693f..d5ca32a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -64,16 +64,16 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
 
   private ZkClient(IZkConnection connection, int connectionTimeout, long 
operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String 
monitorKey,
-      boolean monitorRootPathOnly) {
+      String monitorInstanceName, boolean monitorRootPathOnly) {
     super(connection, connectionTimeout, new ByteArraySerializer(), 
operationRetryTimeout);
-    init(zkSerializer, monitorType, monitorKey, monitorRootPathOnly);
+    init(zkSerializer, monitorType, monitorKey, monitorInstanceName, 
monitorRootPathOnly);
   }
 
   public ZkClient(IZkConnection connection, int connectionTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String 
monitorKey,
       long operationRetryTimeout) {
     this(connection, connectionTimeout, operationRetryTimeout, zkSerializer, 
monitorType,
-        monitorKey, true);
+        monitorKey, null, true);
   }
 
   public ZkClient(IZkConnection connection, int connectionTimeout,
@@ -133,16 +133,17 @@ public class ZkClient extends 
org.I0Itec.zkclient.ZkClient {
   }
 
   protected void init(PathBasedZkSerializer zkSerializer, String monitorType, 
String monitorKey,
-      boolean monitorRootPathOnly) {
+      String monitorInstanceName, boolean monitorRootPathOnly) {
     _zkSerializer = zkSerializer;
     if (LOG.isTraceEnabled()) {
       StackTraceElement[] calls = Thread.currentThread().getStackTrace();
       LOG.trace("created a zkclient. callstack: " + Arrays.asList(calls));
     }
     try {
-      if (monitorKey != null && !monitorKey.isEmpty() &&
-          monitorType != null && !monitorType.isEmpty()) {
-        _monitor = new ZkClientMonitor(monitorType, monitorKey, 
monitorRootPathOnly);
+      if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null 
&& !monitorType
+          .isEmpty()) {
+        _monitor =
+            new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, 
monitorRootPathOnly);
       } else {
         LOG.info("ZkClient monitor key or type is not provided. Skip 
monitoring.");
       }
@@ -605,6 +606,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
 
     String _monitorType;
     String _monitorKey;
+    String _monitorInstanceName = null;
     boolean _monitorRootPathOnly = true;
 
     public Builder setConnection(IZkConnection connection) {
@@ -627,16 +629,34 @@ public class ZkClient extends 
org.I0Itec.zkclient.ZkClient {
       return this;
     }
 
+    /**
+     * Used as part of the MBean ObjectName. This item is required for 
enabling monitoring.
+     * @param monitorType
+     */
     public Builder setMonitorType(String monitorType) {
       this._monitorType = monitorType;
       return this;
     }
 
+    /**
+     * Used as part of the MBean ObjectName. This item is required for 
enabling monitoring.
+     * @param monitorKey
+     */
     public Builder setMonitorKey(String monitorKey) {
       this._monitorKey = monitorKey;
       return this;
     }
 
+    /**
+     * Used as part of the MBean ObjectName. This item is optional.
+     * @param instanceName
+     */
+    public Builder setMonitorInstanceName(String instanceName) {
+      this._monitorInstanceName = instanceName;
+      return this;
+    }
+
+
     public Builder setMonitorRootPathOnly(Boolean monitorRootPathOnly) {
       this._monitorRootPathOnly = monitorRootPathOnly;
       return this;
@@ -676,7 +696,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
       }
 
       return new ZkClient(_connection, _connectionTimeout, 
_operationRetryTimeout, _zkSerializer,
-          _monitorType, _monitorKey, _monitorRootPathOnly);
+          _monitorType, _monitorKey, _monitorInstanceName, 
_monitorRootPathOnly);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index a1de26a..cd8f573 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -42,7 +42,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.ConfigScopeBuilder;
-import org.apache.helix.monitoring.ParticipantStatusMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.log4j.Logger;
 
 public class DefaultMessagingService implements ClusterMessagingService {

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 6bddd16..507f273 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -59,7 +59,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.monitoring.ParticipantStatusMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
 import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
 import org.apache.helix.participant.HelixStateMachineEngine;

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
deleted file mode 100644
index ee15aec..0000000
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.model.Message;
-import org.apache.helix.monitoring.mbeans.*;
-import org.apache.log4j.Logger;
-
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-
-public class ParticipantStatusMonitor {
-  private final ConcurrentHashMap<StateTransitionContext, 
StateTransitionStatMonitor> _monitorMap =
-      new ConcurrentHashMap<StateTransitionContext, 
StateTransitionStatMonitor>();
-  private static final Logger LOG = 
Logger.getLogger(ParticipantStatusMonitor.class);
-  public static final String PARTICIPANT_KEY = "ParticipantName";
-  public static final String PARTICIPANT_STATUS_KEY = 
"ParticipantMessageStatus";
-
-  private MBeanServer _beanServer;
-  private ParticipantMessageMonitor _messageMonitor;
-  private MessageLatencyMonitor _messageLatencyMonitor;
-  private Map<String, ThreadPoolExecutorMonitor> _executorMonitors;
-
-  public ParticipantStatusMonitor(boolean isParticipant, String instanceName) {
-    try {
-      _beanServer = ManagementFactory.getPlatformMBeanServer();
-      if (isParticipant) {
-        _messageMonitor = new ParticipantMessageMonitor(instanceName);
-        _messageLatencyMonitor =
-            new 
MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(), 
instanceName);
-        _executorMonitors = new ConcurrentHashMap<>();
-        register(_messageMonitor, 
getObjectName(_messageMonitor.getParticipantBeanName()));
-      }
-    } catch (Exception e) {
-      LOG.warn(e);
-      e.printStackTrace();
-      _beanServer = null;
-    }
-  }
-
-  public synchronized void reportReceivedMessage(Message message) {
-    if (_messageMonitor != null) {  // is participant
-      _messageMonitor.incrementReceivedMessages(1);
-      _messageMonitor.incrementPendingMessages(1);
-      _messageLatencyMonitor.updateLatency(message);
-    }
-  }
-
-  public synchronized void reportProcessedMessage(Message message,
-      ParticipantMessageMonitor.ProcessedMessageState processedMessageState) {
-    if (_messageMonitor != null) {  // is participant
-      switch (processedMessageState) {
-        case DISCARDED:
-          _messageMonitor.incrementDiscardedMessages(1);
-          _messageMonitor.decrementPendingMessages(1);
-          break;
-        case FAILED:
-          _messageMonitor.incrementFailedMessages(1);
-          _messageMonitor.decrementPendingMessages(1);
-          break;
-        case COMPLETED:
-          _messageMonitor.incrementCompletedMessages(1);
-          _messageMonitor.decrementPendingMessages(1);
-          break;
-      }
-    }
-  }
-
-  public void reportTransitionStat(StateTransitionContext cxt, 
StateTransitionDataPoint data) {
-    if (_beanServer == null) {
-      LOG.warn("bean server is null, skip reporting");
-      return;
-    }
-    try {
-      if (!_monitorMap.containsKey(cxt)) {
-        synchronized (this) {
-          if (!_monitorMap.containsKey(cxt)) {
-            StateTransitionStatMonitor bean =
-                new StateTransitionStatMonitor(cxt);
-            _monitorMap.put(cxt, bean);
-            String beanName = cxt.toString();
-            register(bean, getObjectName(beanName));
-          }
-        }
-      }
-      _monitorMap.get(cxt).addDataPoint(data);
-    } catch (Exception e) {
-      LOG.warn(e);
-      e.printStackTrace();
-    }
-  }
-
-  private ObjectName getObjectName(String name) throws 
MalformedObjectNameException {
-    LOG.info("Registering bean: " + name);
-    return new ObjectName(String.format("%s:%s", 
MonitorDomainNames.CLMParticipantReport.name(), name));
-  }
-
-  private void register(Object bean, ObjectName name) {
-    if (_beanServer == null) {
-      LOG.warn("bean server is null, skip reporting");
-      return;
-    }
-    try {
-      _beanServer.unregisterMBean(name);
-    } catch (Exception e1) {
-      // Swallow silently
-    }
-
-    try {
-      _beanServer.registerMBean(bean, name);
-    } catch (Exception e) {
-      LOG.warn("Could not register MBean", e);
-    }
-  }
-
-  public void shutDown() {
-    if (_messageMonitor != null) {  // is participant
-      try {
-        ObjectName name = 
getObjectName(_messageMonitor.getParticipantBeanName());
-        if (_beanServer.isRegistered(name)) {
-          _beanServer.unregisterMBean(name);
-        }
-      } catch (Exception e) {
-        LOG.warn("fail to unregister " + 
_messageMonitor.getParticipantBeanName(), e);
-      }
-    }
-    if (_messageLatencyMonitor != null) {
-      _messageLatencyMonitor.unregister();
-    }
-    for (StateTransitionContext cxt : _monitorMap.keySet()) {
-      try {
-        ObjectName name = getObjectName(cxt.toString());
-        if (_beanServer.isRegistered(name)) {
-          _beanServer.unregisterMBean(name);
-        }
-      } catch (Exception e) {
-        LOG.warn("fail to unregister " + cxt.toString(), e);
-      }
-    }
-    _monitorMap.clear();
-  }
-
-  public void createExecutorMonitor(String type, ExecutorService executor) {
-    if (_executorMonitors == null) {
-      return;
-    }
-    if (! (executor instanceof ThreadPoolExecutor)) {
-      return;
-    }
-
-    try {
-      _executorMonitors.put(type,
-          new ThreadPoolExecutorMonitor(type, (ThreadPoolExecutor) executor));
-    } catch (JMException e) {
-      LOG.warn(String.format(
-          "Error in creating ThreadPoolExecutorMonitor for type=%s", type), e);
-    }
-  }
-
-  public void removeExecutorMonitor(String type) {
-    if (_executorMonitors != null && _executorMonitors.containsKey(type)) {
-      _executorMonitors.get(type).unregister();
-      _executorMonitors.remove(type);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
index b6853cb..a3cd1ba 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
@@ -25,8 +25,6 @@ import 
org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
 import javax.management.JMException;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -82,12 +80,13 @@ public class ClusterEventMonitor extends 
DynamicMBeanProvider {
         "ClusterEvent", PHASE_DN_KEY, _phaseName);
   }
 
-  public void register() throws JMException {
+  public ClusterEventMonitor register() throws JMException {
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
     attributeList.add(_totalDuration);
     attributeList.add(_maxDuration);
     attributeList.add(_count);
     attributeList.add(_duration);
-    register(attributeList, 
_clusterStatusMonitor.getObjectName(getBeanName()));
+    doRegister(attributeList, 
_clusterStatusMonitor.getObjectName(getBeanName()));
+    return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 536865b..88d9f68 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -21,34 +21,18 @@ package org.apache.helix.monitoring.mbeans;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.model.*;
+import org.apache.helix.task.*;
+import org.apache.log4j.Logger;
+
 import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.apache.log4j.Logger;
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
@@ -83,7 +67,8 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = 
new ConcurrentHashMap<>();
 
   // phaseName -> eventMonitor
-  private final ConcurrentHashMap<String, ClusterEventMonitor> 
_clusterEventMbeanMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ClusterEventMonitor> 
_clusterEventMbeanMap =
+      new ConcurrentHashMap<>();
 
   /**
    * PerInstanceResource bean map: beanName->bean
@@ -106,13 +91,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   }
 
   public ObjectName getObjectName(String name) throws 
MalformedObjectNameException {
-    return new ObjectName(String.format("%s: %s", 
MonitorDomainNames.ClusterStatus.name(), name));
-  }
-
-  // TODO remove getBeanName()?
-  // Used by other external JMX consumers like ingraph
-  public String getBeanName() {
-    return MonitorDomainNames.ClusterStatus.name() + " " + _clusterName;
+    return new ObjectName(String.format("%s:%s", 
MonitorDomainNames.ClusterStatus.name(), name));
   }
 
   public String getClusterName() {
@@ -287,8 +266,8 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
         }
         monitor.register();
       } catch (JMException e) {
-        LOG.error(
-            "Failed to register ClusterEventMonitorMbean for cluster " + 
_clusterName + " and phase type: " + phase, e);
+        LOG.error("Failed to register ClusterEventMonitorMbean for cluster " + 
_clusterName
+            + " and phase type: " + phase, e);
         return;
       }
     }
@@ -458,7 +437,9 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
         synchronized (this) {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
             String beanName = getResourceBeanName(resourceName);
-            ResourceMonitor bean = new ResourceMonitor(_clusterName, 
resourceName, getObjectName(beanName));
+            ResourceMonitor bean =
+                new ResourceMonitor(_clusterName, resourceName, 
getObjectName(beanName));
+            bean.register();
             _resourceMbeanMap.put(resourceName, bean);
           }
         }
@@ -555,6 +536,9 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
         continue;
       }
       WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflow);
+      if (workflowConfig == null) {
+        continue;
+      }
       Set<String> allJobs = workflowConfig.getJobDag().getAllNodes();
       WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
       for (String job : allJobs) {

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
index d23c405..ad9c787 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
@@ -38,6 +38,9 @@ public class HelixCallbackMonitor extends 
DynamicMBeanProvider {
   private static final String MBEAN_DESCRIPTION = "Helix Callback Monitor";
   private final String _sensorName;
   private final HelixConstants.ChangeType _changeType;
+  private final InstanceType _type;
+  private final String _clusterName;
+  private final String _instanceName;
 
   private SimpleDynamicMetric<Long> _counter = new 
SimpleDynamicMetric("Counter", 0l);
   private SimpleDynamicMetric<Long> _unbatchedCounter =
@@ -48,20 +51,17 @@ public class HelixCallbackMonitor extends 
DynamicMBeanProvider {
   private HistogramDynamicMetric _latencyGauge = new 
HistogramDynamicMetric("LatencyGauge",
       _metricRegistry.histogram(getMetricRegistryNamePrefix() + 
"LatencyGauge"));
 
-  public HelixCallbackMonitor(InstanceType type, String key, 
HelixConstants.ChangeType changeType)
-      throws JMException {
+  public HelixCallbackMonitor(InstanceType type, String clusterName, String 
instanceName,
+      HelixConstants.ChangeType changeType) throws JMException {
     _changeType = changeType;
+    _type = type;
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+
+    // Don't put instanceName into sensor name. This detail information is in 
the MBean name already.
     _sensorName = String
-        .format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), 
type.name(), key,
+        .format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), 
type.name(), clusterName,
             changeType.name());
-
-    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
-    attributeList.add(_counter);
-    attributeList.add(_unbatchedCounter);
-    attributeList.add(_totalLatencyCounter);
-    attributeList.add(_latencyGauge);
-    register(attributeList, MBEAN_DESCRIPTION, 
MonitorDomainNames.HelixCallback.name(),
-        MONITOR_TYPE, type.name(), MONITOR_KEY, key, MONITOR_CHANGE_TYPE, 
changeType.name());
   }
 
   @Override
@@ -82,4 +82,18 @@ public class HelixCallbackMonitor extends 
DynamicMBeanProvider {
   public void increaseCallbackUnbatchedCounters() {
     _unbatchedCounter.updateValue(_unbatchedCounter.getValue() + 1);
   }
+
+  @Override
+  public HelixCallbackMonitor register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_counter);
+    attributeList.add(_unbatchedCounter);
+    attributeList.add(_totalLatencyCounter);
+    attributeList.add(_latencyGauge);
+    doRegister(attributeList, MBEAN_DESCRIPTION, 
MonitorDomainNames.HelixCallback.name(),
+        MONITOR_TYPE, _type.name(), MONITOR_KEY,
+        _clusterName + (_instanceName == null ? "" : "." + _instanceName), 
MONITOR_CHANGE_TYPE,
+        _changeType.name());
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index 8c8c80c..dc43d48 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -97,14 +97,6 @@ public class InstanceMonitor implements InstanceMonitorMBean 
{
     return _participantName;
   }
 
-  /**
-   * Helper for basic formatted view of this bean
-   * @return bean name
-   */
-  public String getBeanName() {
-    return _clusterName + " " + serializedTags() + " " + _participantName;
-  }
-
   private String serializedTags() {
     return Joiner.on('|').skipNulls().join(_tags).toString();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
index 76ab42e..cc00496 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
@@ -20,7 +20,6 @@ package org.apache.helix.monitoring.mbeans;
  */
 
 import org.apache.helix.model.Message;
-import org.apache.helix.monitoring.ParticipantStatusMonitor;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
@@ -31,10 +30,10 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class MessageLatencyMonitor extends DynamicMBeanProvider {
-  public static String MONITOR_TYPE_KW = "MonitorType";
-
   private static final String MBEAN_DESCRIPTION = "Helix Message Latency 
Monitor";
   private final String _sensorName;
+  private final String _domainName;
+  private final String _participantName;
 
   private SimpleDynamicMetric<Long> _totalMessageCount =
       new SimpleDynamicMetric("TotalMessageCount", 0l);
@@ -45,16 +44,10 @@ public class MessageLatencyMonitor extends DynamicMBeanProvider {
           _metricRegistry.histogram(getMetricRegistryNamePrefix() + "MessageLatencyGauge"));
 
   public MessageLatencyMonitor(String domainName, String participantName) throws JMException {
-    _sensorName = String
-        .format("%s.%s.%s.%s", ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY, participantName,
-            MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName());
-
-    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
-    attributeList.add(_totalMessageCount);
-    attributeList.add(_totalMessageLatency);
-    attributeList.add(_messageLatencyGauge);
-    register(attributeList, MBEAN_DESCRIPTION, domainName, ParticipantStatusMonitor.PARTICIPANT_KEY,
-        participantName, MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName());
+    _domainName = domainName;
+    _participantName = participantName;
+    _sensorName = String.format("%s.%s", ParticipantMessageMonitor.PARTICIPANT_STATUS_KEY,
+        "MessageLatency");
   }
 
   @Override
@@ -70,4 +63,16 @@ public class MessageLatencyMonitor extends DynamicMBeanProvider {
     _totalMessageLatency.updateValue(_totalMessageLatency.getValue() + latency);
     _messageLatencyGauge.updateValue(latency);
   }
+
+  @Override
+  public MessageLatencyMonitor register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_totalMessageCount);
+    attributeList.add(_totalMessageLatency);
+    attributeList.add(_messageLatencyGauge);
+    doRegister(attributeList, MBEAN_DESCRIPTION, _domainName, ParticipantMessageMonitor.PARTICIPANT_KEY,
+        _participantName, "MonitorType", MessageLatencyMonitor.class.getSimpleName());
+
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
index 9fcf279..4ae47ed 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
@@ -121,6 +121,6 @@ public class MessageQueueMonitor implements MessageQueueMonitorMBean {
   }
 
   public ObjectName getObjectName(String name) throws MalformedObjectNameException {
-    return new ObjectName(String.format("%s: %s", MonitorDomainNames.ClusterStatus.name(), name));
+    return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), name));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
index b8055e5..1b47ce8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
@@ -1,8 +1,8 @@
 package org.apache.helix.monitoring.mbeans;
 
-import org.apache.helix.monitoring.ParticipantStatusMonitor;
-
 public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean {
+  public static final String PARTICIPANT_KEY = "ParticipantName";
+  public static final String PARTICIPANT_STATUS_KEY = "ParticipantMessageStatus";
 
   /**
    * The current processed state of the message
@@ -25,7 +25,7 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
   }
 
   public String getParticipantBeanName() {
-    return String.format("%s=%s", ParticipantStatusMonitor.PARTICIPANT_KEY, _participantName);
+    return String.format("%s=%s", PARTICIPANT_KEY, _participantName);
   }
 
   public void incrementReceivedMessages(int count) {
@@ -79,7 +79,7 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
 
   @Override
   public String getSensorName() {
-    return ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _participantName;
+    return PARTICIPANT_STATUS_KEY;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
new file mode 100644
index 0000000..66bb3b7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -0,0 +1,192 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.model.Message;
+import org.apache.helix.monitoring.StateTransitionContext;
+import org.apache.helix.monitoring.StateTransitionDataPoint;
+import org.apache.log4j.Logger;
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public class ParticipantStatusMonitor {
+  private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap =
+      new ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor>();
+  private static final Logger LOG = Logger.getLogger(ParticipantStatusMonitor.class);
+
+  private MBeanServer _beanServer;
+  private ParticipantMessageMonitor _messageMonitor;
+  private MessageLatencyMonitor _messageLatencyMonitor;
+  private Map<String, ThreadPoolExecutorMonitor> _executorMonitors;
+
+  public ParticipantStatusMonitor(boolean isParticipant, String instanceName) {
+    try {
+      _beanServer = ManagementFactory.getPlatformMBeanServer();
+      if (isParticipant) {
+        _messageMonitor = new ParticipantMessageMonitor(instanceName);
+        _messageLatencyMonitor =
+            new MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(), instanceName);
+        _messageLatencyMonitor.register();
+        _executorMonitors = new ConcurrentHashMap<>();
+        register(_messageMonitor, getObjectName(_messageMonitor.getParticipantBeanName()));
+      }
+    } catch (Exception e) {
+      LOG.warn(e);
+      e.printStackTrace();
+      _beanServer = null;
+    }
+  }
+
+  public synchronized void reportReceivedMessage(Message message) {
+    if (_messageMonitor != null) {  // is participant
+      _messageMonitor.incrementReceivedMessages(1);
+      _messageMonitor.incrementPendingMessages(1);
+      _messageLatencyMonitor.updateLatency(message);
+    }
+  }
+
+  public synchronized void reportProcessedMessage(Message message,
+      ParticipantMessageMonitor.ProcessedMessageState processedMessageState) {
+    if (_messageMonitor != null) {  // is participant
+      switch (processedMessageState) {
+        case DISCARDED:
+          _messageMonitor.incrementDiscardedMessages(1);
+          _messageMonitor.decrementPendingMessages(1);
+          break;
+        case FAILED:
+          _messageMonitor.incrementFailedMessages(1);
+          _messageMonitor.decrementPendingMessages(1);
+          break;
+        case COMPLETED:
+          _messageMonitor.incrementCompletedMessages(1);
+          _messageMonitor.decrementPendingMessages(1);
+          break;
+      }
+    }
+  }
+
+  public void reportTransitionStat(StateTransitionContext cxt, StateTransitionDataPoint data) {
+    if (_beanServer == null) {
+      LOG.warn("bean server is null, skip reporting");
+      return;
+    }
+    try {
+      if (!_monitorMap.containsKey(cxt)) {
+        synchronized (this) {
+          if (!_monitorMap.containsKey(cxt)) {
+            StateTransitionStatMonitor bean =
+                new StateTransitionStatMonitor(cxt);
+            _monitorMap.put(cxt, bean);
+            String beanName = cxt.toString();
+            register(bean, getObjectName(beanName));
+          }
+        }
+      }
+      _monitorMap.get(cxt).addDataPoint(data);
+    } catch (Exception e) {
+      LOG.warn(e);
+      e.printStackTrace();
+    }
+  }
+
+  private ObjectName getObjectName(String name) throws MalformedObjectNameException {
+    LOG.info("Registering bean: " + name);
+    return new ObjectName(String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name));
+  }
+
+  private void register(Object bean, ObjectName name) {
+    if (_beanServer == null) {
+      LOG.warn("bean server is null, skip reporting");
+      return;
+    }
+    try {
+      _beanServer.unregisterMBean(name);
+    } catch (Exception e1) {
+      // Swallow silently
+    }
+
+    try {
+      _beanServer.registerMBean(bean, name);
+    } catch (Exception e) {
+      LOG.warn("Could not register MBean", e);
+    }
+  }
+
+  public void shutDown() {
+    if (_messageMonitor != null) {  // is participant
+      try {
+        ObjectName name = getObjectName(_messageMonitor.getParticipantBeanName());
+        if (_beanServer.isRegistered(name)) {
+          _beanServer.unregisterMBean(name);
+        }
+      } catch (Exception e) {
+        LOG.warn("fail to unregister " + _messageMonitor.getParticipantBeanName(), e);
+      }
+    }
+    if (_messageLatencyMonitor != null) {
+      _messageLatencyMonitor.unregister();
+    }
+    for (StateTransitionContext cxt : _monitorMap.keySet()) {
+      try {
+        ObjectName name = getObjectName(cxt.toString());
+        if (_beanServer.isRegistered(name)) {
+          _beanServer.unregisterMBean(name);
+        }
+      } catch (Exception e) {
+        LOG.warn("fail to unregister " + cxt.toString(), e);
+      }
+    }
+    _monitorMap.clear();
+  }
+
+  public void createExecutorMonitor(String type, ExecutorService executor) {
+    if (_executorMonitors == null) {
+      return;
+    }
+    if (! (executor instanceof ThreadPoolExecutor)) {
+      return;
+    }
+
+    try {
+      _executorMonitors.put(type,
+          new ThreadPoolExecutorMonitor(type, (ThreadPoolExecutor) executor));
+    } catch (JMException e) {
+      LOG.warn(String.format(
+          "Error in creating ThreadPoolExecutorMonitor for type=%s", type), e);
+    }
+  }
+
+  public void removeExecutorMonitor(String type) {
+    if (_executorMonitors != null) {
+      ThreadPoolExecutorMonitor monitor = _executorMonitors.remove(type);
+      if (monitor != null) {
+        monitor.unregister();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
index 714767b..76959cf 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
@@ -92,11 +92,9 @@ public class PerInstanceResourceMonitor implements 
PerInstanceResourceMonitorMBe
 
   @Override
   public String getSensorName() {
-    return Joiner
-        .on('.')
-        .join(
-            ImmutableList.of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, 
_clusterName,
-                serializedTags(), _participantName, _resourceName)).toString();
+    return Joiner.on('.').join(ImmutableList
+        .of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName, 
serializedTags(),
+            _participantName, _resourceName)).toString();
   }
 
   private String serializedTags() {

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 6382359..080ed62 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -76,18 +76,12 @@ public class ResourceMonitor extends DynamicMBeanProvider {
 
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private long _lastResetTime;
-  private String _resourceName;
-  private String _clusterName;
-
-  public enum MonitorState {
-    TOP_STATE
-  }
-
-  public ResourceMonitor(String clusterName, String resourceName, ObjectName 
objectName)
-      throws JMException {
-    _clusterName = clusterName;
-    _resourceName = resourceName;
+  private final String _resourceName;
+  private final String _clusterName;
+  private final ObjectName _initObjectName;
 
+  @Override
+  public ResourceMonitor register() throws JMException {
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
     attributeList.add(_numOfPartitions);
     attributeList.add(_numOfPartitionsInExternalView);
@@ -106,8 +100,20 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
     attributeList.add(_partitionTopStateHandoffDurationGauge);
     attributeList.add(_totalMessageReceived);
+    doRegister(attributeList, _initObjectName);
 
-    register(attributeList, objectName);
+    return this;
+  }
+
+  public enum MonitorState {
+    TOP_STATE
+  }
+
+  public ResourceMonitor(String clusterName, String resourceName, ObjectName 
objectName)
+      throws JMException {
+    _clusterName = clusterName;
+    _resourceName = resourceName;
+    _initObjectName = objectName;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
deleted file mode 100644
index 75e33c7..0000000
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-public interface ResourceMonitorMBean extends SensorNameProvider {
-  public long getPartitionGauge();
-
-  public long getErrorPartitionGauge();
-
-  public long getMissingTopStatePartitionGauge();
-
-  public long getMissingMinActiveReplicaPartitionGauge();
-
-  public long getMissingReplicaPartitionGauge();
-
-  public long getPendingRecoveryRebalancePartitionGauge();
-
-  public long getPendingLoadRebalancePartitionGauge();
-
-  public long getRecoveryRebalanceThrottledPartitionGauge();
-
-  public long getLoadRebalanceThrottledPartitionGauge();
-
-  public long getDifferenceWithIdealStateGauge();
-
-  public long getExternalViewPartitionGauge();
-
-  /**
-   * Get aggregated successful handoff duration
-   * @return
-   */
-  public long getSuccessfulTopStateHandoffDurationCounter();
-
-  /**
-   * Get number of top state successful handoffs
-   * @return
-   */
-  public long getSucceededTopStateHandoffCounter();
-
-  /**
-   * Get number of top state failed handoffs
-   * @return
-   */
-  public long getFailedTopStateHandoffCounter();
-
-  /**
-   * Get maximum single partition successful handoff
-   */
-  public long getMaxSinglePartitionTopStateHandoffDurationGauge();
-
-  /**
-   * Get total message received for this resource
-   * */
-  public long getTotalMessageReceived();
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
index c751d5d..4a4c89e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
@@ -51,11 +51,6 @@ public class StateTransitionStatMonitor implements 
StateTransitionStatMonitorMBe
     return _context;
   }
 
-  public String getBeanName() {
-    return _context.getClusterName() + " " + _context.getResourceName() + " "
-        + _context.getTransition();
-  }
-
   public String getSensorName() {
     return String.format("StateTransitionStat.%s.%s.%s", 
_context.getClusterName(),
         _context.getResourceName(), _context.getTransition());

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
index b385068..639fd8a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
@@ -22,6 +22,7 @@ package org.apache.helix.monitoring.mbeans;
 import org.apache.helix.HelixException;
 
 import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,8 +32,7 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
   public static final String MONITOR_KEY = "Key";
 
   private ObjectName _objectName;
-  private String _monitorType;
-  private String _monitorKey;
+  private String _sensorName;
 
   private long _stateChangeEventCounter;
   private long _dataChangeEventCounter;
@@ -40,31 +40,35 @@ public class ZkClientMonitor implements 
ZkClientMonitorMBean {
   private Map<ZkClientPathMonitor.PredefinedPath, ZkClientPathMonitor> 
_zkClientPathMonitorMap =
       new ConcurrentHashMap<>();
 
-  public ZkClientMonitor(String monitorType, String monitorKey, boolean 
monitorRootPathOnly)
-      throws JMException {
+  public ZkClientMonitor(String monitorType, String monitorKey, String 
monitorInstanceName,
+      boolean monitorRootPathOnly) throws JMException {
     if (monitorKey == null || monitorKey.isEmpty() || monitorType == null || 
monitorType
         .isEmpty()) {
       throw new HelixException("Cannot create ZkClientMonitor without monitor 
key and type.");
     }
 
-    _monitorType = monitorType;
-    _monitorKey = monitorKey;
-    regitster(monitorType, monitorKey);
+    _sensorName =
+        String.format("%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), 
monitorType, monitorKey);
+
+    _objectName =
+        MBeanRegistrar.register(this, getObjectName(monitorType, monitorKey, 
monitorInstanceName));
 
     for (ZkClientPathMonitor.PredefinedPath path : 
ZkClientPathMonitor.PredefinedPath.values()) {
       // If monitor root path only, check if the current path is Root.
       // Otherwise, add monitors for every path.
       if (!monitorRootPathOnly || 
path.equals(ZkClientPathMonitor.PredefinedPath.Root)) {
-        _zkClientPathMonitorMap
-            .put(path, new ZkClientPathMonitor(path.name(), monitorType, 
monitorKey));
+        _zkClientPathMonitorMap.put(path,
+            new ZkClientPathMonitor(path, monitorType, monitorKey, 
monitorInstanceName).register());
       }
     }
   }
 
-  private void regitster(String monitorType, String monitorKey) throws 
JMException {
-    _objectName = MBeanRegistrar
-        .register(this, MonitorDomainNames.HelixZkClient.name(), MONITOR_TYPE, 
monitorType,
-            MONITOR_KEY, monitorKey);
+  protected static ObjectName getObjectName(String monitorType, String 
monitorKey,
+      String monitorInstanceName) throws MalformedObjectNameException {
+    return MBeanRegistrar
+        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), 
MONITOR_TYPE, monitorType,
+            MONITOR_KEY,
+            (monitorKey + (monitorInstanceName == null ? "" : "." + 
monitorInstanceName)));
   }
 
   /**
@@ -79,8 +83,7 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
 
   @Override
   public String getSensorName() {
-    return String
-        .format("%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), 
_monitorType, _monitorKey);
+    return _sensorName;
   }
 
   public void increaseStateChangeEventCounter() {

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
index 08eb2bc..ccbbed9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
@@ -25,14 +25,19 @@ import 
org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
 import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.util.ArrayList;
 import java.util.List;
 
 public class ZkClientPathMonitor extends DynamicMBeanProvider {
   public static final String MONITOR_PATH = "PATH";
-
   private static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client 
Monitor";
   private final String _sensorName;
+  private final String _type;
+  private final String _key;
+  private final String _instanceName;
+  private final PredefinedPath _path;
 
   protected enum PredefinedPath {
     IdealStates(".*/IDEALSTATES/.*"),
@@ -87,12 +92,18 @@ public class ZkClientPathMonitor extends 
DynamicMBeanProvider {
     return _sensorName;
   }
 
-  public ZkClientPathMonitor(String path, String monitorType, String 
monitorKey)
-      throws JMException {
+  public ZkClientPathMonitor(PredefinedPath path, String monitorType, String 
monitorKey,
+      String monitorInstanceName) {
+    _type = monitorType;
+    _key = monitorKey;
+    _instanceName = monitorInstanceName;
+    _path = path;
     _sensorName = String
         .format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), 
monitorType, monitorKey,
-            path);
+            path.name());
+  }
 
+  public ZkClientPathMonitor register() throws JMException {
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
     attributeList.add(_readCounter);
     attributeList.add(_writeCounter);
@@ -106,9 +117,13 @@ public class ZkClientPathMonitor extends 
DynamicMBeanProvider {
     attributeList.add(_writeLatencyGauge);
     attributeList.add(_readBytesGauge);
     attributeList.add(_writeBytesGauge);
-    register(attributeList, MBEAN_DESCRIPTION, 
MonitorDomainNames.HelixZkClient.name(),
-        ZkClientMonitor.MONITOR_TYPE, monitorType, 
ZkClientMonitor.MONITOR_KEY, monitorKey,
-        MONITOR_PATH, path);
+
+    ObjectName objectName = new ObjectName(String.format("%s,%s=%s",
+        ZkClientMonitor.getObjectName(_type, _key, _instanceName).toString(),
+        MONITOR_PATH, _path.name()));
+    doRegister(attributeList, MBEAN_DESCRIPTION, objectName);
+
+    return this;
   }
 
   protected void record(int bytes, long latencyMilliSec, boolean isFailure, 
boolean isRead) {

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index a2d2662..346503b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -53,7 +53,7 @@ public abstract class DynamicMBeanProvider implements 
DynamicMBean, SensorNamePr
    * @param domain         the MBean domain name
    * @param keyValuePairs  the MBean object name components
    */
-  protected synchronized void register(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
       String description, String domain, String... keyValuePairs) throws 
JMException {
     if (_objectName != null) {
       throw new HelixException(
@@ -70,7 +70,7 @@ public abstract class DynamicMBeanProvider implements 
DynamicMBean, SensorNamePr
    * @param description    the MBean description
    * @param objectName     the proposed MBean ObjectName
    */
-  protected synchronized void register(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
       String description, ObjectName objectName) throws JMException {
     if (_objectName != null) {
       throw new HelixException(
@@ -80,22 +80,9 @@ public abstract class DynamicMBeanProvider implements 
DynamicMBean, SensorNamePr
     _objectName = MBeanRegistrar.register(this, objectName);
   }
 
-  protected synchronized void register(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
+  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> 
dynamicMetrics,
       ObjectName objectName) throws JMException {
-    register(dynamicMetrics, null, objectName);
-  }
-
-  /**
-   * After unregistered, the MBean can't be registered again, a new monitor 
has be to created.
-   */
-  public synchronized void unregister() {
-    _metricRegistry.removeMatching(new MetricFilter() {
-      @Override
-      public boolean matches(String name, Metric metric) {
-        return name.startsWith(getMetricRegistryNamePrefix());
-      }
-    });
-    MBeanRegistrar.unregister(_objectName);
+    doRegister(dynamicMetrics, null, objectName);
   }
 
   protected String getMetricRegistryNamePrefix() {
@@ -147,6 +134,24 @@ public abstract class DynamicMBeanProvider implements 
DynamicMBean, SensorNamePr
         new MBeanNotificationInfo[0]);
   }
 
+  /**
+   * Call doRegister() to finish registration MBean and the attributes.
+   */
+  public abstract DynamicMBeanProvider register() throws JMException;
+
+  /**
+   * After unregistered, the MBean can't be registered again, a new monitor 
has be to created.
+   */
+  public synchronized void unregister() {
+    _metricRegistry.removeMatching(new MetricFilter() {
+      @Override
+      public boolean matches(String name, Metric metric) {
+        return name.startsWith(getMetricRegistryNamePrefix());
+      }
+    });
+    MBeanRegistrar.unregister(_objectName);
+  }
+
   @Override
   public Object getAttribute(String attribute)
       throws AttributeNotFoundException, MBeanException, ReflectionException {

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java 
b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
index b5f7075..0e91314 100644
--- 
a/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
+++ 
b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
@@ -26,7 +26,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 
 public class ZkHelixPropertyStore<T> extends ZkCacheBaseDataAccessor<T> {
-  public static final String MONITOR_TAG = "HelixPropertyStore";
+  public static final String MONITOR_TYPE = "HelixPropertyStore";
 
   public ZkHelixPropertyStore(ZkBaseDataAccessor<T> accessor, String root,
       List<String> subscribedPaths) {
@@ -35,12 +35,10 @@ public class ZkHelixPropertyStore<T> extends 
ZkCacheBaseDataAccessor<T> {
 
   public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, 
String chrootPath,
       List<String> zkCachePaths) {
-    super(zkAddress, serializer, chrootPath, null, zkCachePaths,
-        MONITOR_TAG, chrootPath);
+    super(zkAddress, serializer, chrootPath, null, zkCachePaths, MONITOR_TYPE, 
chrootPath);
   }
 
   public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, 
String chrootPath) {
-    super(zkAddress, serializer, chrootPath, null, null,
-        MONITOR_TAG, chrootPath);
+    super(zkAddress, serializer, chrootPath, null, null, MONITOR_TYPE, 
chrootPath);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index fa414d7..cce8f99 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -79,6 +79,9 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
   }
 
   public void shutdown() {
+    if (_monitor != null) {
+      _monitor.unregister();
+    }
     _taskExecutor.shutdown();
     _timerTaskExecutor.shutdown();
     if (_monitor != null ) {

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
index 5e64271..eb4f94b 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
@@ -52,7 +52,7 @@ public class TestClusterEventStatusMonitor {
 
     MBeanServer _server = ManagementFactory.getPlatformMBeanServer();
     Set<ObjectInstance> mbeans =
-        _server.queryMBeans(new ObjectName("ClusterStatus" + ":Cluster=TestCluster,eventName=ClusterEvent,*"), null);
+        _server.queryMBeans(new ObjectName("ClusterStatus:Cluster=TestCluster,eventName=ClusterEvent,*"), null);
     Assert.assertEquals(mbeans.size(), 0);
 
     int count = 5;
@@ -69,7 +69,7 @@ public class TestClusterEventStatusMonitor {
 
     mbeans =
         _server.queryMBeans(
-            new ObjectName("ClusterStatus: cluster=TestCluster,eventName=ClusterEvent,*"), null);
+            new ObjectName("ClusterStatus:cluster=TestCluster,eventName=ClusterEvent,*"), null);
     Assert.assertEquals(mbeans.size(), 6);
 
     for (ObjectInstance mbean : mbeans) {
@@ -86,7 +86,7 @@ public class TestClusterEventStatusMonitor {
 
     mbeans =
         _server.queryMBeans(
-            new ObjectName("ClusterStatus: cluster=TestCluster,eventName=ClusterEvent,*"), null);
+            new ObjectName("ClusterStatus:cluster=TestCluster,eventName=ClusterEvent,*"), null);
     Assert.assertEquals(mbeans.size(), 0);
 
     System.out.println("END TestParticipantMonitor");

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index a01cd36..da8dce1 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -33,9 +33,7 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 
-import org.apache.helix.monitoring.ParticipantStatusMonitor;
-import org.apache.helix.monitoring.StateTransitionContext;
-import org.apache.helix.monitoring.StateTransitionDataPoint;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
 import org.apache.log4j.Logger;
 import org.testng.AssertJUnit;

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
index 36921e1..8c05626 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
@@ -104,6 +104,6 @@ public class TestDisableResourceMbean extends ZkUnitTestBase {
     String resourceBeanName = String
         .format("%s,%s=%s", clusterBeanName, ClusterStatusMonitor.RESOURCE_DN_KEY, resourceName);
     return new ObjectName(
-        String.format("%s: %s", MonitorDomainNames.ClusterStatus.name(), resourceBeanName));
+        String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), resourceBeanName));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
index 219a2db..2529360 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
@@ -108,7 +108,7 @@ public class TestDropResourceMetricsReset extends ZkUnitTestBase {
     String resourceBeanName =
         String.format("%s,%s=%s", clusterBeanName, ClusterStatusMonitor.RESOURCE_DN_KEY,
             resourceName);
-    return new ObjectName(String.format("%s: %s", MonitorDomainNames.ClusterStatus.name(),
+    return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(),
         resourceBeanName));
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java
index e183052..8758205 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestHelixCallbackMonitor.java
@@ -43,19 +43,19 @@ public class TestHelixCallbackMonitor {
   public void testMBeanRegisteration() throws JMException {
     Set<HelixCallbackMonitor> monitors = new HashSet<>();
     for (HelixConstants.ChangeType changeType : HelixConstants.ChangeType.values()) {
-      monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, changeType));
+      monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, null, changeType).register());
       Assert.assertTrue(
           _beanServer.isRegistered(buildObjectName(TEST_TYPE, TEST_CLUSTER, changeType)));
     }
 
     for (HelixConstants.ChangeType changeType : HelixConstants.ChangeType.values()) {
-      monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, changeType));
+      monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, null, changeType).register());
       Assert.assertTrue(
           _beanServer.isRegistered(buildObjectName(TEST_TYPE, TEST_CLUSTER, changeType, 1)));
     }
 
     for (HelixConstants.ChangeType changeType : HelixConstants.ChangeType.values()) {
-      monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, changeType));
+      monitors.add(new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, null, changeType).register());
       Assert.assertTrue(
           _beanServer.isRegistered(buildObjectName(TEST_TYPE, TEST_CLUSTER, changeType, 2)));
     }
@@ -77,8 +77,9 @@ public class TestHelixCallbackMonitor {
 
   @Test
   public void testCounter() throws JMException {
-    HelixCallbackMonitor monitor =
-        new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, HelixConstants.ChangeType.CURRENT_STATE);
+    HelixCallbackMonitor monitor = new HelixCallbackMonitor(TEST_TYPE, TEST_CLUSTER, null,
+        HelixConstants.ChangeType.CURRENT_STATE);
+    monitor.register();
     ObjectName name =
         buildObjectName(TEST_TYPE, TEST_CLUSTER, HelixConstants.ChangeType.CURRENT_STATE);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
index 8b1ec26..409a10a 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
@@ -102,6 +102,6 @@ public class TestResetClusterMetrics extends ZkUnitTestBase {
   }
 
   private ObjectName objectName(String beanName) throws Exception {
-    return new ObjectName(MonitorDomainNames.ClusterStatus.name() + ": " + beanName);
+    return new ObjectName(MonitorDomainNames.ClusterStatus.name() + ":" + beanName);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index 8b1ebe1..5c52195 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -47,6 +47,7 @@ public class TestResourceMonitor {
   @Test() public void testReportData() throws JMException {
     final int n = 5;
     ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName, new ObjectName("testDomain:key=value"));
+    monitor.register();
 
     List<String> instances = new ArrayList<String>();
     for (int i = 0; i < n; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index c5ee212..8bf136e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -32,15 +32,13 @@ public class TestZkClientMonitor {
 
   private MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer();
 
-  private ObjectName buildObjectName(String tag, String key) throws MalformedObjectNameException {
-    return MBeanRegistrar
-        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, tag,
-            ZkClientMonitor.MONITOR_KEY, key);
+  private ObjectName buildObjectName(String tag, String key, String instance) throws MalformedObjectNameException {
+    return ZkClientMonitor.getObjectName(tag, key, instance);
   }
 
-  private ObjectName buildObjectName(String tag, String key, int num)
+  private ObjectName buildObjectName(String tag, String key, String instance, int num)
       throws MalformedObjectNameException {
-    ObjectName objectName = buildObjectName(tag, key);
+    ObjectName objectName = buildObjectName(tag, key, instance);
     if (num > 0) {
       return new ObjectName(String
           .format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE,
@@ -50,11 +48,10 @@ public class TestZkClientMonitor {
     }
   }
 
-  private ObjectName buildPathMonitorObjectName(String tag, String key, String path)
+  private ObjectName buildPathMonitorObjectName(String tag, String key, String instance, String path)
       throws MalformedObjectNameException {
-    return MBeanRegistrar
-        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, tag,
-            ZkClientMonitor.MONITOR_KEY, key, ZkClientPathMonitor.MONITOR_PATH, path);
+    return new ObjectName(String
+        .format("%s,%s=%s", buildObjectName(tag, key, instance).toString(), ZkClientPathMonitor.MONITOR_PATH, path));
   }
 
   @Test
@@ -62,37 +59,39 @@ public class TestZkClientMonitor {
     final String TEST_TAG_1 = "test_tag_1";
     final String TEST_KEY_1 = "test_key_1";
 
-    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, true);
-    Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1)));
+    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, null, true);
+    Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null)));
 
     // no per-path monitor items created since "monitorRootPathOnly" = true
-    Assert.assertFalse(_beanServer.isRegistered(buildPathMonitorObjectName(TEST_TAG_1, TEST_KEY_1,
-        ZkClientPathMonitor.PredefinedPath.IdealStates.name())));
+    Assert.assertFalse(_beanServer.isRegistered(
+        buildPathMonitorObjectName(TEST_TAG_1, TEST_KEY_1, null,
+            ZkClientPathMonitor.PredefinedPath.IdealStates.name())));
 
-    ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, true);
-    Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, 1)));
+    ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, null, true);
+    Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null, 1)));
 
     monitor.unregister();
     monitorDuplicate.unregister();
 
-    Assert.assertFalse(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1)));
-    Assert.assertFalse(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, 1)));
+    Assert.assertFalse(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null)));
+    Assert.assertFalse(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null, 1)));
   }
 
   @Test
   public void testCounter() throws JMException {
     final String TEST_TAG = "test_tag_3";
     final String TEST_KEY = "test_key_3";
+    final String TEST_INSTANCE = "test_instance_3";
 
-    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY, false);
-    ObjectName name = buildObjectName(TEST_TAG, TEST_KEY);
+    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY, TEST_INSTANCE, false);
+    ObjectName name = buildObjectName(TEST_TAG, TEST_KEY, TEST_INSTANCE);
     ObjectName rootName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY,
-        ZkClientPathMonitor.PredefinedPath.Root.name());
-    ObjectName idealStateName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY,
+        TEST_INSTANCE, ZkClientPathMonitor.PredefinedPath.Root.name());
+    ObjectName idealStateName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, TEST_INSTANCE,
         ZkClientPathMonitor.PredefinedPath.IdealStates.name());
-    ObjectName instancesName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY,
+    ObjectName instancesName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, TEST_INSTANCE,
         ZkClientPathMonitor.PredefinedPath.Instances.name());
-    ObjectName currentStateName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY,
+    ObjectName currentStateName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, TEST_INSTANCE,
         ZkClientPathMonitor.PredefinedPath.CurrentStates.name());
 
     monitor.increaseDataChangeEventCounter();

http://git-wip-us.apache.org/repos/asf/helix/blob/ead6a729/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
index 8a3ce54..4abada2 100644
--- a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkHelixPropertyStore.java
@@ -385,7 +385,7 @@ public class TestZkHelixPropertyStore extends ZkUnitTestBase {
 
     ObjectName name = MBeanRegistrar
         .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
-            ZkHelixPropertyStore.MONITOR_TAG, ZkClientMonitor.MONITOR_KEY, TEST_ROOT,
+            ZkHelixPropertyStore.MONITOR_TYPE, ZkClientMonitor.MONITOR_KEY, TEST_ROOT,
             ZkClientPathMonitor.MONITOR_PATH, "Root");
     MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
     Assert.assertTrue(beanServer.isRegistered(name));

Reply via email to