Refactor ZkClient methods to simplify monitoring configuration when creating a client.
Additional change: 1. Refine monitor class to reduce unnecessary MBean attributes for ZkClient. 2. Apply new monitor framework to MessageLatencyMonitor. 3. Fix a MBeanRegistrar bug, MBean should be registered according to the specified path in order. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2b569db8 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2b569db8 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2b569db8 Branch: refs/heads/master Commit: 2b569db89fc6f1da1199fb5e981ad08b379ff787 Parents: c7b250a Author: Jiajun Wang <jjw...@linkedin.com> Authored: Mon Aug 28 14:46:15 2017 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Mon Nov 6 17:07:09 2017 -0800 ---------------------------------------------------------------------- .../apache/helix/manager/zk/ZKHelixManager.java | 10 +- .../helix/manager/zk/ZkAsyncCallbacks.java | 13 +- .../manager/zk/ZkCacheBaseDataAccessor.java | 9 +- .../org/apache/helix/manager/zk/ZkClient.java | 124 ++++++++++++++++--- .../monitoring/ParticipantStatusMonitor.java | 6 +- .../helix/monitoring/mbeans/MBeanRegistrar.java | 10 +- .../mbeans/MessageLatencyMonitor.java | 98 +++++++++------ .../monitoring/mbeans/ZkClientMonitor.java | 21 +++- .../monitoring/mbeans/ZkClientPathMonitor.java | 30 +---- .../dynamicMBeans/HistogramDynamicMetric.java | 4 - .../apache/helix/manager/zk/TestZkClient.java | 21 ++-- .../monitoring/mbeans/TestZkClientMonitor.java | 16 +-- 12 files changed, 231 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
index 82c3224..d048ad1 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -599,8 +599,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build(); - _zkclient = new ZkClient(_zkAddress, _sessionTimeout, _clientConnectionTimeout, zkSerializer, - _instanceType.name(), String.format("%s.%s", _clusterName, _instanceName)); + ZkClient.Builder zkClientBuilder = new ZkClient.Builder(); + zkClientBuilder.setZkServer(_zkAddress).setSessionTimeout(_sessionTimeout) + .setConnectionTimeout(_clientConnectionTimeout).setZkSerializer(zkSerializer) + .setMonitorType(_instanceType.name()) + .setMonitorKey(String.format("%s.%s", _clusterName, _instanceName)) + .setMonitorRootPathOnly(!_instanceType.equals(InstanceType.CONTROLLER) && !_instanceType + .equals(InstanceType.CONTROLLER_PARTICIPANT)); + _zkclient = zkClientBuilder.build(); _baseDataAccessor = createBaseDataAccessor(); http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java index 2be1244..1695ced 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java @@ -133,10 +133,12 @@ public class ZkAsyncCallbacks { if (ctx != null && ctx instanceof ZkAsyncCallContext) { ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx; - if (zkCtx._isRead) { - zkCtx._monitor.recordRead(path, zkCtx._bytes, zkCtx._startTimeMilliSec); - } else { - zkCtx._monitor.recordWrite(path, 
zkCtx._bytes, zkCtx._startTimeMilliSec); + if (zkCtx._monitor != null) { + if (zkCtx._isRead) { + zkCtx._monitor.recordRead(path, zkCtx._bytes, zkCtx._startTimeMilliSec); + } else { + zkCtx._monitor.recordWrite(path, zkCtx._bytes, zkCtx._startTimeMilliSec); + } } } @@ -177,9 +179,6 @@ public class ZkAsyncCallbacks { public ZkAsyncCallContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes, boolean isRead) { - if (monitor == null) { - throw new NullPointerException("ZkClientMonitor must not be null."); - } _monitor = monitor; _startTimeMilliSec = startTimeMilliSec; _bytes = bytes; http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java index 322c415..cb43ea9 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java @@ -109,9 +109,12 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath, List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey) { - _zkclient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, new BasicZkSerializer(serializer), monitorType, - monitorkey); + ZkClient.Builder zkClientBuilder = new ZkClient.Builder(); + zkClientBuilder.setZkServer(zkAddress).setSessionTimeout(ZkClient.DEFAULT_SESSION_TIMEOUT) + .setConnectionTimeout(ZkClient.DEFAULT_CONNECTION_TIMEOUT).setZkSerializer(serializer) + .setMonitorType(monitorType).setMonitorKey(monitorkey); + _zkclient = zkClientBuilder.build(); 
+ _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); _baseAccessor = new ZkBaseDataAccessor<>(_zkclient); http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java index a65a75a..7ec693f 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java @@ -42,7 +42,6 @@ import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import javax.management.JMException; @@ -63,17 +62,23 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient { private PathBasedZkSerializer _zkSerializer; private ZkClientMonitor _monitor; + private ZkClient(IZkConnection connection, int connectionTimeout, long operationRetryTimeout, + PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, + boolean monitorRootPathOnly) { + super(connection, connectionTimeout, new ByteArraySerializer(), operationRetryTimeout); + init(zkSerializer, monitorType, monitorKey, monitorRootPathOnly); + } + public ZkClient(IZkConnection connection, int connectionTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, long operationRetryTimeout) { - super(connection, connectionTimeout, new ByteArraySerializer(), operationRetryTimeout); - init(zkSerializer, monitorType, monitorKey); + this(connection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, + monitorKey, true); } public ZkClient(IZkConnection connection, int connectionTimeout, PathBasedZkSerializer 
zkSerializer, String monitorType, String monitorKey) { - super(connection, connectionTimeout, new ByteArraySerializer()); - init(zkSerializer, monitorType, monitorKey); + this(connection, connectionTimeout, zkSerializer, monitorType, monitorKey, -1); } public ZkClient(String zkServers, String monitorType, String monitorKey) { @@ -127,28 +132,20 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient { this(zkServers, null, null); } - protected void init(PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey) { + protected void init(PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, + boolean monitorRootPathOnly) { _zkSerializer = zkSerializer; if (LOG.isTraceEnabled()) { StackTraceElement[] calls = Thread.currentThread().getStackTrace(); LOG.trace("created a zkclient. callstack: " + Arrays.asList(calls)); } try { - if (monitorKey == null) { - // Use ZK session Id as monitor key - monitorType = ZkClientMonitor.SESSION_ID_PROPERTY_NAME; - // _connection cannot be null, checked in org.I0Itec.zkclient.ZkClient constructor - ZooKeeper zk = ((ZkConnection) _connection).getZookeeper(); - if (zk != null) { - monitorKey = new Long(zk.getSessionId()).toString(); - } else { // if zkClient is not connected - LOG.error("Cannot creating ZkClientMonitor because ZkClient is not connected."); - return; - } - } else if (monitorType == null) { - monitorType = ZkClientMonitor.CUSTOMIZED_PROPERTY_NAME; + if (monitorKey != null && !monitorKey.isEmpty() && + monitorType != null && !monitorType.isEmpty()) { + _monitor = new ZkClientMonitor(monitorType, monitorKey, monitorRootPathOnly); + } else { + LOG.info("ZkClient monitor key or type is not provided. 
Skip monitoring."); } - _monitor = new ZkClientMonitor(monitorType, monitorKey); } catch (JMException e) { LOG.error("Error in creating ZkClientMonitor", e); } @@ -595,4 +592,91 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient { _monitor.recordWriteFailure(path); } } + + public static class Builder { + IZkConnection _connection; + String _zkServer; + Integer _sessionTimeout; + + PathBasedZkSerializer _zkSerializer; + + long _operationRetryTimeout = -1L; + int _connectionTimeout = Integer.MAX_VALUE; + + String _monitorType; + String _monitorKey; + boolean _monitorRootPathOnly = true; + + public Builder setConnection(IZkConnection connection) { + this._connection = connection; + return this; + } + + public Builder setConnectionTimeout(Integer connectionTimeout) { + this._connectionTimeout = connectionTimeout; + return this; + } + + public Builder setZkSerializer(PathBasedZkSerializer zkSerializer) { + this._zkSerializer = zkSerializer; + return this; + } + + public Builder setZkSerializer(ZkSerializer zkSerializer) { + this._zkSerializer = new BasicZkSerializer(zkSerializer); + return this; + } + + public Builder setMonitorType(String monitorType) { + this._monitorType = monitorType; + return this; + } + + public Builder setMonitorKey(String monitorKey) { + this._monitorKey = monitorKey; + return this; + } + + public Builder setMonitorRootPathOnly(Boolean monitorRootPathOnly) { + this._monitorRootPathOnly = monitorRootPathOnly; + return this; + } + + public Builder setZkServer(String zkServer) { + this._zkServer = zkServer; + return this; + } + + public Builder setSessionTimeout(Integer sessionTimeout) { + this._sessionTimeout = sessionTimeout; + return this; + } + + public Builder setOperationRetryTimeout(Long operationRetryTimeout) { + this._operationRetryTimeout = operationRetryTimeout; + return this; + } + + public ZkClient build() { + if (_connection == null) { + if (_zkServer == null) { + throw new HelixException( + "Failed to build ZkClient since 
no connection or ZK server address is specified."); + } else { + if (_sessionTimeout == null) { + _connection = new ZkConnection(_zkServer); + } else { + _connection = new ZkConnection(_zkServer, _sessionTimeout); + } + } + } + + if (_zkSerializer == null) { + _zkSerializer = new BasicZkSerializer(new SerializableSerializer()); + } + + return new ZkClient(_connection, _connectionTimeout, _operationRetryTimeout, _zkSerializer, + _monitorType, _monitorKey, _monitorRootPathOnly); + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java index ee1d630..6192c6d 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java @@ -57,7 +57,7 @@ public class ParticipantStatusMonitor { _messageLatencyMonitor = new MessageLatencyMonitor(instanceName); _executorMonitors = new ConcurrentHashMap<>(); register(_messageMonitor, getObjectName(_messageMonitor.getParticipantBeanName())); - register(_messageLatencyMonitor, getObjectName(_messageLatencyMonitor.getBeanName())); + _messageLatencyMonitor.register(MonitorDomainNames.CLMParticipantReport.name()); } } catch (Exception e) { LOG.warn(e); @@ -152,6 +152,9 @@ public class ParticipantStatusMonitor { LOG.warn("fail to unregister " + _messageMonitor.getParticipantBeanName(), e); } } + if (_messageLatencyMonitor != null) { + _messageLatencyMonitor.unregister(); + } for (StateTransitionContext cxt : _monitorMap.keySet()) { try { ObjectName name = getObjectName(cxt.toString()); @@ -163,7 +166,6 @@ public class ParticipantStatusMonitor { } } _monitorMap.clear(); - } public void 
createExecutorMonitor(String type, ExecutorService executor) { http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java index 05af7a7..6d43575 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MBeanRegistrar.java @@ -20,7 +20,6 @@ package org.apache.helix.monitoring.mbeans; */ import java.lang.management.ManagementFactory; -import java.util.Hashtable; import javax.management.InstanceAlreadyExistsException; import javax.management.JMException; import javax.management.MBeanServer; @@ -88,13 +87,14 @@ public class MBeanRegistrar { + "number of String and at least 2 String"); } - Hashtable<String, String> table = new Hashtable<>(); + StringBuilder objectNameStr = new StringBuilder(); for (int i = 0; i < keyValuePairs.length; i += 2) { - table.put(keyValuePairs[i], keyValuePairs[i + 1]); + objectNameStr.append( + String.format(i == 0 ? 
"%s=%s" : ",%s=%s", keyValuePairs[i], keyValuePairs[i + 1])); } if (num > 0) { - table.put(DUPLICATE, String.valueOf(num)); + objectNameStr.append(String.format(",%s=%s", DUPLICATE, String.valueOf(num))); } - return new ObjectName(domain, table); + return new ObjectName(String.format("%s:%s", domain, objectNameStr.toString())); } } http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java index 4096be1..35fe8b8 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java @@ -19,66 +19,82 @@ package org.apache.helix.monitoring.mbeans; * under the License. 
*/ +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import org.apache.helix.HelixException; import org.apache.helix.model.Message; import org.apache.helix.monitoring.ParticipantStatusMonitor; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; import org.apache.log4j.Logger; -public class MessageLatencyMonitor implements MessageLatencyMonitorMBean { +import javax.management.JMException; +import javax.management.ObjectName; +import java.util.ArrayList; +import java.util.List; + +public class MessageLatencyMonitor { private static final Logger logger = Logger.getLogger(MessageLatencyMonitor.class.getName()); + private static final String MBEAN_DESCRIPTION = "Helix Message Latency Monitor"; public static String MONITOR_TYPE_KW = "MonitorType"; - private static long DEFAULT_RESET_TIME = 60 * 60 * 1000; - private long _totalMessageLatency; - private long _totalMessageCount; - private long _maxSingleMessageLatency; - private long _lastResetTime; + + private static final MetricRegistry _metricRegistry = new MetricRegistry(); private String _participantName; + private ObjectName _objectName; + + private DynamicMBeanProvider _dynamicMBeanProvider; + + private SimpleDynamicMetric<Long> _totalMessageCount = + new SimpleDynamicMetric("TotalMessageCount", 0l); + private SimpleDynamicMetric<Long> _totalMessageLatency = + new SimpleDynamicMetric("TotalMessageLatency", 0l); + private HistogramDynamicMetric _messageLatencyGauge = + new HistogramDynamicMetric("MessageLatencyGauge", + _metricRegistry.histogram(toString() + "MessageLatencyGauge")); public MessageLatencyMonitor(String participantName) { - _totalMessageLatency = 0L; - _totalMessageCount = 
0L; - _maxSingleMessageLatency = 0; - _lastResetTime = System.currentTimeMillis(); - _participantName = participantName; - } + List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); + attributeList.add(_totalMessageCount); + attributeList.add(_totalMessageLatency); + attributeList.add(_messageLatencyGauge); - public String getBeanName() { - return String.format("%s=%s,%s=%s", ParticipantStatusMonitor.PARTICIPANT_KEY, _participantName, - MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName()); + _participantName = participantName; + _dynamicMBeanProvider = new DynamicMBeanProvider(String + .format("%s.%s.%s.%s", ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY, participantName, + MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName()), MBEAN_DESCRIPTION, + attributeList); } public void updateLatency(Message message) { long latency = System.currentTimeMillis() - message.getCreateTimeStamp(); logger.info(String.format("The latency of message %s is %d ms", message.getMsgId(), latency)); - _totalMessageCount++; - _totalMessageLatency += latency; - if (_lastResetTime + DEFAULT_RESET_TIME <= System.currentTimeMillis() || - latency > _maxSingleMessageLatency) { - _maxSingleMessageLatency = latency; - _lastResetTime = System.currentTimeMillis(); - } + _totalMessageCount.updateValue(_totalMessageCount.getValue() + 1); + _totalMessageLatency.updateValue(_totalMessageLatency.getValue() + latency); + _messageLatencyGauge.updateValue(latency); } - - @Override - public long getTotalMessageLatencyCounter() { - return _totalMessageLatency; - } - - @Override - public long getTotalMessageCounter() { - return _totalMessageCount; - } - - @Override - public long getMaxSingleMessageLatencyGauge() { - return _maxSingleMessageLatency; + public void register(String domainName) throws JMException { + if (_objectName != null) { + throw new HelixException("Monitor has already been registed: " + _objectName.toString()); + } + _objectName = MBeanRegistrar + 
.register(_dynamicMBeanProvider, domainName, ParticipantStatusMonitor.PARTICIPANT_KEY, + _participantName, MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName()); } - @Override - public String getSensorName() { - return String - .format("%s.%s.%s.%s.", ParticipantStatusMonitor.PARTICIPANT_STATUS_KEY, _participantName, - MONITOR_TYPE_KW, MessageLatencyMonitor.class.getSimpleName()); + public void unregister() { + if (_objectName != null) { + MBeanRegistrar.unregister(_objectName); + } + _metricRegistry.removeMatching(new MetricFilter() { + @Override + public boolean matches(String name, Metric metric) { + return name.startsWith(toString()); + } + }); } } http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java index b685c45..b385068 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java @@ -19,6 +19,8 @@ package org.apache.helix.monitoring.mbeans; * under the License. 
*/ +import org.apache.helix.HelixException; + import javax.management.JMException; import javax.management.ObjectName; import java.util.Map; @@ -28,9 +30,6 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { public static final String MONITOR_TYPE = "Type"; public static final String MONITOR_KEY = "Key"; - public static final String SESSION_ID_PROPERTY_NAME = "SessionId"; - public static final String CUSTOMIZED_PROPERTY_NAME = "CustomizedKey"; - private ObjectName _objectName; private String _monitorType; private String _monitorKey; @@ -41,14 +40,24 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { private Map<ZkClientPathMonitor.PredefinedPath, ZkClientPathMonitor> _zkClientPathMonitorMap = new ConcurrentHashMap<>(); - public ZkClientMonitor(String monitorType, String monitorKey) throws JMException { + public ZkClientMonitor(String monitorType, String monitorKey, boolean monitorRootPathOnly) + throws JMException { + if (monitorKey == null || monitorKey.isEmpty() || monitorType == null || monitorType + .isEmpty()) { + throw new HelixException("Cannot create ZkClientMonitor without monitor key and type."); + } + _monitorType = monitorType; _monitorKey = monitorKey; regitster(monitorType, monitorKey); for (ZkClientPathMonitor.PredefinedPath path : ZkClientPathMonitor.PredefinedPath.values()) { - _zkClientPathMonitorMap - .put(path, new ZkClientPathMonitor(path.name(), monitorType, monitorKey)); + // If monitor root path only, check if the current path is Root. + // Otherwise, add monitors for every path. 
+ if (!monitorRootPathOnly || path.equals(ZkClientPathMonitor.PredefinedPath.Root)) { + _zkClientPathMonitorMap + .put(path, new ZkClientPathMonitor(path.name(), monitorType, monitorKey)); + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java index dac0cbd..d10f66c 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java @@ -26,6 +26,7 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; +import org.apache.helix.HelixException; import org.apache.helix.monitoring.mbeans.dynamicMBeans.*; import java.util.ArrayList; @@ -34,7 +35,6 @@ import java.util.List; public class ZkClientPathMonitor { public static final String MONITOR_PATH = "PATH"; - private static final long RESET_INTERVAL = 1000 * 60 * 10; // 1 hour private static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client Monitor"; private static final MetricRegistry _metricRegistry = new MetricRegistry(); @@ -68,8 +68,6 @@ public class ZkClientPathMonitor { } } - private long _lastResetTime = 0; - private SimpleDynamicMetric<Long> _readCounter = new SimpleDynamicMetric("ReadCounter", 0l); private SimpleDynamicMetric<Long> _writeCounter = new SimpleDynamicMetric("WriteCounter", 0l); private SimpleDynamicMetric<Long> _readBytesCounter = @@ -84,10 +82,6 @@ public class ZkClientPathMonitor { new SimpleDynamicMetric("ReadTotalLatencyCounter", 0l); private SimpleDynamicMetric<Long> _writeTotalLatencyCounter = new 
SimpleDynamicMetric("WriteTotalLatencyCounter", 0l); - private SimpleDynamicMetric<Long> _readMaxLatencyGauge = - new SimpleDynamicMetric("ReadMaxLatencyGauge", 0l); - private SimpleDynamicMetric<Long> _writeMaxLatencyGauge = - new SimpleDynamicMetric("WriteMaxLatencyGauge", 0l); private HistogramDynamicMetric _readLatencyGauge = new HistogramDynamicMetric("ReadLatencyGauge", _metricRegistry.histogram(toString() + "ReadLatencyGauge")); @@ -101,6 +95,11 @@ public class ZkClientPathMonitor { protected ZkClientPathMonitor(String path, String monitorType, String monitorKey) throws JMException { + if (monitorKey == null || monitorKey.isEmpty() || monitorType == null || monitorType + .isEmpty()) { + throw new HelixException("Cannot create ZkClientMonitor without monitor key and type."); + } + _monitorType = monitorType; _monitorKey = monitorKey; _path = path; @@ -114,8 +113,6 @@ public class ZkClientPathMonitor { attributeList.add(_writeFailureCounter); attributeList.add(_readTotalLatencyCounter); attributeList.add(_writeTotalLatencyCounter); - attributeList.add(_readMaxLatencyGauge); - attributeList.add(_writeMaxLatencyGauge); attributeList.add(_readLatencyGauge); attributeList.add(_writeLatencyGauge); attributeList.add(_readBytesGauge); @@ -154,13 +151,6 @@ public class ZkClientPathMonitor { } else { increaseCounter(isRead); increaseTotalLatency(isRead, latencyMilliSec); - if (_lastResetTime + RESET_INTERVAL <= System.currentTimeMillis() - || latencyMilliSec > (isRead ? 
- _readMaxLatencyGauge.getValue() : - _writeMaxLatencyGauge.getValue())) { - setMaxLatency(isRead, latencyMilliSec); - _lastResetTime = System.currentTimeMillis(); - } if (bytes > 0) { increaseBytesCounter(isRead, bytes); } @@ -202,12 +192,4 @@ public class ZkClientPathMonitor { _writeLatencyGauge.updateValue(latencyDelta); } } - - private void setMaxLatency(boolean isRead, long latency) { - if (isRead) { - _readMaxLatencyGauge.updateValue(latency); - } else { - _writeMaxLatencyGauge.updateValue(latency); - } - } } http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java index bf447a1..bd6e6c9 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/HistogramDynamicMetric.java @@ -41,15 +41,11 @@ public class HistogramDynamicMetric extends DynamicMetric<Histogram, Long> { * The enum statistic attributes */ enum SnapshotAttribute { - Median("getMedian", "Median"), Pct75th("get75thPercentile", "75Pct"), Pct95th("get95thPercentile", "95Pct"), - Pct98th("get98thPercentile", "98ct"), Pct99th("get99thPercentile", "99Pct"), - Pct999th("get999thPercentile", "999Pct"), Max("getMax", "Max"), Mean("getMean", "Mean"), - Min("getMin", "Min"), StdDev("getStdDev", "StdDev"); final String _getMethodName; http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java index 7b2ef6d..c8f8d69 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java @@ -144,7 +144,10 @@ public class TestZkClient extends ZkUnitTestBase { final String TEST_NODE = "/test_zkclient_monitor"; final String TEST_PATH = TEST_ROOT + TEST_NODE; - ZkClient zkClient = new ZkClient(ZK_ADDR, TEST_TAG, TEST_KEY); + ZkClient.Builder builder = new ZkClient.Builder(); + builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG) + .setMonitorRootPathOnly(false); + ZkClient zkClient = builder.build(); final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, TEST_PATH).length; @@ -174,11 +177,11 @@ public class TestZkClient extends ZkUnitTestBase { // Test exists Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0); Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadMaxLatencyGauge"), 0); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"), 0); zkClient.exists(TEST_ROOT); Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1); Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") >= 0); - Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadMaxLatencyGauge") >= 0); + Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >= 0); // Test create Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 0); @@ -186,10 +189,10 @@ public class TestZkClient extends ZkUnitTestBase { Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 0); Assert.assertEquals((long) 
beanServer.getAttribute(idealStatename, "WriteBytesCounter"), 0); Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteMaxLatencyGauge"), 0); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max"), 0); Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteMaxLatencyGauge"), 0); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max"), 0); zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT); Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 1); Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), @@ -200,11 +203,11 @@ public class TestZkClient extends ZkUnitTestBase { long origWriteTotalLatencyCounter = (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"); Assert.assertTrue(origWriteTotalLatencyCounter >= 0); - Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteMaxLatencyGauge") >= 0); + Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max") >= 0); long origIdealStatesWriteTotalLatencyCounter = (long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"); Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0); - Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteMaxLatencyGauge") >= 0); + Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max") >= 0); // Test read Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1); @@ -216,7 +219,7 @@ public class TestZkClient extends ZkUnitTestBase { long origIdealStatesReadTotalLatencyCounter = (long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter"); 
Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadMaxLatencyGauge"), 0); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"), 0); zkClient.readData(TEST_PATH, new Stat()); Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2); Assert @@ -228,7 +231,7 @@ public class TestZkClient extends ZkUnitTestBase { >= origReadTotalLatencyCounter); Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter") >= origIdealStatesReadTotalLatencyCounter); - Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadMaxLatencyGauge") >= 0); + Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max") >= 0); zkClient.getChildren(TEST_PATH); Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3); Assert http://git-wip-us.apache.org/repos/asf/helix/blob/2b569db8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java index 9fff64d..e227b86 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java @@ -57,9 +57,14 @@ public class TestZkClientMonitor { final String TEST_TAG_1 = "test_tag_1"; final String TEST_KEY_1 = "test_key_1"; - ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1); + ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, true); Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1))); - ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, 
TEST_KEY_1); + + // no per-path monitor items created since "monitorRootPathOnly" = true + Assert.assertFalse(_beanServer.isRegistered(buildPathMonitorObjectName(TEST_TAG_1, TEST_KEY_1, + ZkClientPathMonitor.PredefinedPath.IdealStates.name()))); + + ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, true); Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, 1))); monitor.unregister(); @@ -74,7 +79,7 @@ public class TestZkClientMonitor { final String TEST_TAG = "test_tag_3"; final String TEST_KEY = "test_key_3"; - ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY); + ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY, false); ObjectName name = buildObjectName(TEST_TAG, TEST_KEY); ObjectName rootName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, ZkClientPathMonitor.PredefinedPath.Root.name()); @@ -93,14 +98,11 @@ public class TestZkClientMonitor { Assert.assertEquals((long) _beanServer.getAttribute(rootName, "ReadCounter"), 1); Assert.assertEquals((long) _beanServer.getAttribute(idealStateName, "ReadCounter"), 1); Assert.assertTrue((long) _beanServer.getAttribute(rootName, "ReadLatencyGauge.Max") >= 10); - Assert.assertTrue((long) _beanServer.getAttribute(rootName, "ReadMaxLatencyGauge") >= 10); - monitor.recordRead("TEST/INSTANCES/testDB0", 0, System.currentTimeMillis() - 15); Assert.assertEquals((long) _beanServer.getAttribute(rootName, "ReadCounter"), 2); Assert.assertEquals((long) _beanServer.getAttribute(instancesName, "ReadCounter"), 1); Assert.assertEquals((long) _beanServer.getAttribute(idealStateName, "ReadCounter"), 1); Assert.assertTrue((long) _beanServer.getAttribute(rootName, "ReadTotalLatencyCounter") >= 25); - Assert.assertTrue((long) _beanServer.getAttribute(rootName, "ReadMaxLatencyGauge") >= 15); monitor.recordWrite("TEST/INSTANCES/node_1/CURRENTSTATES/session_1/Resource", 5, System.currentTimeMillis() - 10); @@ -110,11 +112,9 @@ public class 
TestZkClientMonitor { Assert.assertEquals((long) _beanServer.getAttribute(instancesName, "WriteCounter"), 1); Assert.assertEquals((long) _beanServer.getAttribute(instancesName, "WriteBytesCounter"), 5); Assert.assertTrue((long) _beanServer.getAttribute(rootName, "WriteTotalLatencyCounter") >= 10); - Assert.assertTrue((long) _beanServer.getAttribute(rootName, "WriteMaxLatencyGauge") >= 10); Assert .assertTrue((long) _beanServer.getAttribute(instancesName, "WriteLatencyGauge.Max") >= 10); Assert.assertTrue( (long) _beanServer.getAttribute(instancesName, "WriteTotalLatencyCounter") >= 10); - Assert.assertTrue((long) _beanServer.getAttribute(instancesName, "WriteMaxLatencyGauge") >= 10); } }