use SlidingTimeWindowReservoir for histogram stats
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3a73b0f3 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3a73b0f3 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3a73b0f3 Branch: refs/heads/master Commit: 3a73b0f30336c4610a501af979d55c4d25344214 Parents: 2f791a6 Author: hrzhang <[email protected]> Authored: Tue Nov 7 11:38:36 2017 -0800 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:31:00 2018 -0800 ---------------------------------------------------------------------- .../monitoring/mbeans/ClusterEventMonitor.java | 35 ++++++-- .../monitoring/mbeans/ClusterStatusMonitor.java | 2 +- .../monitoring/mbeans/HelixCallbackMonitor.java | 20 +++-- .../mbeans/MessageLatencyMonitor.java | 18 ++-- .../monitoring/mbeans/ResourceMonitor.java | 86 +++++++++++--------- .../monitoring/mbeans/ZkClientPathMonitor.java | 58 +++++++------ .../dynamicMBeans/DynamicMBeanProvider.java | 17 +--- .../TestClusterEventStatusMonitor.java | 68 +++++++++++++++- 8 files changed, 201 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java index e7f09ea..8c77466 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java @@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans; * under the License. 
*/ +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowReservoir; +import java.util.concurrent.TimeUnit; import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; @@ -35,20 +38,16 @@ public class ClusterEventMonitor extends DynamicMBeanProvider { TotalProcessed } - private static final long RESET_INTERVAL = 1000 * 60 * 10; // 1 hour private static final String CLUSTEREVENT_DN_KEY = "ClusterEventStatus"; private static final String EVENT_DN_KEY = "eventName"; private static final String PHASE_DN_KEY = "phaseName"; private final String _phaseName; - private SimpleDynamicMetric<Long> _totalDuration = - new SimpleDynamicMetric("TotalDurationCounter", 0l); - private SimpleDynamicMetric<Long> _maxDuration = - new SimpleDynamicMetric("MaxSingleDurationGauge", 0l); - private SimpleDynamicMetric<Long> _count = new SimpleDynamicMetric("EventCounter", 0l); - private HistogramDynamicMetric _duration = new HistogramDynamicMetric("DurationGauge", - _metricRegistry.histogram(getMetricName("DurationGauge"))); + private SimpleDynamicMetric<Long> _totalDuration; + private SimpleDynamicMetric<Long> _maxDuration; + private SimpleDynamicMetric<Long> _count; + private HistogramDynamicMetric _duration; private long _lastResetTime; private ClusterStatusMonitor _clusterStatusMonitor; @@ -56,13 +55,31 @@ public class ClusterEventMonitor extends DynamicMBeanProvider { public ClusterEventMonitor(ClusterStatusMonitor clusterStatusMonitor, String phaseName) { _phaseName = phaseName; _clusterStatusMonitor = clusterStatusMonitor; + + _duration = new HistogramDynamicMetric("DurationGauge", new Histogram( + new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _count = new SimpleDynamicMetric("EventCounter", 0l); + _maxDuration = new 
SimpleDynamicMetric("MaxSingleDurationGauge", 0l); + _totalDuration = new SimpleDynamicMetric("TotalDurationCounter", 0l); + } + + public ClusterEventMonitor(ClusterStatusMonitor clusterStatusMonitor, String phaseName, + int histogramTimeWindowMs) { + _phaseName = phaseName; + _clusterStatusMonitor = clusterStatusMonitor; + + _duration = new HistogramDynamicMetric("DurationGauge", new Histogram( + new SlidingTimeWindowReservoir(histogramTimeWindowMs, TimeUnit.MILLISECONDS))); + _count = new SimpleDynamicMetric("EventCounter", 0l); + _maxDuration = new SimpleDynamicMetric("MaxSingleDurationGauge", 0l); + _totalDuration = new SimpleDynamicMetric("TotalDurationCounter", 0l); } public void reportDuration(long duration) { _totalDuration.updateValue(_totalDuration.getValue() + duration); _count.updateValue(_count.getValue() + 1); _duration.updateValue(duration); - if (_lastResetTime + RESET_INTERVAL <= System.currentTimeMillis() || + if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis() || duration > _maxDuration.getValue()) { _maxDuration.updateValue(duration); _lastResetTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index c644d10..61f4ce1 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -69,7 +69,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>(); // phaseName -> eventMonitor - private final 
ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap = + protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap = new ConcurrentHashMap<>(); /** http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java index 0dbafb4..0fc2001 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java @@ -19,6 +19,8 @@ package org.apache.helix.monitoring.mbeans; * under the License. */ +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowReservoir; import org.apache.helix.HelixConstants; import org.apache.helix.InstanceType; import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; @@ -28,6 +30,7 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; import javax.management.JMException; import java.util.ArrayList; +import java.util.concurrent.TimeUnit; import java.util.List; public class HelixCallbackMonitor extends DynamicMBeanProvider { @@ -42,14 +45,11 @@ public class HelixCallbackMonitor extends DynamicMBeanProvider { private final String _clusterName; private final String _instanceName; - private SimpleDynamicMetric<Long> _counter = new SimpleDynamicMetric("Counter", 0l); - private SimpleDynamicMetric<Long> _unbatchedCounter = - new SimpleDynamicMetric("UnbatchedCounter", 0l); - private SimpleDynamicMetric<Long> _totalLatencyCounter = - new SimpleDynamicMetric("LatencyCounter", 0l); + private SimpleDynamicMetric<Long> _counter; + private SimpleDynamicMetric<Long> _unbatchedCounter; + 
private SimpleDynamicMetric<Long> _totalLatencyCounter; - private HistogramDynamicMetric _latencyGauge = new HistogramDynamicMetric("LatencyGauge", - _metricRegistry.histogram(getMetricName("LatencyGauge"))); + private HistogramDynamicMetric _latencyGauge; public HelixCallbackMonitor(InstanceType type, String clusterName, String instanceName, HelixConstants.ChangeType changeType) throws JMException { @@ -62,6 +62,12 @@ public class HelixCallbackMonitor extends DynamicMBeanProvider { _sensorName = String .format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), type.name(), clusterName, changeType.name()); + + _latencyGauge = new HistogramDynamicMetric("LatencyGauge", new Histogram( + new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _totalLatencyCounter = new SimpleDynamicMetric("LatencyCounter", 0l); + _unbatchedCounter = new SimpleDynamicMetric("UnbatchedCounter", 0l); + _counter = new SimpleDynamicMetric("Counter", 0l); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java index f1c1039..dac5826 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java @@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans; * under the License. 
*/ +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowReservoir; +import java.util.concurrent.TimeUnit; import org.apache.helix.model.Message; import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; @@ -35,19 +38,20 @@ public class MessageLatencyMonitor extends DynamicMBeanProvider { private final String _domainName; private final String _participantName; - private SimpleDynamicMetric<Long> _totalMessageCount = - new SimpleDynamicMetric("TotalMessageCount", 0l); - private SimpleDynamicMetric<Long> _totalMessageLatency = - new SimpleDynamicMetric("TotalMessageLatency", 0l); - private HistogramDynamicMetric _messageLatencyGauge = - new HistogramDynamicMetric("MessageLatencyGauge", - _metricRegistry.histogram(getMetricName("MessageLatencyGauge"))); + private SimpleDynamicMetric<Long> _totalMessageCount; + private SimpleDynamicMetric<Long> _totalMessageLatency; + private HistogramDynamicMetric _messageLatencyGauge; public MessageLatencyMonitor(String domainName, String participantName) throws JMException { _domainName = domainName; _participantName = participantName; _sensorName = String.format("%s.%s", ParticipantMessageMonitor.PARTICIPANT_STATUS_KEY, "MessageLatency"); + + _messageLatencyGauge = new HistogramDynamicMetric("MessagelatencyGauge", new Histogram( + new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _totalMessageLatency = new SimpleDynamicMetric("TotalMessageLatency", 0l); + _totalMessageCount = new SimpleDynamicMetric("TotalMessageCount", 0l); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java index 3318ddd..662f323 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java @@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans; * under the License. */ +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowReservoir; +import java.util.concurrent.TimeUnit; import org.apache.helix.HelixDefinedState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -33,46 +36,27 @@ import javax.management.ObjectName; import java.util.*; public class ResourceMonitor extends DynamicMBeanProvider { - private static final long RESET_TIME_RANGE = 1000 * 60 * 60; // 1 hour // Gauges - private SimpleDynamicMetric<Integer> _numOfPartitions = - new SimpleDynamicMetric("PartitionGauge", 0); - private SimpleDynamicMetric<Integer> _numOfPartitionsInExternalView = - new SimpleDynamicMetric("ExternalViewPartitionGauge", 0); - private SimpleDynamicMetric<Integer> _numOfErrorPartitions = - new SimpleDynamicMetric("ErrorPartitionGauge", 0); - private SimpleDynamicMetric<Integer> _numNonTopStatePartitions = - new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0); - private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions = - new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l); - private SimpleDynamicMetric<Long> _numLessReplicaPartitions = - new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l); - private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions = - new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l); - private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions = - new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l); - private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions = - new 
SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l); - private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions = - new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l); - private SimpleDynamicMetric<Integer> _externalViewIdealStateDiff = - new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0); + private SimpleDynamicMetric<Integer> _numOfPartitions; + private SimpleDynamicMetric<Integer> _numOfPartitionsInExternalView; + private SimpleDynamicMetric<Integer> _numOfErrorPartitions; + private SimpleDynamicMetric<Integer> _numNonTopStatePartitions; + private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions; + private SimpleDynamicMetric<Long> _numLessReplicaPartitions; + private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions; + private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions; + private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions; + private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions; + private SimpleDynamicMetric<Integer> _externalViewIdealStateDiff; // Counters - private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter = - new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l); - private SimpleDynamicMetric<Long> _successTopStateHandoffCounter = - new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l); - private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter = - new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l); - private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration = - new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l); - private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge = - new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", _metricRegistry - .histogram(getMetricName("PartitionTopStateHandoffDurationGauge"))); - private SimpleDynamicMetric<Long> _totalMessageReceived = - new 
SimpleDynamicMetric("TotalMessageReceived", 0l); + private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter; + private SimpleDynamicMetric<Long> _successTopStateHandoffCounter; + private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter; + private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration; + private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge; + private SimpleDynamicMetric<Long> _totalMessageReceived; private String _tag = ClusterStatusMonitor.DEFAULT_TAG; private long _lastResetTime; @@ -114,6 +98,34 @@ public class ResourceMonitor extends DynamicMBeanProvider { _clusterName = clusterName; _resourceName = resourceName; _initObjectName = objectName; + + _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0l); + _numLoadRebalanceThrottledPartitions = + new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l); + _numRecoveryRebalanceThrottledPartitions = + new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l); + _numPendingLoadRebalancePartitions = + new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l); + _numPendingRecoveryRebalancePartitions = + new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l); + _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l); + _numLessMinActiveReplicaPartitions = + new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l); + _numNonTopStatePartitions = new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0l); + _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0l); + _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0l); + _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0l); + + _partitionTopStateHandoffDurationGauge = + new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram( + new 
SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0l); + _maxSinglePartitionTopStateHandoffDuration = + new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l); + _failedTopStateHandoffCounter = new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l); + _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l); + _successfulTopStateHandoffDurationCounter = + new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l); } @Override @@ -341,7 +353,7 @@ public class ResourceMonitor extends DynamicMBeanProvider { } public void resetMaxTopStateHandoffGauge() { - if (_lastResetTime + RESET_TIME_RANGE <= System.currentTimeMillis()) { + if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) { _maxSinglePartitionTopStateHandoffDuration.updateValue(0l); _lastResetTime = System.currentTimeMillis(); } http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java index e87738b..bc6a36b 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java @@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans; * under the License. 
*/ +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowReservoir; +import java.util.concurrent.TimeUnit; import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; @@ -62,30 +65,19 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider { } } - private SimpleDynamicMetric<Long> _readCounter = new SimpleDynamicMetric("ReadCounter", 0l); - private SimpleDynamicMetric<Long> _writeCounter = new SimpleDynamicMetric("WriteCounter", 0l); - private SimpleDynamicMetric<Long> _readBytesCounter = - new SimpleDynamicMetric("ReadBytesCounter", 0l); - private SimpleDynamicMetric<Long> _writeBytesCounter = - new SimpleDynamicMetric("WriteBytesCounter", 0l); - private SimpleDynamicMetric<Long> _readFailureCounter = - new SimpleDynamicMetric("ReadFailureCounter", 0l); - private SimpleDynamicMetric<Long> _writeFailureCounter = - new SimpleDynamicMetric("WriteFailureCounter", 0l); - private SimpleDynamicMetric<Long> _readTotalLatencyCounter = - new SimpleDynamicMetric("ReadTotalLatencyCounter", 0l); - private SimpleDynamicMetric<Long> _writeTotalLatencyCounter = - new SimpleDynamicMetric("WriteTotalLatencyCounter", 0l); - - private HistogramDynamicMetric _readLatencyGauge = new HistogramDynamicMetric("ReadLatencyGauge", - _metricRegistry.histogram(getMetricName("ReadLatencyGauge"))); - private HistogramDynamicMetric _writeLatencyGauge = - new HistogramDynamicMetric("WriteLatencyGauge", - _metricRegistry.histogram(getMetricName("WriteLatencyGauge"))); - private HistogramDynamicMetric _readBytesGauge = new HistogramDynamicMetric("ReadBytesGauge", - _metricRegistry.histogram(getMetricName("ReadBytesGauge"))); - private HistogramDynamicMetric _writeBytesGauge = new HistogramDynamicMetric("WriteBytesGauge", - _metricRegistry.histogram(getMetricName("WriteBytesGauge"))); + private 
SimpleDynamicMetric<Long> _readCounter; + private SimpleDynamicMetric<Long> _writeCounter; + private SimpleDynamicMetric<Long> _readBytesCounter; + private SimpleDynamicMetric<Long> _writeBytesCounter; + private SimpleDynamicMetric<Long> _readFailureCounter; + private SimpleDynamicMetric<Long> _writeFailureCounter; + private SimpleDynamicMetric<Long> _readTotalLatencyCounter; + private SimpleDynamicMetric<Long> _writeTotalLatencyCounter; + + private HistogramDynamicMetric _readLatencyGauge; + private HistogramDynamicMetric _writeLatencyGauge; + private HistogramDynamicMetric _readBytesGauge; + private HistogramDynamicMetric _writeBytesGauge; @Override public String getSensorName() { @@ -101,6 +93,24 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider { _sensorName = String .format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey, path.name()); + + _writeTotalLatencyCounter = new SimpleDynamicMetric("WriteTotalLatencyCounter", 0l); + _readTotalLatencyCounter = new SimpleDynamicMetric("ReadTotalLatencyCounter", 0l); + _writeFailureCounter = new SimpleDynamicMetric("WriteFailureCounter", 0l); + _readFailureCounter = new SimpleDynamicMetric("ReadFailureCounter", 0l); + _writeBytesCounter = new SimpleDynamicMetric("WriteBytesCounter", 0l); + _readBytesCounter = new SimpleDynamicMetric("ReadBytesCounter", 0l); + _writeCounter = new SimpleDynamicMetric("WriteCounter", 0l); + _readCounter = new SimpleDynamicMetric("ReadCounter", 0l); + + _readLatencyGauge = new HistogramDynamicMetric("ReadLatencyGauge", new Histogram( + new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _writeLatencyGauge = new HistogramDynamicMetric("WriteLatencyGauge", new Histogram( + new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _readBytesGauge = new HistogramDynamicMetric("ReadBytesGauge", new Histogram( + new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, 
TimeUnit.MILLISECONDS))); + _writeBytesGauge = new HistogramDynamicMetric("WriteBytesGauge", new Histogram( + new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); } public ZkClientPathMonitor register() throws JMException { http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java index cc97b3b..b44c63c 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java @@ -19,9 +19,6 @@ package org.apache.helix.monitoring.mbeans.dynamicMBeans; * under the License. 
*/ -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricFilter; -import com.codahale.metrics.MetricRegistry; import org.apache.helix.HelixException; import org.apache.helix.monitoring.SensorNameProvider; import org.apache.helix.monitoring.mbeans.MBeanRegistrar; @@ -36,7 +33,7 @@ import java.util.*; */ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNameProvider { protected final Logger _logger = LoggerFactory.getLogger(getClass()); - protected static final MetricRegistry _metricRegistry = new MetricRegistry(); + protected static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // Reset time every hour private static String SENSOR_NAME_TAG = "SensorName"; private static String DEFAULT_DESCRIPTION = "Information on the management interface of the MBean"; @@ -86,11 +83,6 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr doRegister(dynamicMetrics, null, objectName); } - protected String getMetricName(String metricName) { - return MetricRegistry - .name(getClass().getSimpleName(), Integer.toHexString(hashCode()), metricName); - } - /** * Update the Dynamic MBean provider with new metric list. * @@ -145,13 +137,6 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr * After unregistered, the MBean can't be registered again, a new monitor has be to created. 
*/ public synchronized void unregister() { - final String metricNamePrefix = getMetricName(null); - _metricRegistry.removeMatching(new MetricFilter() { - @Override - public boolean matches(String name, Metric metric) { - return name.startsWith(metricNamePrefix); - } - }); MBeanRegistrar.unregister(_objectName); } http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java index eb4f94b..b607add 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java @@ -22,8 +22,10 @@ package org.apache.helix.monitoring; import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.management.AttributeNotFoundException; import javax.management.InstanceNotFoundException; +import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; @@ -40,21 +42,43 @@ import org.testng.Assert; import org.testng.annotations.Test; public class TestClusterEventStatusMonitor { + private static final int TEST_SLIDING_WINDOW_MS = 2000; // 2s window for testing + + private class ClusterStatusMonitorForTest extends ClusterStatusMonitor { + public ClusterStatusMonitorForTest(String clusterName) { + super(clusterName); + } + public ConcurrentHashMap<String, ClusterEventMonitor> getClusterEventMBean() { + return _clusterEventMbeanMap; + } + } @Test() public void test() throws InstanceNotFoundException, MalformedObjectNameException, 
NullPointerException, IOException, InterruptedException, MBeanException, AttributeNotFoundException, - ReflectionException { + ReflectionException{ System.out.println("START TestClusterEventStatusMonitor"); String clusterName = "TestCluster"; - ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName); + ClusterStatusMonitorForTest monitor = new ClusterStatusMonitorForTest(clusterName); MBeanServer _server = ManagementFactory.getPlatformMBeanServer(); Set<ObjectInstance> mbeans = _server.queryMBeans(new ObjectName("ClusterStatus:Cluster=TestCluster,eventName=ClusterEvent,*"), null); Assert.assertEquals(mbeans.size(), 0); + // Customize event monitors for testing + try { + this.addTestEventMonitor(monitor, ClusterEventMonitor.PhaseName.Callback.name()); + this.addTestEventMonitor(monitor, ClusterEventMonitor.PhaseName.InQueue.name()); + this.addTestEventMonitor(monitor, BestPossibleStateCalcStage.class.getSimpleName()); + this.addTestEventMonitor(monitor, ReadClusterDataStage.class.getSimpleName()); + this.addTestEventMonitor(monitor, IntermediateStateCalcStage.class.getSimpleName()); + this.addTestEventMonitor(monitor, TaskAssignmentStage.class.getSimpleName()); + } catch (JMException jme) { + Assert.assertTrue(false, "Failed to customize event monitors"); + } + int count = 5; Long totalDuration = 0L; for (int i = 1; i <= count; i++) { @@ -77,9 +101,39 @@ public class TestClusterEventStatusMonitor { Long maxDuration = (Long) _server.getAttribute(mbean.getObjectName(), "MaxSingleDurationGauge"); Long eventCount = (Long) _server.getAttribute(mbean.getObjectName(), "EventCounter"); + Double pct75th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct75th"); + Double pct95th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct95th"); + Double pct99th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct99th"); + Long max = (Long) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Max"); + 
Double stddev = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.StdDev"); + Assert.assertEquals(duration, totalDuration); Assert.assertEquals(maxDuration, Long.valueOf(100 * count)); Assert.assertEquals(eventCount, Long.valueOf(count)); + Assert.assertTrue(Math.abs(pct75th - 450.0) < 1); + Assert.assertTrue(Math.abs(pct95th - 500.0) < 1); + Assert.assertTrue(Math.abs(pct99th - 500.0) < 1); + Assert.assertTrue(max == 500); + Assert.assertTrue(Math.abs(stddev - 158.0) < 0.2); + } + + System.out.println("\nWaiting for time window to expire\n"); + Thread.sleep(TEST_SLIDING_WINDOW_MS); + + // Since sliding window has expired, just make sure histograms have its values reset + for (ObjectInstance mbean : mbeans) { + Double pct75th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct75th"); + Double pct95th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct95th"); + Double pct99th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct99th"); + Long max = (Long) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Max"); + Double stddev = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.StdDev"); + + Assert.assertTrue(pct75th == 0.0); + Assert.assertTrue(pct95th == 0.0); + Assert.assertTrue(pct99th == 0.0); + Assert.assertTrue(max == 0); + Assert.assertTrue(stddev == 0.0); + } monitor.reset(); @@ -91,4 +145,14 @@ public class TestClusterEventStatusMonitor { System.out.println("END TestParticipantMonitor"); } + + private void addTestEventMonitor(ClusterStatusMonitorForTest monitor, String phaseName) throws + JMException { + ConcurrentHashMap<String, ClusterEventMonitor> mbean = monitor.getClusterEventMBean(); + ClusterEventMonitor eventMonitor = new ClusterEventMonitor(monitor, phaseName, + TEST_SLIDING_WINDOW_MS); + eventMonitor.register(); + mbean.put(phaseName, eventMonitor); + } + }
