Refactor ZkClient methods to simplify monitoring configures when create a 
client.

Additional change:
1. Refine monitor class to reduce unnecessary MBean attributes for ZkClient.
2. Apply new monitor framework to MessageLatencyMonitor.
3. Fix a MBeanRegistrar bug, MBean should be registered according to the 
specified path in order.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2b569db8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2b569db8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2b569db8

Branch: refs/heads/master
Commit: 2b569db89fc6f1da1199fb5e981ad08b379ff787
Parents: c7b250a
Author: Jiajun Wang <jjw...@linkedin.com>
Authored: Mon Aug 28 14:46:15 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Mon Nov 6 17:07:09 2017 -0800

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java |  10 +-
 .../helix/manager/zk/ZkAsyncCallbacks.java      |  13 +-
 .../manager/zk/ZkCacheBaseDataAccessor.java     |   9 +-
 .../org/apache/helix/manager/zk/ZkClient.java   | 124 ++++++++++++++++---
 .../monitoring/ParticipantStatusMonitor.java    |   6 +-
 .../helix/monitoring/mbeans/MBeanRegistrar.java |  10 +-
 .../mbeans/MessageLatencyMonitor.java           |  98 +++++++++------
 .../monitoring/mbeans/ZkClientMonitor.java      |  21 +++-
 .../monitoring/mbeans/ZkClientPathMonitor.java  |  30 +----
 .../dynamicMBeans/HistogramDynamicMetric.java   |   4 -
 .../apache/helix/manager/zk/TestZkClient.java   |  21 ++--
 .../monitoring/mbeans/TestZkClientMonitor.java  |  16 +--
 12 files changed, 231 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 82c3224..d048ad1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -599,8 +599,14 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     PathBasedZkSerializer zkSerializer =
         ChainedPathZkSerializer.builder(new 
ZNRecordStreamingSerializer()).build();
 
-    _zkclient = new ZkClient(_zkAddress, _sessionTimeout, 
_clientConnectionTimeout, zkSerializer,
-        _instanceType.name(), String.format("%s.%s", _clusterName, 
_instanceName));
+    ZkClient.Builder zkClientBuilder = new ZkClient.Builder();
+    zkClientBuilder.setZkServer(_zkAddress).setSessionTimeout(_sessionTimeout)
+        
.setConnectionTimeout(_clientConnectionTimeout).setZkSerializer(zkSerializer)
+        .setMonitorType(_instanceType.name())
+        .setMonitorKey(String.format("%s.%s", _clusterName, _instanceName))
+        .setMonitorRootPathOnly(!_instanceType.equals(InstanceType.CONTROLLER) 
&& !_instanceType
+            .equals(InstanceType.CONTROLLER_PARTICIPANT));
+    _zkclient = zkClientBuilder.build();
 
     _baseDataAccessor = createBaseDataAccessor();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
