Add a mbean for participant and emit received msgs

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/821fd04d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/821fd04d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/821fd04d

Branch: refs/heads/helix-0.6.x
Commit: 821fd04dfb1dc66a85d0dfb76e5837a4c8107448
Parents: a1278e1
Author: Boyan Li <b...@linkedin.com>
Authored: Mon Aug 29 09:47:27 2016 -0700
Committer: Lei Xia <l...@linkedin.com>
Committed: Sun Feb 5 19:18:14 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixManager.java     |  6 +++
 .../apache/helix/manager/zk/ZKHelixManager.java | 15 +++++-
 .../messaging/DefaultMessagingService.java      |  2 +-
 .../messaging/handling/HelixTaskExecutor.java   |  8 ++-
 .../mbeans/ParticipantStatusMonitor.java        | 54 ++++++++++++++++++++
 .../mbeans/ParticipantStatusMonitorMBean.java   |  8 +++
 .../src/test/java/org/apache/helix/Mocks.java   |  5 ++
 .../controller/stages/DummyClusterManager.java  |  5 ++
 .../helix/participant/MockZKHelixManager.java   |  5 ++
 9 files changed, 104 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java 
b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 7b574aa..76db004 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -25,6 +25,7 @@ import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.spectator.RoutingTableProvider;
@@ -202,6 +203,11 @@ public interface HelixManager {
   String getSessionId();
 
   /**
+   * Get the ParticipantStatusMonitor.
+   * @return the ParticipantStatusMonitor; may be null (some implementations return null), so callers must null-check
+   */
+  ParticipantStatusMonitor getParticipantStatusMonitor();
+  /**
    * The time stamp is always updated when a notification is received. This can
    * be used to check if there was any new notification when previous
    * notification was being processed. This is updated based on the

http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index f9d03c3..f15725a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -67,6 +67,7 @@ import 
org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.AutoFallbackPropertyStore;
@@ -123,7 +124,7 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
    */
   private final StateMachineEngine _stateMachineEngine;
   private final List<HelixTimerTask> _timerTasks = new 
ArrayList<HelixTimerTask>();
-
+  private ParticipantStatusMonitor _participantStatusMonitor;
   private final ParticipantHealthReportCollectorImpl 
_participantHealthInfoCollector;
   private Long _sessionStartTime;
 
@@ -200,7 +201,6 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     _version = _properties.getVersion();
 
     _keyBuilder = new Builder(clusterName);
-    _messagingService = new DefaultMessagingService(this);
 
     /**
      * use system property if available
@@ -222,6 +222,7 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     switch (instanceType) {
     case PARTICIPANT:
       _stateMachineEngine = new HelixStateMachineEngine(this);
+      _participantStatusMonitor = new ParticipantStatusMonitor(_instanceName);
       _participantHealthInfoCollector =
           new ParticipantHealthReportCollectorImpl(this, _instanceName);
       _timerTasks.add(new 
ParticipantHealthReportTask(_participantHealthInfoCollector));
@@ -234,6 +235,7 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
       break;
     case CONTROLLER_PARTICIPANT:
       _stateMachineEngine = new HelixStateMachineEngine(this);
+      _participantStatusMonitor = new ParticipantStatusMonitor(_instanceName);
       _participantHealthInfoCollector =
           new ParticipantHealthReportCollectorImpl(this, _instanceName);
 
@@ -248,6 +250,10 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     default:
       throw new IllegalArgumentException("unrecognized type: " + instanceType);
     }
+    // DefaultMessagingService has to be initialized after instance type 
specific init,
+    // because it depends on ParticipantStatusMonitor
+    _messagingService = new DefaultMessagingService(this);
+
   }
 
   private int getSystemPropertyAsInt(String propertyKey, int 
propertyDefaultValue) {
@@ -590,6 +596,11 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
   }
 
   @Override
+  public ParticipantStatusMonitor getParticipantStatusMonitor() {
+    return _participantStatusMonitor;
+  }
+
+  @Override
   public boolean isConnected() {
     if (_zkclient == null) {
       return false;

http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index f000f69..1a78cb8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -56,7 +56,7 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
   public DefaultMessagingService(HelixManager manager) {
     _manager = manager;
     _evaluator = new CriteriaEvaluator();
-    _taskExecutor = new HelixTaskExecutor(this);
+    _taskExecutor = new HelixTaskExecutor(this, 
manager.getParticipantStatusMonitor());
     _asyncCallbackService = new AsyncCallbackService();
     
_taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
         _asyncCallbackService);

http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 7a1210f..2bb8435 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -59,6 +59,7 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.monitoring.ParticipantMonitor;
 import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -110,6 +111,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
 
   private MessageQueueMonitor _messageQueueMonitor;
   private ClusterMessagingService _messagingService;
+  private ParticipantStatusMonitor _participantStatusMonitor;
   private GenericHelixController _controller;
   private Long _lastSessionSyncTime;
   private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
@@ -144,9 +146,10 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
     startMonitorThread();
   }
 
-  public HelixTaskExecutor(ClusterMessagingService messagingService) {
+  public HelixTaskExecutor(ClusterMessagingService messagingService, 
ParticipantStatusMonitor participantStatusMonitor) {
     this();
     _messagingService = messagingService;
+    _participantStatusMonitor = participantStatusMonitor;
   }
 
   @Override
@@ -597,6 +600,9 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
 
     // Update message count
     _messageQueueMonitor.setMessageQueueBacklog(messages.size());
+    if (_participantStatusMonitor != null) {
+      _participantStatusMonitor.incrementReceivedMessages(messages.size());
+    }
 
     // sort message by creation timestamp, so message created earlier is 
processed first
     Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);

http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
new file mode 100644
index 0000000..e7c6cab
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -0,0 +1,84 @@
+package org.apache.helix.monitoring.mbeans;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+
+/**
+ * MBean that exposes status counters of a single Helix participant through JMX.
+ * The bean registers itself with the platform MBeanServer on construction.
+ * The received-message counter may be incremented and read from different threads
+ * (JMX reads go through the MBeanServer), so it is kept in an AtomicLong.
+ */
+public class ParticipantStatusMonitor implements ParticipantStatusMonitorMBean {
+  private static final Logger LOG = Logger.getLogger(ParticipantStatusMonitor.class);
+  private static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
+  private static final String PARTICIPANT_KEY = "ParticipantName";
+  private final MBeanServer _beanServer;
+  private final String _participantName;
+
+  // Atomic so that increments are not lost and reads never observe a stale or torn value.
+  private final AtomicLong _receivedMessages = new AtomicLong();
+
+  /**
+   * Creates the monitor and registers it with the platform MBeanServer.
+   * A registration failure is logged and otherwise ignored; the counters still work.
+   * @param participantName name of the participant, used in the bean and sensor names
+   */
+  public ParticipantStatusMonitor(String participantName) {
+    _participantName = participantName;
+    _beanServer = ManagementFactory.getPlatformMBeanServer();
+
+    try {
+      LOG.info("Register MBean for participant: " + participantName);
+      _beanServer.registerMBean(this, getObjectName(getParticipantBeanName()));
+    } catch (Exception e) {
+      LOG.error("Could not register MBean for : " + participantName, e);
+    }
+  }
+
+  /**
+   * @return total number of messages counted via {@link #incrementReceivedMessages(int)}
+   */
+  @Override
+  public long getReceivedMessages() {
+    return _receivedMessages.get();
+  }
+
+  /**
+   * @return the sensor name, "ParticipantStatus." followed by the participant name
+   */
+  @Override
+  public String getSensorName() {
+    return PARTICIPANT_STATUS_KEY + "." + _participantName;
+  }
+
+  /**
+   * Builds the JMX object name in the "ParticipantStatus" domain.
+   * @param name the key properties part of the object name
+   * @return the object name for the given key properties
+   * @throws MalformedObjectNameException if the resulting name is not a valid object name
+   */
+  public ObjectName getObjectName(String name) throws MalformedObjectNameException {
+    return new ObjectName(String.format("%s: %s", PARTICIPANT_STATUS_KEY, name));
+  }
+
+  // Key property identifying this participant, e.g. "ParticipantName=<name>".
+  private String getParticipantBeanName() {
+    return String.format("%s=%s", PARTICIPANT_KEY, _participantName);
+  }
+
+  /**
+   * Adds the given number of messages to the received-message counter.
+   * @param count number of newly received messages
+   */
+  public void incrementReceivedMessages(int count) {
+    _receivedMessages.addAndGet(count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java
new file mode 100644
index 0000000..33f1680
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java
@@ -0,0 +1,14 @@
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+
+/**
+ * Management interface for the status attributes of a single participant.
+ */
+public interface ParticipantStatusMonitorMBean extends SensorNameProvider {
+  /**
+   * @return the number of messages counted as received by the participant
+   */
+  long getReceivedMessages();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java 
b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 9688c0d..d705503 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -43,6 +43,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
@@ -462,6 +463,10 @@ public class Mocks {
       return 0L;
     }
 
+    @Override
+    public ParticipantStatusMonitor getParticipantStatusMonitor() {
+      return null;
+    }
   }
 
   public static class MockAccessor implements HelixDataAccessor {

http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 0167487..becbb09 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -42,6 +42,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 
@@ -259,4 +260,8 @@ public class DummyClusterManager implements HelixManager {
     return 0L;
   }
 
+  @Override
+  public ParticipantStatusMonitor getParticipantStatusMonitor() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java 
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index db6974e..4a97ba5 100644
--- 
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -48,6 +48,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 
 public class MockZKHelixManager implements HelixManager {
@@ -265,4 +266,8 @@ public class MockZKHelixManager implements HelixManager {
     return 0L;
   }
 
+  @Override
+  public ParticipantStatusMonitor getParticipantStatusMonitor() {
+    return null;
+  }
 }

Reply via email to