This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 88e2188f7e8af41a0b4d2329006beb0e9230a817 Author: Junkai Xue <[email protected]> AuthorDate: Wed May 15 08:28:16 2019 -0700 Refactor StateTransitionStatMonitor to extend DynamicMBeanProvider. To support per-state-transition latency, the first step is to convert StateTransitionStatMonitor into a dynamic MBean. RB=1671496 RB=1671496 RB=1671496 BUG=HELIX-1890 G=helix-reviewers A=hulee Signed-off-by: Hunter Lee <[email protected]> --- .../apache/helix/monitoring/mbeans/JobMonitor.java | 17 --- .../mbeans/ParticipantStatusMonitor.java | 5 +- .../mbeans/StateTransitionStatMonitor.java | 169 +++++++-------------- .../mbeans/dynamicMBeans/DynamicMBeanProvider.java | 16 ++ .../helix/monitoring/TestParticipantMonitor.java | 8 +- 5 files changed, 78 insertions(+), 137 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java index 6589e96..0c4aad4 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java @@ -204,21 +204,4 @@ public class JobMonitor extends DynamicMBeanProvider { doRegister(attributeList, _initObjectName); return this; } - - /** - * NOTE: This method is not thread-safe nor atomic. - * Increment the value of a given SimpleDynamicMetric by 1. - */ - private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric) { - metric.updateValue(metric.getValue() + 1); - } - - /** - * NOTE: This method is not thread-safe nor atomic. - * Increment the value of a given SimpleDynamicMetric by 1. 
- */ - private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric, long value) { - metric.updateValue(metric.getValue() + value); - } - } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java index 9eb29ff..f2fe72d 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java @@ -101,10 +101,9 @@ public class ParticipantStatusMonitor { synchronized (this) { if (!_monitorMap.containsKey(cxt)) { StateTransitionStatMonitor bean = - new StateTransitionStatMonitor(cxt); + new StateTransitionStatMonitor(cxt, getObjectName(cxt.toString())); _monitorMap.put(cxt, bean); - String beanName = cxt.toString(); - register(bean, getObjectName(beanName)); + bean.register(); } } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java index b1e93e6..93ace95 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java @@ -19,37 +19,70 @@ package org.apache.helix.monitoring.mbeans; * under the License. 
*/ -import java.util.concurrent.ConcurrentHashMap; - +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowArrayReservoir; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.management.JMException; +import javax.management.ObjectName; import org.apache.helix.monitoring.StatCollector; import org.apache.helix.monitoring.StateTransitionContext; import org.apache.helix.monitoring.StateTransitionDataPoint; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // TODO convert StateTransitionStatMonitor to extends DynamicMBeanProvider. // Note this might change the attributes name. -public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBean { +public class StateTransitionStatMonitor extends DynamicMBeanProvider { private static final Logger _logger = LoggerFactory.getLogger(StateTransitionStatMonitor.class); - public enum LATENCY_TYPE { - TOTAL, - EXECUTION, - MESSAGE - } + private List<DynamicMetric<?, ?>> _attributeList; + // For registering dynamic metrics + private final ObjectName _initObjectName; - private long _numDataPoints; - private long _successCount; + private SimpleDynamicMetric<Long> _totalStateTransitionCounter; + private SimpleDynamicMetric<Long> _totalFailedTransitionCounter; + private SimpleDynamicMetric<Long> _totalSuccessTransitionCounter; - private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap = new ConcurrentHashMap<>(); + private HistogramDynamicMetric _transitionLatencyGauge; + private HistogramDynamicMetric _transitionExecutionLatencyGauge; + private HistogramDynamicMetric _transitionMessageLatency; 
StateTransitionContext _context; - public StateTransitionStatMonitor(StateTransitionContext context) { + public StateTransitionStatMonitor(StateTransitionContext context, ObjectName objectName) { _context = context; - for (LATENCY_TYPE type : LATENCY_TYPE.values()) { - _monitorMap.put(type, new StatCollector()); - } - reset(); + _initObjectName = objectName; + _attributeList = new ArrayList<>(); + _totalStateTransitionCounter = new SimpleDynamicMetric<>("TotalStateTransitionCounter", 0L); + _totalFailedTransitionCounter = new SimpleDynamicMetric<>("TotalFailedTransitionCounter", 0L); + _totalSuccessTransitionCounter = new SimpleDynamicMetric<>("TotalSuccessTransitionCounter", 0L); + + _transitionLatencyGauge = new HistogramDynamicMetric("TransitionLatencyGauge", new Histogram( + new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _transitionExecutionLatencyGauge = new HistogramDynamicMetric("TransitionExecutionLatencyGauge", + new Histogram( + new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _transitionMessageLatency = new HistogramDynamicMetric("TransitionMessageLatencyGauge", + new Histogram( + new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + } + + @Override + public DynamicMBeanProvider register() throws JMException { + _attributeList.add(_totalStateTransitionCounter); + _attributeList.add(_totalFailedTransitionCounter); + _attributeList.add(_totalSuccessTransitionCounter); + _attributeList.add(_transitionLatencyGauge); + _attributeList.add(_transitionExecutionLatencyGauge); + _attributeList.add(_transitionMessageLatency); + doRegister(_attributeList, _initObjectName); + return this; } public StateTransitionContext getContext() { @@ -62,105 +95,15 @@ public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBe } public void addDataPoint(StateTransitionDataPoint data) { - _numDataPoints++; + 
incrementSimpleDynamicMetric(_totalStateTransitionCounter); if (data.getSuccess()) { - _successCount++; + incrementSimpleDynamicMetric(_totalSuccessTransitionCounter); + } else { + incrementSimpleDynamicMetric(_totalFailedTransitionCounter); } - addLatency(LATENCY_TYPE.TOTAL, data.getTotalDelay()); - addLatency(LATENCY_TYPE.EXECUTION, data.getExecutionDelay()); - addLatency(LATENCY_TYPE.MESSAGE, data.getMessageLatency()); - } - private void addLatency(LATENCY_TYPE type, double latency) { - if (latency < 0) { - _logger.warn("Ignore negative latency data {} for type {}.", latency, type.name()); - return; - } - assert(_monitorMap.containsKey(type)); - _monitorMap.get(type).addData(latency); - } - - public void reset() { - _numDataPoints = 0; - _successCount = 0; - for (StatCollector monitor : _monitorMap.values()) { - monitor.reset(); - } - } - - @Override - public long getTotalStateTransitionGauge() { - return _numDataPoints; - } - - @Override - public long getTotalFailedTransitionGauge() { - return _numDataPoints - _successCount; - } - - @Override - public long getTotalSuccessTransitionGauge() { - return _successCount; - } - - @Override - public double getMeanTransitionLatency() { - return _monitorMap.get(LATENCY_TYPE.TOTAL).getMean(); - } - - @Override - public double getMaxTransitionLatency() { - return _monitorMap.get(LATENCY_TYPE.TOTAL).getMax(); - } - - @Override - public double getMinTransitionLatency() { - return _monitorMap.get(LATENCY_TYPE.TOTAL).getMin(); + _transitionLatencyGauge.updateValue(data.getTotalDelay()); + _transitionExecutionLatencyGauge.updateValue(data.getExecutionDelay()); + _transitionMessageLatency.updateValue(data.getMessageLatency()); } - - @Override - public double getPercentileTransitionLatency(int percentage) { - return _monitorMap.get(LATENCY_TYPE.TOTAL).getPercentile(percentage); - } - - @Override - public double getMeanTransitionExecuteLatency() { - return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMean(); - } - - @Override - 
public double getMaxTransitionExecuteLatency() { - return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMax(); - } - - @Override - public double getMinTransitionExecuteLatency() { - return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMin(); - } - - @Override - public double getPercentileTransitionExecuteLatency(int percentage) { - return _monitorMap.get(LATENCY_TYPE.EXECUTION).getPercentile(percentage); - } - - @Override - public double getMeanTransitionMessageLatency() { - return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMean(); - } - - @Override - public double getMaxTransitionMessageLatency() { - return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMax(); - } - - @Override - public double getMinTransitionMessageLatency() { - return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMin(); - } - - @Override - public double getPercentileTransitionMessageLatency(int percentage) { - return _monitorMap.get(LATENCY_TYPE.MESSAGE).getPercentile(percentage); - } - } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java index 4299159..59adf67 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java @@ -210,4 +210,20 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr // No operation supported return null; } + + /** + * NOTE: This method is not thread-safe nor atomic. + * Increment the value of a given SimpleDynamicMetric by 1. + */ + protected void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric) { + incrementSimpleDynamicMetric(metric, 1); + } + + /** + * NOTE: This method is not thread-safe nor atomic. + * Increment the value of a given SimpleDynamicMetric with input value. 
+ */ + protected void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric, long value) { + metric.updateValue(metric.getValue() + value); + } } diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java index d7aed6e..bc9de01 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java @@ -124,13 +124,13 @@ public class TestParticipantMonitor { // Note the values in listener's map is the snapshot when the MBean is detected. Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString()) - .get("MeanTransitionLatency"), 2000.0); + .get("TransitionLatencyGauge.Mean"), 2000.0); Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString()) - .get("MeanTransitionExecuteLatency"), 1100.0); + .get("TransitionExecutionLatencyGauge.Mean"), 1100.0); Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString()) - .get("MeanTransitionMessageLatency"), 600.0); + .get("TransitionMessageLatencyGauge.Mean"), 600.0); Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString()) - .get("TotalStateTransitionGauge"), 2L); + .get("TotalStateTransitionCounter"), 2L); data = new StateTransitionDataPoint(2000, 500, 600, true); monitor.reportTransitionStat(cxt, data);
