This is an automated email from the ASF dual-hosted git repository.
zhangmeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 24c3c24 Change participant message monitor from static metric to
dynamic metric (#1696)
24c3c24 is described below
commit 24c3c24b35ce39784343573c9561e3d808adc1c6
Author: Meng Zhang <[email protected]>
AuthorDate: Fri Apr 16 09:07:52 2021 -0700
Change participant message monitor from static metric to dynamic metric
(#1696)
Co-authored-by: Meng Zhang <[email protected]>
---
.../monitoring/mbeans/MessageLatencyMonitor.java | 2 +-
.../mbeans/ParticipantMessageMonitor.java | 97 ++++++++++---------
.../mbeans/ParticipantMessageMonitorMBean.java | 31 ------
.../mbeans/ParticipantStatusMonitor.java | 62 +++++-------
.../helix/monitoring/TestParticipantMonitor.java | 107 ++++++++++++++++++---
5 files changed, 172 insertions(+), 127 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
index 9f2e41c..c180953 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
@@ -74,7 +74,7 @@ public class MessageLatencyMonitor extends
DynamicMBeanProvider {
attributeList.add(_totalMessageCount);
attributeList.add(_totalMessageLatency);
attributeList.add(_messageLatencyGauge);
- doRegister(attributeList, MBEAN_DESCRIPTION, _domainName,
ParticipantMessageMonitor.PARTICIPANT_KEY,
+ doRegister(attributeList, MBEAN_DESCRIPTION, _domainName,
ParticipantStatusMonitor.PARTICIPANT_KEY,
_participantName, "MonitorType",
MessageLatencyMonitor.class.getSimpleName());
return this;
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
index 261790d..83aa3e2 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
@@ -19,10 +19,27 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
-public class ParticipantMessageMonitor implements
ParticipantMessageMonitorMBean {
- public static final String PARTICIPANT_KEY = "ParticipantName";
+import java.util.ArrayList;
+import java.util.List;
+import javax.management.JMException;
+
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
+public class ParticipantMessageMonitor extends DynamicMBeanProvider {
+ private static final String MBEAN_DESCRIPTION = "Helix Participant Message
Monitor";
+ private final String _domainName;
public static final String PARTICIPANT_STATUS_KEY =
"ParticipantMessageStatus";
+ private final String _participantName;
+
+ private SimpleDynamicMetric<Long> _receivedMessages;
+ private SimpleDynamicMetric<Long> _discardedMessages;
+ private SimpleDynamicMetric<Long> _completedMessages;
+ private SimpleDynamicMetric<Long> _failedMessages;
+ private SimpleDynamicMetric<Long> _pendingMessages;
+
/**
* The current processed state of the message
*/
@@ -32,68 +49,38 @@ public class ParticipantMessageMonitor implements
ParticipantMessageMonitorMBean
COMPLETED
}
- private final String _participantName;
- private long _receivedMessages = 0;
- private long _discardedMessages = 0;
- private long _completedMessages = 0;
- private long _failedMessages = 0;
- private long _pendingMessages = 0;
-
- public ParticipantMessageMonitor(String participantName) {
+ public ParticipantMessageMonitor(String domainName, String participantName) {
+ _domainName = domainName;
_participantName = participantName;
+ _receivedMessages = new SimpleDynamicMetric("ReceivedMessages", 0L);
+ _discardedMessages = new SimpleDynamicMetric("DiscardedMessages", 0L);
+ _completedMessages = new SimpleDynamicMetric("CompletedMessages", 0L);
+ _failedMessages = new SimpleDynamicMetric("FailedMessages", 0L);
+ _pendingMessages = new SimpleDynamicMetric("PendingMessages", 0L);
}
- public String getParticipantBeanName() {
- return String.format("%s=%s", PARTICIPANT_KEY, _participantName);
- }
-
- public void incrementReceivedMessages(int count) {
- _receivedMessages += count;
+ public void incrementReceivedMessages(long count) {
+ incrementSimpleDynamicMetric(_receivedMessages, count);
}
public void incrementDiscardedMessages(int count) {
- _discardedMessages += count;
+ incrementSimpleDynamicMetric(_discardedMessages, count);
}
public void incrementCompletedMessages(int count) {
- _completedMessages += count;
+ incrementSimpleDynamicMetric(_completedMessages, count);
}
public void incrementFailedMessages(int count) {
- _failedMessages += count;
+ incrementSimpleDynamicMetric(_failedMessages, count);
}
public void incrementPendingMessages(int count) {
- _pendingMessages += count;
+ incrementSimpleDynamicMetric(_pendingMessages, count);
}
public void decrementPendingMessages(int count) {
- _pendingMessages -= count;
- }
-
- @Override
- public long getReceivedMessages() {
- return _receivedMessages;
- }
-
- @Override
- public long getDiscardedMessages() {
- return _discardedMessages;
- }
-
- @Override
- public long getCompletedMessages() {
- return _completedMessages;
- }
-
- @Override
- public long getFailedMessages() {
- return _failedMessages;
- }
-
- @Override
- public long getPendingMessages() {
- return _pendingMessages;
+ incrementSimpleDynamicMetric(_pendingMessages, -1 * count);
}
@Override
@@ -101,4 +88,22 @@ public class ParticipantMessageMonitor implements
ParticipantMessageMonitorMBean
return PARTICIPANT_STATUS_KEY;
}
+ /**
+ * This method registers the dynamic metrics.
+ * @return
+ * @throws JMException
+ */
+ @Override
+ public DynamicMBeanProvider register() throws JMException {
+ List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+ attributeList.add(_receivedMessages);
+ attributeList.add(_discardedMessages);
+ attributeList.add(_completedMessages);
+ attributeList.add(_failedMessages);
+ attributeList.add(_pendingMessages);
+ doRegister(attributeList, MBEAN_DESCRIPTION, _domainName,
+ ParticipantStatusMonitor.PARTICIPANT_KEY, _participantName,
"MonitorType",
+ ParticipantMessageMonitor.class.getSimpleName());
+ return this;
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
deleted file mode 100644
index d4a899f..0000000
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-
-public interface ParticipantMessageMonitorMBean extends SensorNameProvider {
- public long getReceivedMessages();
- public long getDiscardedMessages();
- public long getCompletedMessages();
- public long getFailedMessages();
- public long getPendingMessages();
-}
\ No newline at end of file
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
index 6e5b346..14db1c7 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -36,25 +36,32 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ParticipantStatusMonitor {
+ public static final String PARTICIPANT_KEY = "ParticipantName";
+
private final ConcurrentHashMap<StateTransitionContext,
StateTransitionStatMonitor> _monitorMap =
new ConcurrentHashMap<>();
private static final Logger LOG =
LoggerFactory.getLogger(ParticipantStatusMonitor.class);
+ private final String _instanceName;
private MBeanServer _beanServer;
private ParticipantMessageMonitor _messageMonitor;
private MessageLatencyMonitor _messageLatencyMonitor;
private Map<String, ThreadPoolExecutorMonitor> _executorMonitors;
public ParticipantStatusMonitor(boolean isParticipant, String instanceName) {
+ _instanceName = instanceName;
try {
_beanServer = ManagementFactory.getPlatformMBeanServer();
if (isParticipant) {
- _messageMonitor = new ParticipantMessageMonitor(instanceName);
+ _messageMonitor =
+ new
ParticipantMessageMonitor(MonitorDomainNames.CLMParticipantReport.name(),
+ _instanceName);
+ _messageMonitor.register();
_messageLatencyMonitor =
- new
MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(),
instanceName);
+ new
MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(),
+ _instanceName);
_messageLatencyMonitor.register();
_executorMonitors = new ConcurrentHashMap<>();
- register(_messageMonitor,
getObjectName(_messageMonitor.getParticipantBeanName()));
}
} catch (Exception e) {
LOG.warn(e.toString());
@@ -115,42 +122,26 @@ public class ParticipantStatusMonitor {
}
private ObjectName getObjectName(String name) throws
MalformedObjectNameException {
- return new ObjectName(String.format("%s:%s",
MonitorDomainNames.CLMParticipantReport.name(), name));
+ return new ObjectName(
+ String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(),
name));
}
- private void register(Object bean, ObjectName name) {
- LOG.info("Registering bean: " + name.toString());
- if (_beanServer == null) {
- LOG.warn("bean server is null, skip reporting");
- return;
- }
- try {
- _beanServer.unregisterMBean(name);
- } catch (Exception e1) {
- // Swallow silently
- }
-
- try {
- _beanServer.registerMBean(bean, name);
- } catch (Exception e) {
- LOG.warn("Could not register MBean", e);
- }
+ /**
+ * Build participant bean name
+ * @param participantName
+ * @return participant bean name
+ */
+ protected String getParticipantBeanName(String participantName) {
+ return String.format("%s=%s", PARTICIPANT_KEY, participantName);
}
public void shutDown() {
- if (_messageMonitor != null) { // is participant
- try {
- ObjectName name =
getObjectName(_messageMonitor.getParticipantBeanName());
- if (_beanServer.isRegistered(name)) {
- _beanServer.unregisterMBean(name);
- }
- } catch (Exception e) {
- LOG.warn("fail to unregister " +
_messageMonitor.getParticipantBeanName(), e);
- }
- }
if (_messageLatencyMonitor != null) {
_messageLatencyMonitor.unregister();
}
+ if (_messageMonitor != null) {
+ _messageMonitor.unregister();
+ }
for (StateTransitionContext cxt : _monitorMap.keySet()) {
try {
ObjectName name = getObjectName(cxt.toString());
@@ -168,16 +159,15 @@ public class ParticipantStatusMonitor {
if (_executorMonitors == null) {
return;
}
- if (! (executor instanceof ThreadPoolExecutor)) {
+ if (!(executor instanceof ThreadPoolExecutor)) {
return;
}
try {
- _executorMonitors.put(type,
- new ThreadPoolExecutorMonitor(type, (ThreadPoolExecutor) executor));
+ _executorMonitors
+ .put(type, new ThreadPoolExecutorMonitor(type, (ThreadPoolExecutor)
executor));
} catch (JMException e) {
- LOG.warn(String.format(
- "Error in creating ThreadPoolExecutorMonitor for type=%s", type), e);
+ LOG.warn(String.format("Error in creating ThreadPoolExecutorMonitor for
type=%s", type), e);
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index f137df3..b761573 100644
---
a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++
b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -23,18 +23,23 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerNotification;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
+import javax.management.ReflectionException;
import org.apache.helix.TestHelper;
+import org.apache.helix.model.Message;
import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,19 +49,22 @@ import org.testng.annotations.Test;
public class TestParticipantMonitor {
private static Logger _logger =
LoggerFactory.getLogger(TestParticipantMonitor.class);
private static String CLUSTER_NAME = TestHelper.getTestClassName();
+ private static final String PARTICIPANT_NAME = "participant_0";
+ private static final String DOMAIN_NAME = "CLMParticipantReport";
class ParticipantMonitorListener extends ClusterMBeanObserver {
Map<String, Map<String, Object>> _beanValueMap = new HashMap<>();
- public ParticipantMonitorListener(String domain) throws IOException,
InstanceNotFoundException {
+ public ParticipantMonitorListener(String domain, String key, String value)
+ throws IOException, InstanceNotFoundException {
super(domain);
- init();
+ init(key, value);
}
- void init() {
+ void init(String key, String value) {
try {
Set<ObjectInstance> existingInstances =
- _server.queryMBeans(new ObjectName(_domain + ":Cluster=" +
CLUSTER_NAME + ",*"), null);
+ _server.queryMBeans(new ObjectName(_domain + ":" + key + "=" +
value + ",*"), null);
for (ObjectInstance instance : existingInstances) {
String mbeanName = instance.getObjectName().toString();
// System.out.println("mbeanName: " + mbeanName);
@@ -102,15 +110,17 @@ public class TestParticipantMonitor {
}
@Test()
- public void testReportData()
+ public void testReportStateTransitionData()
throws InstanceNotFoundException, MalformedObjectNameException,
NullPointerException,
- IOException, InterruptedException {
- System.out.println("START TestParticipantMonitor");
+ IOException, InterruptedException, MBeanException,
AttributeNotFoundException,
+ ReflectionException {
+ System.out.println("START TestParticipantStateTransitionMonitor");
ParticipantStatusMonitor monitor = new ParticipantStatusMonitor(false,
null);
int monitorNum = 0;
- StateTransitionContext cxt = new StateTransitionContext(CLUSTER_NAME,
"instance", "db_1", "a-b");
+ StateTransitionContext cxt =
+ new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b");
StateTransitionDataPoint data = new StateTransitionDataPoint(2000, 1000,
600, true);
monitor.reportTransitionStat(cxt, data);
@@ -118,7 +128,7 @@ public class TestParticipantMonitor {
monitor.reportTransitionStat(cxt, data);
ParticipantMonitorListener monitorListener =
- new ParticipantMonitorListener("CLMParticipantReport");
+ new ParticipantMonitorListener(DOMAIN_NAME, "Cluster", CLUSTER_NAME);
Thread.sleep(1000);
Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1);
@@ -138,7 +148,8 @@ public class TestParticipantMonitor {
Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1);
data = new StateTransitionDataPoint(1000, 500, 300, true);
- StateTransitionContext cxt2 = new StateTransitionContext(CLUSTER_NAME,
"instance", "db_2", "a-b");
+ StateTransitionContext cxt2 =
+ new StateTransitionContext(CLUSTER_NAME, "instance", "db_2", "a-b");
monitor.reportTransitionStat(cxt2, data);
monitor.reportTransitionStat(cxt2, data);
Thread.sleep(1000);
@@ -147,12 +158,13 @@ public class TestParticipantMonitor {
Assert.assertTrue(cxt.equals(cxt2));
Assert.assertFalse(cxt.equals(new Object()));
- Assert.assertTrue(cxt.equals(new StateTransitionContext(CLUSTER_NAME,
"instance", "db_1", "a-b")));
+ Assert.assertTrue(
+ cxt.equals(new StateTransitionContext(CLUSTER_NAME, "instance",
"db_1", "a-b")));
cxt2.getInstanceName();
ParticipantMonitorListener monitorListener2 =
- new ParticipantMonitorListener("CLMParticipantReport");
+ new ParticipantMonitorListener(DOMAIN_NAME, "Cluster", CLUSTER_NAME);
Thread.sleep(1000);
// Same here. Helix only measures per cluster + per state transitions.
@@ -160,6 +172,75 @@ public class TestParticipantMonitor {
monitorListener2.disconnect();
monitorListener.disconnect();
- System.out.println("END TestParticipantMonitor");
+
+ System.out.println("END TestParticipantStateTransitionMonitor");
+ }
+
+ @Test()
+ public void testReportMessageData()
+ throws InstanceNotFoundException, MalformedObjectNameException,
NullPointerException,
+ IOException, InterruptedException, MBeanException,
AttributeNotFoundException,
+ ReflectionException {
+ System.out.println("START TestParticipantMessageMonitor");
+ ParticipantStatusMonitor monitor = new ParticipantStatusMonitor(true,
PARTICIPANT_NAME);
+
+ Message message = new Message(Message.MessageType.NO_OP, "0");
+ monitor.reportReceivedMessage(message);
+ Thread.sleep(1000);
+ ParticipantMonitorListener monitorListener =
+ new ParticipantMonitorListener(DOMAIN_NAME, "ParticipantName",
PARTICIPANT_NAME);
+ Thread.sleep(1000);
+ Assert.assertEquals(monitorListener._beanValueMap.size(), 2);
+ Assert.assertEquals(monitorListener._beanValueMap.get(
+
getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+ .toString()).get("ReceivedMessages"), 1L);
+ Assert.assertEquals(monitorListener._beanValueMap.get(
+
getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+ .toString()).get("PendingMessages"), 1L);
+
+ monitor
+ .reportProcessedMessage(message,
ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
+ Thread.sleep(1000);
+ monitorListener =
+ new ParticipantMonitorListener(DOMAIN_NAME, "ParticipantName",
PARTICIPANT_NAME);
+ Thread.sleep(1000);
+ Assert.assertEquals(monitorListener._beanValueMap.get(
+
getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+ .toString()).get("ReceivedMessages"), 1L);
+ Assert.assertEquals(monitorListener._beanValueMap.get(
+
getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+ .toString()).get("PendingMessages"), 0L);
+ Assert.assertEquals(monitorListener._beanValueMap.get(
+
getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+ .toString()).get("CompletedMessages"), 1L);
+
+ monitor.reportReceivedMessage(message);
+ Thread.sleep(1000);
+ monitorListener =
+ new ParticipantMonitorListener(DOMAIN_NAME, "ParticipantName",
PARTICIPANT_NAME);
+ Thread.sleep(1000);
+ Assert.assertEquals(monitorListener._beanValueMap.get(
+
getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+ .toString()).get("ReceivedMessages"), 2L);
+ Assert.assertEquals(monitorListener._beanValueMap.get(
+
getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+ .toString()).get("PendingMessages"), 1L);
+
+ monitor
+ .reportProcessedMessage(message,
ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
+ Thread.sleep(1000);
+ monitorListener =
+ new ParticipantMonitorListener(DOMAIN_NAME, "ParticipantName",
PARTICIPANT_NAME);
+ Thread.sleep(1000);
+ Assert.assertEquals(monitorListener._beanValueMap.get(
+
getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+ .toString()).get("DiscardedMessages"), 1L);
+ Assert.assertEquals(monitorListener._beanValueMap.get(
+
getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+ .toString()).get("PendingMessages"), 0L);
+
+ monitorListener.disconnect();
+
+ System.out.println("END TestParticipantMessageMonitor");
}
}