This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit fa5767f2f4a83c05091e48a25a7a45ba947bc2f8 Author: Jiajun Wang <[email protected]> AuthorDate: Fri Feb 22 15:36:46 2019 -0800 Add message latency record to StateTransitionStatMonitor. This record provides an additional breakdown to help understand the state transition delay. RB=1573606 BUG=HELIX-1625 G=helix-reviewers A=jxue Signed-off-by: Hunter Lee <[email protected]> --- .../apache/helix/messaging/handling/HelixTask.java | 11 +++- .../helix/monitoring/StateTransitionDataPoint.java | 9 ++- .../monitoring/mbeans/ClusterMBeanObserver.java | 3 +- .../mbeans/ParticipantStatusMonitor.java | 4 +- .../mbeans/StateTransitionStatMonitor.java | 51 +++++++++++---- .../mbeans/StateTransitionStatMonitorMBean.java | 8 +++ .../helix/monitoring/TestParticipantMonitor.java | 72 ++++++++++++++-------- 7 files changed, 111 insertions(+), 47 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java index fb55e76..a8b74e5 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java @@ -22,6 +22,7 @@ package org.apache.helix.messaging.handling; import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; + import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -317,7 +318,8 @@ public class HelixTask implements MessageTask { if (msgReadTime != 0 && msgExecutionStartTime != 0) { long totalDelay = now - msgReadTime; long executionDelay = now - msgExecutionStartTime; - if (totalDelay > 0 && executionDelay > 0) { + long msgLatency = msgReadTime - message.getCreateTimeStamp(); + if (totalDelay >= 0 && executionDelay >= 0) { String fromState = message.getFromState(); String toState = message.getToState(); String transition = fromState + "--" + toState; @@ -327,11 
+329,14 @@ public class HelixTask implements MessageTask { message.getResourceName(), transition); StateTransitionDataPoint data = - new StateTransitionDataPoint(totalDelay, executionDelay, taskResult.isSuccess()); + new StateTransitionDataPoint(totalDelay, executionDelay, msgLatency, + taskResult.isSuccess()); _executor.getParticipantMonitor().reportTransitionStat(cxt, data); } } else { - logger.warn("message read time and start execution time not recorded."); + logger.warn( + "message read time and start execution time not recorded. State transition delay time is not available, message read time {}, Execute start time {}.", + msgReadTime, msgExecutionStartTime); } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java index 1fe9264..0b4a675 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java @@ -22,11 +22,14 @@ package org.apache.helix.monitoring; public class StateTransitionDataPoint { long _totalDelay; long _executionDelay; + long _messageLatency; boolean _isSuccess; - public StateTransitionDataPoint(long totalDelay, long executionDelay, boolean isSuccess) { + public StateTransitionDataPoint(long totalDelay, long executionDelay, long messageLatency, + boolean isSuccess) { _totalDelay = totalDelay; _executionDelay = executionDelay; + _messageLatency = messageLatency; _isSuccess = isSuccess; } @@ -38,6 +41,10 @@ public class StateTransitionDataPoint { return _executionDelay; } + public long getMessageLatency() { + return _messageLatency; + } + public boolean getSuccess() { return _isSuccess; } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java index c9735c5..3320fff 100644 
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java @@ -43,8 +43,7 @@ public abstract class ClusterMBeanObserver implements NotificationListener { protected MBeanServerConnection _server; private static final Logger _logger = LoggerFactory.getLogger(ClusterMBeanObserver.class); - public ClusterMBeanObserver(String domain) throws InstanceNotFoundException, IOException, - MalformedObjectNameException, NullPointerException { + public ClusterMBeanObserver(String domain) throws IOException, InstanceNotFoundException { // Get a reference to the target MBeanServer _domain = domain; _server = ManagementFactory.getPlatformMBeanServer(); diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java index d41b402..9eb29ff 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java @@ -37,7 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor; public class ParticipantStatusMonitor { private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap = - new ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor>(); + new ConcurrentHashMap<>(); private static final Logger LOG = LoggerFactory.getLogger(ParticipantStatusMonitor.class); private MBeanServer _beanServer; @@ -116,11 +116,11 @@ public class ParticipantStatusMonitor { } private ObjectName getObjectName(String name) throws MalformedObjectNameException { - LOG.info("Registering bean: " + name); return new ObjectName(String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name)); } private void register(Object bean, ObjectName name) { + LOG.info("Registering bean: " 
+ name.toString()); if (_beanServer == null) { LOG.warn("bean server is null, skip reporting"); return; diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java index 4a4c89e..b1e93e6 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java @@ -20,30 +20,35 @@ package org.apache.helix.monitoring.mbeans; */ import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import org.apache.helix.monitoring.StatCollector; import org.apache.helix.monitoring.StateTransitionContext; import org.apache.helix.monitoring.StateTransitionDataPoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +// TODO convert StateTransitionStatMonitor to extends DynamicMBeanProvider. +// Note this might change the attributes name. 
public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBean { + private static final Logger _logger = LoggerFactory.getLogger(StateTransitionStatMonitor.class); public enum LATENCY_TYPE { TOTAL, - EXECUTION + EXECUTION, + MESSAGE } private long _numDataPoints; private long _successCount; - private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap = - new ConcurrentHashMap<LATENCY_TYPE, StatCollector>(); + private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap = new ConcurrentHashMap<>(); StateTransitionContext _context; public StateTransitionStatMonitor(StateTransitionContext context) { _context = context; - _monitorMap.put(LATENCY_TYPE.TOTAL, new StatCollector()); - _monitorMap.put(LATENCY_TYPE.EXECUTION, new StatCollector()); + for (LATENCY_TYPE type : LATENCY_TYPE.values()) { + _monitorMap.put(type, new StatCollector()); + } reset(); } @@ -63,17 +68,18 @@ public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBe } addLatency(LATENCY_TYPE.TOTAL, data.getTotalDelay()); addLatency(LATENCY_TYPE.EXECUTION, data.getExecutionDelay()); + addLatency(LATENCY_TYPE.MESSAGE, data.getMessageLatency()); } - void addLatency(LATENCY_TYPE type, double latency) { - assert (_monitorMap.containsKey(type)); + private void addLatency(LATENCY_TYPE type, double latency) { + if (latency < 0) { + _logger.warn("Ignore negative latency data {} for type {}.", latency, type.name()); + return; + } + assert(_monitorMap.containsKey(type)); _monitorMap.get(type).addData(latency); } - public long getNumDataPoints() { - return _numDataPoints; - } - public void reset() { _numDataPoints = 0; _successCount = 0; @@ -136,4 +142,25 @@ public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBe public double getPercentileTransitionExecuteLatency(int percentage) { return _monitorMap.get(LATENCY_TYPE.EXECUTION).getPercentile(percentage); } + + @Override + public double getMeanTransitionMessageLatency() { + return 
_monitorMap.get(LATENCY_TYPE.MESSAGE).getMean(); + } + + @Override + public double getMaxTransitionMessageLatency() { + return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMax(); + } + + @Override + public double getMinTransitionMessageLatency() { + return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMin(); + } + + @Override + public double getPercentileTransitionMessageLatency(int percentage) { + return _monitorMap.get(LATENCY_TYPE.MESSAGE).getPercentile(percentage); + } + } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java index 60d6bc4..fe53344 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java @@ -44,5 +44,13 @@ public interface StateTransitionStatMonitorMBean extends SensorNameProvider { double getPercentileTransitionExecuteLatency(int percentage); + double getMeanTransitionMessageLatency(); + + double getMaxTransitionMessageLatency(); + + double getMinTransitionMessageLatency(); + + double getPercentileTransitionMessageLatency(int percentage); + void reset(); } diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java index b6e88b3..d7aed6e 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java @@ -19,10 +19,6 @@ package org.apache.helix.monitoring; * under the License. 
*/ -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; import javax.management.InstanceNotFoundException; import javax.management.MBeanAttributeInfo; import javax.management.MBeanInfo; @@ -31,21 +27,28 @@ import javax.management.MBeanServerNotification; import javax.management.MalformedObjectNameException; import javax.management.ObjectInstance; import javax.management.ObjectName; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.TestHelper; import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver; +import org.apache.helix.monitoring.mbeans.MonitorDomainNames; import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.AssertJUnit; +import org.testng.Assert; import org.testng.annotations.Test; public class TestParticipantMonitor { - static Logger _logger = LoggerFactory.getLogger(TestParticipantMonitor.class); + private static Logger _logger = LoggerFactory.getLogger(TestParticipantMonitor.class); + private static String CLUSTER_NAME = TestHelper.getTestClassName(); class ParticipantMonitorListener extends ClusterMBeanObserver { - Map<String, Map<String, Object>> _beanValueMap = new HashMap<String, Map<String, Object>>(); + Map<String, Map<String, Object>> _beanValueMap = new HashMap<>(); - public ParticipantMonitorListener(String domain) throws InstanceNotFoundException, IOException, - MalformedObjectNameException, NullPointerException { + public ParticipantMonitorListener(String domain) throws IOException, InstanceNotFoundException { super(domain); init(); } @@ -53,7 +56,7 @@ public class TestParticipantMonitor { void init() { try { Set<ObjectInstance> existingInstances = - _server.queryMBeans(new ObjectName(_domain + ":Cluster=cluster,*"), null); + _server.queryMBeans(new ObjectName(_domain + ":Cluster=" + CLUSTER_NAME + ",*"), null); for 
(ObjectInstance instance : existingInstances) { String mbeanName = instance.getObjectName().toString(); // System.out.println("mbeanName: " + mbeanName); @@ -93,42 +96,57 @@ public class TestParticipantMonitor { } } + private ObjectName getObjectName(String name) throws MalformedObjectNameException { + return new ObjectName( + String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name)); + } + @Test() - public void testReportData() throws InstanceNotFoundException, MalformedObjectNameException, - NullPointerException, IOException, InterruptedException { + public void testReportData() + throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException, + IOException, InterruptedException { System.out.println("START TestParticipantMonitor"); ParticipantStatusMonitor monitor = new ParticipantStatusMonitor(false, null); int monitorNum = 0; - StateTransitionContext cxt = new StateTransitionContext("cluster", "instance", "db_1", "a-b"); - StateTransitionDataPoint data = new StateTransitionDataPoint(1000, 1000, true); + StateTransitionContext cxt = new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b"); + StateTransitionDataPoint data = new StateTransitionDataPoint(2000, 1000, 600, true); monitor.reportTransitionStat(cxt, data); - data = new StateTransitionDataPoint(1000, 1200, true); + data = new StateTransitionDataPoint(2000, 1200, 600, true); monitor.reportTransitionStat(cxt, data); ParticipantMonitorListener monitorListener = new ParticipantMonitorListener("CLMParticipantReport"); Thread.sleep(1000); - AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 1); - - data = new StateTransitionDataPoint(1000, 500, true); + Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1); + + // Note the values in listener's map is the snapshot when the MBean is detected. 
+ Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString()) + .get("MeanTransitionLatency"), 2000.0); + Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString()) + .get("MeanTransitionExecuteLatency"), 1100.0); + Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString()) + .get("MeanTransitionMessageLatency"), 600.0); + Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString()) + .get("TotalStateTransitionGauge"), 2L); + + data = new StateTransitionDataPoint(2000, 500, 600, true); monitor.reportTransitionStat(cxt, data); Thread.sleep(1000); - AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 1); + Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1); - data = new StateTransitionDataPoint(1000, 500, true); - StateTransitionContext cxt2 = new StateTransitionContext("cluster", "instance", "db_2", "a-b"); + data = new StateTransitionDataPoint(1000, 500, 300, true); + StateTransitionContext cxt2 = new StateTransitionContext(CLUSTER_NAME, "instance", "db_2", "a-b"); monitor.reportTransitionStat(cxt2, data); monitor.reportTransitionStat(cxt2, data); Thread.sleep(1000); - AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 2); + Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 2); - AssertJUnit.assertFalse(cxt.equals(cxt2)); - AssertJUnit.assertFalse(cxt.equals(new Object())); - AssertJUnit.assertTrue(cxt.equals(new StateTransitionContext("cluster", "instance", "db_1", - "a-b"))); + Assert.assertFalse(cxt.equals(cxt2)); + Assert.assertFalse(cxt.equals(new Object())); + Assert.assertTrue(cxt.equals(new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b"))); cxt2.getInstanceName(); @@ -136,7 +154,7 @@ public class TestParticipantMonitor { new ParticipantMonitorListener("CLMParticipantReport"); Thread.sleep(1000); - 
AssertJUnit.assertEquals(monitorListener2._beanValueMap.size(), monitorNum + 2); + Assert.assertEquals(monitorListener2._beanValueMap.size(), monitorNum + 2); monitorListener2.disconnect(); monitorListener.disconnect();
