Add a mbean for participant and emit received msgs
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/821fd04d Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/821fd04d Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/821fd04d Branch: refs/heads/helix-0.6.x Commit: 821fd04dfb1dc66a85d0dfb76e5837a4c8107448 Parents: a1278e1 Author: Boyan Li <b...@linkedin.com> Authored: Mon Aug 29 09:47:27 2016 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Sun Feb 5 19:18:14 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/HelixManager.java | 6 +++ .../apache/helix/manager/zk/ZKHelixManager.java | 15 +++++- .../messaging/DefaultMessagingService.java | 2 +- .../messaging/handling/HelixTaskExecutor.java | 8 ++- .../mbeans/ParticipantStatusMonitor.java | 54 ++++++++++++++++++++ .../mbeans/ParticipantStatusMonitorMBean.java | 8 +++ .../src/test/java/org/apache/helix/Mocks.java | 5 ++ .../controller/stages/DummyClusterManager.java | 5 ++ .../helix/participant/MockZKHelixManager.java | 5 ++ 9 files changed, 104 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/HelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java index 7b574aa..76db004 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java @@ -25,6 +25,7 @@ import org.apache.helix.controller.GenericHelixController; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.participant.HelixStateMachineEngine; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.spectator.RoutingTableProvider; @@ -202,6 +203,11 @@ public interface HelixManager { String getSessionId(); /** + * Get the ParticipantStatusMonitor. + * @return the ParticipantStatusMonitor + */ + ParticipantStatusMonitor getParticipantStatusMonitor(); + /** * The time stamp is always updated when a notification is received. This can * be used to check if there was any new notification when previous * notification was being processed. This is updated based on the http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index f9d03c3..f15725a 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -67,6 +67,7 @@ import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.monitoring.ZKPathDataDumpTask; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.participant.HelixStateMachineEngine; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.store.zk.AutoFallbackPropertyStore; @@ -123,7 +124,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { */ private final StateMachineEngine _stateMachineEngine; private final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>(); - + private ParticipantStatusMonitor _participantStatusMonitor; private final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector; private Long _sessionStartTime; @@ -200,7 +201,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { _version = _properties.getVersion(); _keyBuilder = new Builder(clusterName); - _messagingService = new DefaultMessagingService(this); /** * use system property if available @@ -222,6 +222,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { switch (instanceType) { case PARTICIPANT: _stateMachineEngine = new HelixStateMachineEngine(this); + _participantStatusMonitor = new ParticipantStatusMonitor(_instanceName); _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName); _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector)); @@ -234,6 +235,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { break; case CONTROLLER_PARTICIPANT: _stateMachineEngine = new HelixStateMachineEngine(this); + _participantStatusMonitor = new ParticipantStatusMonitor(_instanceName); _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName); @@ -248,6 +250,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { default: throw new IllegalArgumentException("unrecognized type: " + instanceType); } + // DefaultMessagingService has to be initialized after instance type specific init, + // because it depends on ParticipantStatusMonitor + _messagingService = new DefaultMessagingService(this); + } private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) { @@ -590,6 +596,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { } @Override + public ParticipantStatusMonitor getParticipantStatusMonitor() { + return _participantStatusMonitor; + } + + @Override public boolean isConnected() { if (_zkclient == null) { return false; http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java index f000f69..1a78cb8 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java @@ -56,7 +56,7 @@ public class DefaultMessagingService implements ClusterMessagingService { public DefaultMessagingService(HelixManager manager) { _manager = manager; _evaluator = new CriteriaEvaluator(); - _taskExecutor = new HelixTaskExecutor(this); + _taskExecutor = new HelixTaskExecutor(this, manager.getParticipantStatusMonitor()); _asyncCallbackService = new AsyncCallbackService(); _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(), _asyncCallbackService); http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 7a1210f..2bb8435 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -59,6 +59,7 @@ import org.apache.helix.model.Message.MessageType; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.monitoring.ParticipantMonitor; import org.apache.helix.monitoring.mbeans.MessageQueueMonitor; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.participant.HelixStateMachineEngine; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; @@ -110,6 +111,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { private MessageQueueMonitor _messageQueueMonitor; private ClusterMessagingService _messagingService; + private ParticipantStatusMonitor _participantStatusMonitor; private GenericHelixController _controller; private Long _lastSessionSyncTime; private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds @@ -144,9 +146,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { startMonitorThread(); } - public HelixTaskExecutor(ClusterMessagingService messagingService) { + public HelixTaskExecutor(ClusterMessagingService messagingService, ParticipantStatusMonitor participantStatusMonitor) { this(); _messagingService = messagingService; + _participantStatusMonitor = participantStatusMonitor; } @Override @@ -597,6 +600,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { // Update message count _messageQueueMonitor.setMessageQueueBacklog(messages.size()); + if (_participantStatusMonitor != null) { + _participantStatusMonitor.incrementReceivedMessages(messages.size()); + } // sort message by creation timestamp, so message created earlier is processed first Collections.sort(messages, Message.CREATE_TIME_COMPARATOR); http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java new file mode 100644 index 0000000..e7c6cab --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java @@ -0,0 +1,54 @@ +package org.apache.helix.monitoring.mbeans; + +import java.lang.management.ManagementFactory; + +import org.apache.log4j.Logger; + +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + + +public class ParticipantStatusMonitor implements ParticipantStatusMonitorMBean { + private static final Logger LOG = Logger.getLogger(ParticipantStatusMonitor.class); + private static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus"; + private static final String PARTICIPANT_KEY = "ParticipantName"; + private final MBeanServer _beanServer; + private final String _participantName; + + private long _receivedMessages = 0; + + public ParticipantStatusMonitor(String participantName) { + _participantName = participantName; + _beanServer = ManagementFactory.getPlatformMBeanServer(); + + try { + LOG.info("Register MBean for participant: " + participantName); + _beanServer.registerMBean(this, getObjectName(getParticipantBeanName())); + } catch (Exception e) { + LOG.error("Could not register MBean for : " + participantName, e); + } + } + + @Override + public long getReceivedMessages() { + return _receivedMessages; + } + + @Override + public String getSensorName() { + return PARTICIPANT_STATUS_KEY + "." + _participantName; + } + + public ObjectName getObjectName(String name) throws MalformedObjectNameException { + return new ObjectName(String.format("%s: %s", PARTICIPANT_STATUS_KEY, name)); + } + + private String getParticipantBeanName() { + return String.format("%s=%s", PARTICIPANT_KEY, _participantName); + } + + public void incrementReceivedMessages(int count) { + _receivedMessages+=count; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java new file mode 100644 index 0000000..33f1680 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java @@ -0,0 +1,8 @@ +package org.apache.helix.monitoring.mbeans; + +import org.apache.helix.monitoring.SensorNameProvider; + + +public interface ParticipantStatusMonitorMBean extends SensorNameProvider { + public long getReceivedMessages(); +} http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/test/java/org/apache/helix/Mocks.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java index 9688c0d..d705503 100644 --- a/helix-core/src/test/java/org/apache/helix/Mocks.java +++ b/helix-core/src/test/java/org/apache/helix/Mocks.java @@ -43,6 +43,7 @@ import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.PauseSignal; import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelInfo; @@ -462,6 +463,10 @@ public class Mocks { return 0L; } + @Override + public ParticipantStatusMonitor getParticipantStatusMonitor() { + return null; + } } public static class MockAccessor implements HelixDataAccessor { http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java index 0167487..becbb09 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java @@ -42,6 +42,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.controller.GenericHelixController; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -259,4 +260,8 @@ public class DummyClusterManager implements HelixManager { return 0L; } + @Override + public ParticipantStatusMonitor getParticipantStatusMonitor() { + return null; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java index db6974e..4a97ba5 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java +++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java @@ -48,6 +48,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.messaging.DefaultMessagingService; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; +import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor; import org.apache.helix.store.zk.ZkHelixPropertyStore; public class MockZKHelixManager implements HelixManager { @@ -265,4 +266,8 @@ public class MockZKHelixManager implements HelixManager { return 0L; } + @Override + public ParticipantStatusMonitor getParticipantStatusMonitor() { + return null; + } }