This is an automated email from the ASF dual-hosted git repository.

zhangmeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new d5f5273  Change participant message monitor to use dynamic metric (#1685)
d5f5273 is described below

commit d5f5273d483ba54c51c79d497dead190cf758bb6
Author: Meng Zhang <[email protected]>
AuthorDate: Thu Mar 25 16:49:48 2021 -0700

    Change participant message monitor to use dynamic metric (#1685)
---
 .../mbeans/ParticipantMessageMonitor.java          | 91 ++++++++++++----------
 .../mbeans/ParticipantMessageMonitorMBean.java     |  3 +-
 .../mbeans/ParticipantStatusMonitor.java           |  3 +-
 3 files changed, 53 insertions(+), 44 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
index 261790d..9c90295 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
@@ -19,10 +19,29 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean {
+import java.util.ArrayList;
+import java.util.List;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
+
+public class ParticipantMessageMonitor extends DynamicMBeanProvider {
   public static final String PARTICIPANT_KEY = "ParticipantName";
   public static final String PARTICIPANT_STATUS_KEY = "ParticipantMessageStatus";
 
+  // For registering dynamic metrics
+  private final ObjectName _initObjectName;
+  private final String _participantName;
+
+  private SimpleDynamicMetric<Long> _receivedMessages;
+  private SimpleDynamicMetric<Long> _discardedMessages;
+  private SimpleDynamicMetric<Long> _completedMessages;
+  private SimpleDynamicMetric<Long> _failedMessages;
+  private SimpleDynamicMetric<Long> _pendingMessages;
+
   /**
    * The current processed state of the message
    */
@@ -32,68 +51,42 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
     COMPLETED
   }
 
-  private final String _participantName;
-  private long _receivedMessages = 0;
-  private long _discardedMessages = 0;
-  private long _completedMessages = 0;
-  private long _failedMessages = 0;
-  private long _pendingMessages = 0;
-
-  public ParticipantMessageMonitor(String participantName) {
+  public ParticipantMessageMonitor(String participantName, ObjectName objectName) {
     _participantName = participantName;
+    _initObjectName = objectName;
+    _receivedMessages = new SimpleDynamicMetric("ReceivedMessages", 0L);
+    _discardedMessages = new SimpleDynamicMetric("DiscardedMessages", 0L);
+    _completedMessages = new SimpleDynamicMetric("CompletedMessages", 0L);
+    _failedMessages = new SimpleDynamicMetric("FailedMessages", 0L);
+    _pendingMessages = new SimpleDynamicMetric("PendingMessages", 0L);
   }
 
   public String getParticipantBeanName() {
     return String.format("%s=%s", PARTICIPANT_KEY, _participantName);
   }
 
-  public void incrementReceivedMessages(int count) {
-    _receivedMessages += count;
+  public void incrementReceivedMessages(long count) {
+    incrementSimpleDynamicMetric(_receivedMessages, count);
   }
 
   public void incrementDiscardedMessages(int count) {
-    _discardedMessages += count;
+    incrementSimpleDynamicMetric(_discardedMessages, count);
   }
 
   public void incrementCompletedMessages(int count) {
-    _completedMessages += count;
+    incrementSimpleDynamicMetric(_completedMessages, count);
   }
 
   public void incrementFailedMessages(int count) {
-    _failedMessages += count;
+    incrementSimpleDynamicMetric(_failedMessages, count);
   }
 
   public void incrementPendingMessages(int count) {
-    _pendingMessages += count;
+    incrementSimpleDynamicMetric(_pendingMessages, count);
   }
 
   public void decrementPendingMessages(int count) {
-    _pendingMessages -= count;
-  }
-
-  @Override
-  public long getReceivedMessages() {
-    return _receivedMessages;
-  }
-
-  @Override
-  public long getDiscardedMessages() {
-    return _discardedMessages;
-  }
-
-  @Override
-  public long getCompletedMessages() {
-    return _completedMessages;
-  }
-
-  @Override
-  public long getFailedMessages() {
-    return _failedMessages;
-  }
-
-  @Override
-  public long getPendingMessages() {
-    return _pendingMessages;
+    incrementSimpleDynamicMetric(_pendingMessages, -1 * count);
   }
 
   @Override
@@ -101,4 +94,20 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
     return PARTICIPANT_STATUS_KEY;
   }
 
+  /**
+   * This method registers the dynamic metrics.
+   * @return
+   * @throws JMException
+   */
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_receivedMessages);
+    attributeList.add(_discardedMessages);
+    attributeList.add(_completedMessages);
+    attributeList.add(_failedMessages);
+    attributeList.add(_pendingMessages);
+    doRegister(attributeList, _initObjectName);
+    return this;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
index d4a899f..f806d1b 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
@@ -8,7 +8,6 @@ package org.apache.helix.monitoring.mbeans;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
@@ -21,7 +20,7 @@ package org.apache.helix.monitoring.mbeans;
 
 import org.apache.helix.monitoring.SensorNameProvider;
 
-
+@Deprecated
 public interface ParticipantMessageMonitorMBean extends SensorNameProvider {
   public long getReceivedMessages();
   public long getDiscardedMessages();
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
index 6e5b346..5b91cff 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -49,7 +49,8 @@ public class ParticipantStatusMonitor {
     try {
       _beanServer = ManagementFactory.getPlatformMBeanServer();
       if (isParticipant) {
-        _messageMonitor = new ParticipantMessageMonitor(instanceName);
+        _messageMonitor =
+            new ParticipantMessageMonitor(instanceName, getObjectName(_messageMonitor.getParticipantBeanName()));
         _messageLatencyMonitor =
             new MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(), instanceName);
         _messageLatencyMonitor.register();

Reply via email to