index 2be1244..1695ced 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
@@ -133,10 +133,12 @@ public class ZkAsyncCallbacks {
 
       if (ctx != null && ctx instanceof ZkAsyncCallContext) {
         ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx;
-        if (zkCtx._isRead) {
-          zkCtx._monitor.recordRead(path, zkCtx._bytes, 
zkCtx._startTimeMilliSec);
-        } else {
-          zkCtx._monitor.recordWrite(path, zkCtx._bytes, 
zkCtx._startTimeMilliSec);
+        if (zkCtx._monitor != null) {
+          if (zkCtx._isRead) {
+            zkCtx._monitor.recordRead(path, zkCtx._bytes, 
zkCtx._startTimeMilliSec);
+          } else {
+            zkCtx._monitor.recordWrite(path, zkCtx._bytes, 
zkCtx._startTimeMilliSec);
+          }
         }
       }
 
@@ -177,9 +179,6 @@ public class ZkAsyncCallbacks {
 
     public ZkAsyncCallContext(final ZkClientMonitor monitor, long 
startTimeMilliSec, int bytes,
         boolean isRead) {
-      if (monitor == null) {
-        throw new NullPointerException("ZkClientMonitor must not be null.");
-      }
       _monitor = monitor;
       _startTimeMilliSec = startTimeMilliSec;
       _bytes = bytes;

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 322c415..cb43ea9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -109,9 +109,12 @@ public class ZkCacheBaseDataAccessor<T> implements 
HelixPropertyStore<T> {
 
   public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, 
String chrootPath,
       List<String> wtCachePaths, List<String> zkCachePaths, String 
monitorType, String monitorkey) {
-    _zkclient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
-        ZkClient.DEFAULT_CONNECTION_TIMEOUT, new 
BasicZkSerializer(serializer), monitorType,
-        monitorkey);
+    ZkClient.Builder zkClientBuilder = new ZkClient.Builder();
+    
zkClientBuilder.setZkServer(zkAddress).setSessionTimeout(ZkClient.DEFAULT_SESSION_TIMEOUT)
+        
.setConnectionTimeout(ZkClient.DEFAULT_CONNECTION_TIMEOUT).setZkSerializer(serializer)
+        .setMonitorType(monitorType).setMonitorKey(monitorkey);
+    _zkclient = zkClientBuilder.build();
+
     _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, 
TimeUnit.MILLISECONDS);
     _baseAccessor = new ZkBaseDataAccessor<>(_zkclient);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index a65a75a..7ec693f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -42,7 +42,6 @@ import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
 import javax.management.JMException;
@@ -63,17 +62,23 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
   private PathBasedZkSerializer _zkSerializer;
   private ZkClientMonitor _monitor;
 
+  private ZkClient(IZkConnection connection, int connectionTimeout, long 
operationRetryTimeout,
+      PathBasedZkSerializer zkSerializer, String monitorType, String 
monitorKey,
+      boolean monitorRootPathOnly) {
+    super(connection, connectionTimeout, new ByteArraySerializer(), 
operationRetryTimeout);
+    init(zkSerializer, monitorType, monitorKey, monitorRootPathOnly);
+  }
+
   public ZkClient(IZkConnection connection, int connectionTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String 
monitorKey,
       long operationRetryTimeout) {
-    super(connection, connectionTimeout, new ByteArraySerializer(), 
operationRetryTimeout);
-    init(zkSerializer, monitorType, monitorKey);
+    this(connection, connectionTimeout, operationRetryTimeout, zkSerializer, 
monitorType,
+        monitorKey, true);
   }
 
   public ZkClient(IZkConnection connection, int connectionTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String 
monitorKey) {
-    super(connection, connectionTimeout, new ByteArraySerializer());
-    init(zkSerializer, monitorType, monitorKey);
+    this(connection, connectionTimeout, zkSerializer, monitorType, monitorKey, 
-1);
   }
 
   public ZkClient(String zkServers, String monitorType, String monitorKey) {
@@ -127,28 +132,20 @@ public class ZkClient extends 
org.I0Itec.zkclient.ZkClient {
     this(zkServers, null, null);
   }
 
-  protected void init(PathBasedZkSerializer zkSerializer, String monitorType, 
String monitorKey) {
+  protected void init(PathBasedZkSerializer zkSerializer, String monitorType, 
String monitorKey,
+      boolean monitorRootPathOnly) {
     _zkSerializer = zkSerializer;
     if (LOG.isTraceEnabled()) {
       StackTraceElement[] calls = Thread.currentThread().getStackTrace();
       LOG.trace("created a zkclient. callstack: " + Arrays.asList(calls));
     }
     try {
-      if (monitorKey == null) {
-        // Use ZK session Id as monitor key
-        monitorType = ZkClientMonitor.SESSION_ID_PROPERTY_NAME;
-        // _connection cannot be null, checked in org.I0Itec.zkclient.ZkClient 
constructor
-        ZooKeeper zk = ((ZkConnection) _connection).getZookeeper();
-        if (zk != null) {
-          monitorKey = new Long(zk.getSessionId()).toString();
-        } else { // if zkClient is not connected
-          LOG.error("Cannot creating ZkClientMonitor because ZkClient is not 
connected.");
-          return;
-        }
-      } else if (monitorType == null) {
-        monitorType = ZkClientMonitor.CUSTOMIZED_PROPERTY_NAME;
+      if (monitorKey != null && !monitorKey.isEmpty() &&
+          monitorType != null && !monitorType.isEmpty()) {
+        _monitor = new ZkClientMonitor(monitorType, monitorKey, 
monitorRootPathOnly);
+      } else {
+        LOG.info("ZkClient monitor key or type is not provided. Skip 
monitoring.");
       }
-      _monitor = new ZkClientMonitor(monitorType, monitorKey);
     } catch (JMException e) {
       LOG.error("Error in creating ZkClientMonitor", e);
     }
@@ -595,4 +592,91 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient 
{
       _monitor.recordWriteFailure(path);
     }
   }
+
+  public static class Builder {
+    IZkConnection _connection;
+    String _zkServer;
+    Integer _sessionTimeout;
+
+    PathBasedZkSerializer _zkSerializer;
+
+    long _operationRetryTimeout = -1L;
+    int _connectionTimeout = Integer.MAX_VALUE;
+
+    String _monitorType;
+    String _monitorKey;
+    boolean _monitorRootPathOnly = true;
+
+    public Builder setConnection(IZkConnection connection) {
+      this._connection = connection;
+      return this;
+    }
+
+    public Builder setConnectionTimeout(Integer connectionTimeout) {
+      this._connectionTimeout = connectionTimeout;
+      return this;
+    }
+
+    public Builder setZkSerializer(PathBasedZkSerializer zkSerializer) {
+      this._zkSerializer = zkSerializer;
+      return this;
+    }
+
+    public Builder setZkSerializer(ZkSerializer zkSerializer) {
+      this._zkSerializer = new BasicZkSerializer(zkSerializer);
+      return this;
+    }
+
+    public Builder setMonitorType(String monitorType) {
+      this._monitorType = monitorType;
+      return this;
+    }
+
+    public Builder setMonitorKey(String monitorKey) {
+      this._monitorKey = monitorKey;
+      return this;
+    }
+
+    public Builder setMonitorRootPathOnly(Boolean monitorRootPathOnly) {
+      this._monitorRootPathOnly = monitorRootPathOnly;
+      return this;
+    }
+
+    public Builder setZkServer(String zkServer) {
+      this._zkServer = zkServer;
+      return this;
+    }
+
+    public Builder setSessionTimeout(Integer sessionTimeout) {
+      this._sessionTimeout = sessionTimeout;
+      return this;
+    }
+
+    public Builder setOperationRetryTimeout(Long operationRetryTimeout) {
+      this._operationRetryTimeout = operationRetryTimeout;
+      return this;
+    }
+
+    public ZkClient build() {
+      if (_connection == null) {
+        if (_zkServer == null) {
+          throw new HelixException(
+              "Failed to build ZkClient since no connection or ZK server 
address is specified.");
+        } else {
+          if (_sessionTimeout == null) {
+            _connection = new ZkConnection(_zkServer);
+          } else {
+            _connection = new ZkConnection(_zkServer, _sessionTimeout);
+          }
+        }
+      }
+
+      if (_zkSerializer == null) {
+        _zkSerializer = new BasicZkSerializer(new SerializableSerializer());
+      }
+
+      return new ZkClient(_connection, _connectionTimeout, 
_operationRetryTimeout, _zkSerializer,
+          _monitorType, _monitorKey, _monitorRootPathOnly);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
index ee1d630..6192c6d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
@@ -57,7 +57,7 @@ public class ParticipantStatusMonitor {
         _messageLatencyMonitor = new MessageLatencyMonitor(instanceName);
         _executorMonitors = new ConcurrentHashMap<>();
         register(_messageMonitor, 
getObjectName(_messageMonitor.getParticipantBeanName()));
-        register(_messageLatencyMonitor, 
getObjectName(_messageLatencyMonitor.getBeanName()));
+        
_messageLatencyMonitor.register(MonitorDomainNames.CLMParticipantReport.name());
       }
     } catch (Exception e) {
       LOG.warn(e);
@@ -152,6 +152,9 @@ public class ParticipantStatusMonitor {
         LOG.warn("fail to unregister " + 
_messageMonitor.getParticipantBeanName(), e);
       }
     }
+    if (_messageLatencyMonitor != null) {
+      _messageLatencyMonitor.unregister();
+    }
     for (StateTransitionContext cxt : _monitorMap.keySet()) {
       try {
         ObjectName name = getObjectName(cxt.toString());
@@ -163,7 +166,6 @@ public class ParticipantStatusMonitor {
       }
     }
     _monitorMap.clear();
-
   }
 
   public void createExecutorMonitor(String type, ExecutorService executor) {

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
index 05af7a7..6d43575 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java
@@ -20,7 +20,6 @@ package org.apache.helix.monitoring.mbeans;
  */
 
 import java.lang.management.ManagementFactory;
-import java.util.Hashtable;
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.JMException;
 import javax.management.MBeanServer;
@@ -88,13 +87,14 @@ public class MBeanRegistrar {
           + "number of String and at least 2 String");
     }
 
-    Hashtable<String, String> table = new Hashtable<>();
+    StringBuilder objectNameStr = new StringBuilder();
     for (int i = 0; i < keyValuePairs.length; i += 2) {
-      table.put(keyValuePairs[i], keyValuePairs[i + 1]);
+      objectNameStr.append(
+          String.format(i == 0 ? "%s=%s" : ",%s=%s", keyValuePairs[i], 
keyValuePairs[i + 1]));
     }
     if (num > 0) {
-      table.put(DUPLICATE, String.valueOf(num));
+      objectNameStr.append(String.format(",%s=%s", DUPLICATE, 
String.valueOf(num)));
     }
-    return new ObjectName(domain, table);
+    return new ObjectName(String.format("%s:%s", domain, 
objectNameStr.toString()));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
index 4096be1..35fe8b8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
@@ -19,66 +19,82 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.helix.HelixException;
 import org.apache.helix.model.Message;
 import org.apache.helix.monitoring.ParticipantStatusMonitor;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 import org.apache.log4j.Logger;
 
-public class MessageLatencyMonitor implements MessageLatencyMonitorMBean {
+import javax.management.JMException;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessageLatencyMonitor {
   private static final Logger logger = 
Logger.getLogger(MessageLatencyMonitor.class.getName());
+  private static final String MBEAN_DESCRIPTION = "Helix Message Latency 
Monitor";
   public static String MONITOR_TYPE_KW = "MonitorType";
-  private static long DEFAULT_RESET_TIME = 60 * 60 * 1000;
-  private long _totalMessageLatency;
-  private long _totalMessageCount;
-  private long _maxSingleMessageLatency;
-  private long _lastResetTime;
+
+  private static final MetricRegistry _metricRegistry = new MetricRegistry();
   private String _participantName;
+  private ObjectName _objectName;
+
+  private DynamicMBeanProvider _dynamicMBeanProvider;
+
+  private SimpleDynamicMetric<Long> _totalMessageCount =
+      new SimpleDynamicMetric("TotalMessageCount", 0l);
+  private SimpleDynamicMetric<Long> _totalMessageLatency =
+      new SimpleDynamicMetric("TotalMessageLatency", 0l);
+  private HistogramDynamicMetric _messageLatencyGauge =
+      new HistogramDynamicMetric("MessageLatencyGauge",
+          _metricRegistry.histogram(toString() + "MessageLatencyGauge"));
 
   public MessageLatencyMonitor(String participantName) {
-    _totalMessageLatency = 0L;
-    _totalMessageCount = 0L;
-    _maxSingleMessageLatency = 0;
-    _lastResetTime = System.currentTimeMillis();
-    _participantName = participantName;
-  }
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_totalMessageCount);
+    attributeList.add(_totalMessageLatency);
+    attributeList.add(_messageLatencyGauge);
 
-  public String getBeanName() {
-    return String.format("%s=%s,%s=%s", 
ParticipantStatusMonitor.PARTICIPANT_KEY, _participantName,
-        MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName());
+    _participantName = participantName;
+    _dynamicMBeanProvider = new DynamicMBeanProvider(String
+        .format("%s.%s.%s.%s", 
ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY, participantName,
+            MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName()), 
MBEAN_DESCRIPTION,
+        attributeList);
   }
 
   public void updateLatency(Message message) {
     long latency = System.currentTimeMillis() - message.getCreateTimeStamp();
     logger.info(String.format("The latency of message %s is %d ms", 
message.getMsgId(), latency));
-    _totalMessageCount++;
-    _totalMessageLatency += latency;
 
-    if (_lastResetTime + DEFAULT_RESET_TIME <= System.currentTimeMillis() ||
-        latency > _maxSingleMessageLatency) {
-      _maxSingleMessageLatency = latency;
-      _lastResetTime = System.currentTimeMillis();
-    }
+    _totalMessageCount.updateValue(_totalMessageCount.getValue() + 1);
+    _totalMessageLatency.updateValue(_totalMessageLatency.getValue() + 
latency);
+    _messageLatencyGauge.updateValue(latency);
   }
 
-
-  @Override
-  public long getTotalMessageLatencyCounter() {
-    return _totalMessageLatency;
-  }
-
-  @Override
-  public long getTotalMessageCounter() {
-    return _totalMessageCount;
-  }
-
-  @Override
-  public long getMaxSingleMessageLatencyGauge() {
-    return _maxSingleMessageLatency;
+  public void register(String domainName) throws JMException {
+    if (_objectName != null) {
+      throw new HelixException("Monitor has already been registed: " + 
_objectName.toString());
+    }
+    _objectName = MBeanRegistrar
+        .register(_dynamicMBeanProvider, domainName, 
ParticipantStatusMonitor.PARTICIPANT_KEY,
+            _participantName, MONITOR_TYPE_KW, 
MessageLatencyMonitor.class.getSimpleName());
   }
 
-  @Override
-  public String getSensorName() {
-    return String
-        .format("%s.%s.%s.%s.", 
ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY, _participantName,
-            MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName());
+  public void unregister() {
+    if (_objectName != null) {
+      MBeanRegistrar.unregister(_objectName);
+    }
+    _metricRegistry.removeMatching(new MetricFilter() {
+      @Override
+      public boolean matches(String name, Metric metric) {
+        return name.startsWith(toString());
+      }
+    });
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
index b685c45..b385068 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
@@ -19,6 +19,8 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+
 import javax.management.JMException;
 import javax.management.ObjectName;
 import java.util.Map;
@@ -28,9 +30,6 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
   public static final String MONITOR_TYPE = "Type";
   public static final String MONITOR_KEY = "Key";
 
-  public static final String SESSION_ID_PROPERTY_NAME = "SessionId";
-  public static final String CUSTOMIZED_PROPERTY_NAME = "CustomizedKey";
-
   private ObjectName _objectName;
   private String _monitorType;
   private String _monitorKey;
@@ -41,14 +40,24 @@ public class ZkClientMonitor implements 
ZkClientMonitorMBean {
   private Map<ZkClientPathMonitor.PredefinedPath, ZkClientPathMonitor> 
_zkClientPathMonitorMap =
       new ConcurrentHashMap<>();
 
-  public ZkClientMonitor(String monitorType, String monitorKey) throws 
JMException {
+  public ZkClientMonitor(String monitorType, String monitorKey, boolean 
monitorRootPathOnly)
+      throws JMException {
+    if (monitorKey == null || monitorKey.isEmpty() || monitorType == null || 
monitorType
+        .isEmpty()) {
+      throw new HelixException("Cannot create ZkClientMonitor without monitor 
key and type.");
+    }
+
     _monitorType = monitorType;
     _monitorKey = monitorKey;
     regitster(monitorType, monitorKey);
 
     for (ZkClientPathMonitor.PredefinedPath path : 
ZkClientPathMonitor.PredefinedPath.values()) {
-      _zkClientPathMonitorMap
-          .put(path, new ZkClientPathMonitor(path.name(), monitorType, 
monitorKey));
+      // If monitor root path only, check if the current path is Root.
+      // Otherwise, add monitors for every path.
+      if (!monitorRootPathOnly || 
path.equals(ZkClientPathMonitor.PredefinedPath.Root)) {
+        _zkClientPathMonitorMap
+            .put(path, new ZkClientPathMonitor(path.name(), monitorType, 
monitorKey));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
index dac0cbd..d10f66c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
@@ -26,6 +26,7 @@ import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.helix.HelixException;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.*;
 
 import java.util.ArrayList;
@@ -34,7 +35,6 @@ import java.util.List;
 
 public class ZkClientPathMonitor {
   public static final String MONITOR_PATH = "PATH";
-  private static final long RESET_INTERVAL = 1000 * 60 * 10; // 1 hour
   private static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client 
Monitor";
 
   private static final MetricRegistry _metricRegistry = new MetricRegistry();
@@ -68,8 +68,6 @@ public class ZkClientPathMonitor {
     }
   }
 
-  private long _lastResetTime = 0;
-
   private SimpleDynamicMetric<Long> _readCounter = new 
SimpleDynamicMetric("ReadCounter", 0l);
   private SimpleDynamicMetric<Long> _writeCounter = new 
SimpleDynamicMetric("WriteCounter", 0l);
   private SimpleDynamicMetric<Long> _readBytesCounter =
@@ -84,10 +82,6 @@ public class ZkClientPathMonitor {
       new SimpleDynamicMetric("ReadTotalLatencyCounter", 0l);
   private SimpleDynamicMetric<Long> _writeTotalLatencyCounter =
       new SimpleDynamicMetric("WriteTotalLatencyCounter", 0l);
-  private SimpleDynamicMetric<Long> _readMaxLatencyGauge =
-      new SimpleDynamicMetric("ReadMaxLatencyGauge", 0l);
-  private SimpleDynamicMetric<Long> _writeMaxLatencyGauge =
-      new SimpleDynamicMetric("WriteMaxLatencyGauge", 0l);
 
   private HistogramDynamicMetric _readLatencyGauge = new 
HistogramDynamicMetric("ReadLatencyGauge",
       _metricRegistry.histogram(toString() + "ReadLatencyGauge"));
@@ -101,6 +95,11 @@ public class ZkClientPathMonitor {
 
   protected ZkClientPathMonitor(String path, String monitorType, String 
monitorKey)
       throws JMException {
+    if (monitorKey == null || monitorKey.isEmpty() || monitorType == null || 
monitorType
+        .isEmpty()) {
+      throw new HelixException("Cannot create ZkClientMonitor without monitor 
key and type.");
+    }
+
     _monitorType = monitorType;
     _monitorKey = monitorKey;
     _path = path;
@@ -114,8 +113,6 @@ public class ZkClientPathMonitor {
     attributeList.add(_writeFailureCounter);
     attributeList.add(_readTotalLatencyCounter);
     attributeList.add(_writeTotalLatencyCounter);
-    attributeList.add(_readMaxLatencyGauge);
-    attributeList.add(_writeMaxLatencyGauge);
     attributeList.add(_readLatencyGauge);
     attributeList.add(_writeLatencyGauge);
     attributeList.add(_readBytesGauge);
@@ -154,13 +151,6 @@ public class ZkClientPathMonitor {
     } else {
       increaseCounter(isRead);
       increaseTotalLatency(isRead, latencyMilliSec);
-      if (_lastResetTime + RESET_INTERVAL <= System.currentTimeMillis()
-          || latencyMilliSec > (isRead ?
-          _readMaxLatencyGauge.getValue() :
-          _writeMaxLatencyGauge.getValue())) {
-        setMaxLatency(isRead, latencyMilliSec);
-        _lastResetTime = System.currentTimeMillis();
-      }
       if (bytes > 0) {
         increaseBytesCounter(isRead, bytes);
       }
@@ -202,12 +192,4 @@ public class ZkClientPathMonitor {
       _writeLatencyGauge.updateValue(latencyDelta);
     }
   }
-
-  private void setMaxLatency(boolean isRead, long latency) {
-    if (isRead) {
-      _readMaxLatencyGauge.updateValue(latency);
-    } else {
-      _writeMaxLatencyGauge.updateValue(latency);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java
index bf447a1..bd6e6c9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java
@@ -41,15 +41,11 @@ public class HistogramDynamicMetric extends 
DynamicMetric<Histogram, Long> {
    * The enum statistic attributes
    */
   enum SnapshotAttribute {
-    Median("getMedian", "Median"),
     Pct75th("get75thPercentile", "75Pct"),
     Pct95th("get95thPercentile", "95Pct"),
-    Pct98th("get98thPercentile", "98ct"),
     Pct99th("get99thPercentile", "99Pct"),
-    Pct999th("get999thPercentile", "999Pct"),
     Max("getMax", "Max"),
     Mean("getMean", "Mean"),
-    Min("getMin", "Min"),
     StdDev("getStdDev", "StdDev");
 
     final String _getMethodName;

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
index 7b2ef6d..c8f8d69 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
@@ -144,7 +144,10 @@ public class TestZkClient extends ZkUnitTestBase {
     final String TEST_NODE = "/test_zkclient_monitor";
     final String TEST_PATH = TEST_ROOT + TEST_NODE;
 
-    ZkClient zkClient = new ZkClient(ZK_ADDR, TEST_TAG, TEST_KEY);
+    ZkClient.Builder builder = new ZkClient.Builder();
+    
builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
+        .setMonitorRootPathOnly(false);
+    ZkClient zkClient = builder.build();
 
     final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, 
TEST_PATH).length;
 
@@ -174,11 +177,11 @@ public class TestZkClient extends ZkUnitTestBase {
     // Test exists
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadTotalLatencyCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadMaxLatencyGauge"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadLatencyGauge.Max"), 0);
     zkClient.exists(TEST_ROOT);
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 1);
     Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadTotalLatencyCounter") >= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadMaxLatencyGauge") >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadLatencyGauge.Max") >= 0);
 
     // Test create
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteCounter"), 0);
@@ -186,10 +189,10 @@ public class TestZkClient extends ZkUnitTestBase {
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteCounter"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteBytesCounter"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteTotalLatencyCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteMaxLatencyGauge"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteLatencyGauge.Max"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteTotalLatencyCounter"),
         0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteMaxLatencyGauge"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteLatencyGauge.Max"), 0);
     zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT);
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteCounter"), 1);
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteBytesCounter"),
@@ -200,11 +203,11 @@ public class TestZkClient extends ZkUnitTestBase {
     long origWriteTotalLatencyCounter =
         (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter");
     Assert.assertTrue(origWriteTotalLatencyCounter >= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"WriteMaxLatencyGauge") >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"WriteLatencyGauge.Max") >= 0);
     long origIdealStatesWriteTotalLatencyCounter =
         (long) beanServer.getAttribute(idealStatename, 
"WriteTotalLatencyCounter");
     Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"WriteMaxLatencyGauge") >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"WriteLatencyGauge.Max") >= 0);
 
     // Test read
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 1);
@@ -216,7 +219,7 @@ public class TestZkClient extends ZkUnitTestBase {
     long origIdealStatesReadTotalLatencyCounter =
         (long) beanServer.getAttribute(idealStatename, 
"ReadTotalLatencyCounter");
     Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadMaxLatencyGauge"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadLatencyGauge.Max"), 0);
     zkClient.readData(TEST_PATH, new Stat());
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 2);
     Assert
@@ -228,7 +231,7 @@ public class TestZkClient extends ZkUnitTestBase {
         >= origReadTotalLatencyCounter);
     Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"ReadTotalLatencyCounter")
         >= origIdealStatesReadTotalLatencyCounter);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"ReadMaxLatencyGauge") >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"ReadLatencyGauge.Max") >= 0);
     zkClient.getChildren(TEST_PATH);
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 3);
     Assert

http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index 9fff64d..e227b86 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -57,9 +57,14 @@ public class TestZkClientMonitor {
     final String TEST_TAG_1 = "test_tag_1";
     final String TEST_KEY_1 = "test_key_1";
 
-    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1);
+    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, 
true);
     Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, 
TEST_KEY_1)));
-    ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, 
TEST_KEY_1);
+
+    // no per-path monitor items created since "monitorRootPathOnly" = true
+    
Assert.assertFalse(_beanServer.isRegistered(buildPathMonitorObjectName(TEST_TAG_1,
 TEST_KEY_1,
+        ZkClientPathMonitor.PredefinedPath.IdealStates.name())));
+
+    ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, 
TEST_KEY_1, true);
     Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, 
TEST_KEY_1, 1)));
 
     monitor.unregister();
@@ -74,7 +79,7 @@ public class TestZkClientMonitor {
     final String TEST_TAG = "test_tag_3";
     final String TEST_KEY = "test_key_3";
 
-    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY);
+    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY, false);
     ObjectName name = buildObjectName(TEST_TAG, TEST_KEY);
     ObjectName rootName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY,
         ZkClientPathMonitor.PredefinedPath.Root.name());
@@ -93,14 +98,11 @@ public class TestZkClientMonitor {
     Assert.assertEquals((long) _beanServer.getAttribute(rootName, 
"ReadCounter"), 1);
     Assert.assertEquals((long) _beanServer.getAttribute(idealStateName, 
"ReadCounter"), 1);
     Assert.assertTrue((long) _beanServer.getAttribute(rootName, 
"ReadLatencyGauge.Max") >= 10);
-    Assert.assertTrue((long) _beanServer.getAttribute(rootName, 
"ReadMaxLatencyGauge") >= 10);
-
     monitor.recordRead("TEST/INSTANCES/testDB0", 0, System.currentTimeMillis() 
- 15);
     Assert.assertEquals((long) _beanServer.getAttribute(rootName, 
"ReadCounter"), 2);
     Assert.assertEquals((long) _beanServer.getAttribute(instancesName, 
"ReadCounter"), 1);
     Assert.assertEquals((long) _beanServer.getAttribute(idealStateName, 
"ReadCounter"), 1);
     Assert.assertTrue((long) _beanServer.getAttribute(rootName, 
"ReadTotalLatencyCounter") >= 25);
-    Assert.assertTrue((long) _beanServer.getAttribute(rootName, 
"ReadMaxLatencyGauge") >= 15);
 
     
monitor.recordWrite("TEST/INSTANCES/node_1/CURRENTSTATES/session_1/Resource", 5,
         System.currentTimeMillis() - 10);
@@ -110,11 +112,9 @@ public class TestZkClientMonitor {
     Assert.assertEquals((long) _beanServer.getAttribute(instancesName, 
"WriteCounter"), 1);
     Assert.assertEquals((long) _beanServer.getAttribute(instancesName, 
"WriteBytesCounter"), 5);
     Assert.assertTrue((long) _beanServer.getAttribute(rootName, 
"WriteTotalLatencyCounter") >= 10);
-    Assert.assertTrue((long) _beanServer.getAttribute(rootName, 
"WriteMaxLatencyGauge") >= 10);
     Assert
         .assertTrue((long) _beanServer.getAttribute(instancesName, 
"WriteLatencyGauge.Max") >= 10);
     Assert.assertTrue(
         (long) _beanServer.getAttribute(instancesName, 
"WriteTotalLatencyCounter") >= 10);
-    Assert.assertTrue((long) _beanServer.getAttribute(instancesName, 
"WriteMaxLatencyGauge") >= 10);
   }
 }

Reply via email to