This is an automated email from the ASF dual-hosted git repository.
lxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 9b29a6e Add Cluster and instance level metrics to report the number
of messages that have not been completed after their expected completion time
new dc2e8bb Merge pull request #1111 from lei-xia/master
9b29a6e is described below
commit 9b29a6e2f13344fa43f81d2bb21845cb65a83c7d
Author: Lei Xia <[email protected]>
AuthorDate: Sat Jun 20 11:04:26 2020 -0700
Add Cluster and instance level metrics to report the number of messages
that have not been completed after their expected completion time
Changes in this commit:
1) Add an expected completion time field to Message, and set the default expected
completion period to 1 day (configurable via a system property).
2) Add a past-due incomplete message gauge to both the cluster and instance
monitors.
---
.../java/org/apache/helix/SystemPropertyKeys.java | 4 +
.../helix/controller/GenericHelixController.java | 4 -
.../controller/stages/ReadClusterDataStage.java | 8 +-
.../main/java/org/apache/helix/model/Message.java | 42 ++++++--
.../monitoring/mbeans/ClusterStatusMonitor.java | 88 ++++++++++------
.../mbeans/ClusterStatusMonitorMBean.java | 7 ++
.../helix/monitoring/mbeans/InstanceMonitor.java | 36 ++++++-
.../mbeans/TestClusterStatusMonitor.java | 111 ++++++++++++++++++++-
.../monitoring/mbeans/TestInstanceMonitor.java | 4 +
9 files changed, 262 insertions(+), 42 deletions(-)
diff --git
a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index c71d5c6..e9d8fcb 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -69,6 +69,10 @@ public class SystemPropertyKeys {
// Controller
public static final String CONTROLLER_MESSAGE_PURGE_DELAY =
"helix.controller.stages.MessageGenerationPhase.messagePurgeDelay";
+ // Message
+ // System property overriding the expected completion period (in ms) of a message.
+ // Read by Message.MESSAGE_EXPECT_COMPLETION_PERIOD, which defaults to 1 day when unset.
+ public static final String MESSAGE_EXPECTED_COMPLETION_PERIOD =
"helix.controller.message.ExpectMessageCompletionPeriod";
+
+
// MBean monitor for helix.
public static final String HELIX_MONITOR_TIME_WINDOW_LENGTH_MS =
"helix.monitor.slidingTimeWindow.ms";
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 683845c..238242f 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -942,10 +942,6 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
pushToEventQueues(ClusterEventType.MessageChange, changeContext,
Collections.<String,
Object>singletonMap(AttributeName.instanceName.name(), instanceName));
- if (_isMonitoring && messages != null) {
- _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
- }
-
logger.info("END: GenericClusterController.onMessage() for cluster " +
_clusterName);
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index d284c8e..535fadf 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -19,6 +19,8 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,6 +39,7 @@ import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +80,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
Map<String, List<String>> oldDisabledPartitions =
Maps.newHashMap();
Map<String, Set<String>> tags = Maps.newHashMap();
Map<String, LiveInstance> liveInstanceMap =
dataProvider.getLiveInstances();
+ Map<String, Set<Message>> instanceMessageMap = Maps.newHashMap();
for (Map.Entry<String, InstanceConfig> e :
dataProvider.getInstanceConfigMap()
.entrySet()) {
String instanceName = e.getKey();
@@ -84,6 +88,8 @@ public class ReadClusterDataStage extends AbstractBaseStage {
instanceSet.add(instanceName);
if (liveInstanceMap.containsKey(instanceName)) {
liveInstanceSet.add(instanceName);
+ instanceMessageMap.put(instanceName,
+
Sets.newHashSet(dataProvider.getMessages(instanceName).values()));
}
if (!config.getInstanceEnabled() ||
(clusterConfig.getDisabledInstances() != null
&&
clusterConfig.getDisabledInstances().containsKey(instanceName))) {
@@ -99,7 +105,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
}
clusterStatusMonitor
.setClusterInstanceStatus(liveInstanceSet, instanceSet,
disabledInstanceSet,
- disabledPartitions, oldDisabledPartitions, tags);
+ disabledPartitions, oldDisabledPartitions, tags,
instanceMessageMap);
LogUtil.logDebug(logger, _eventId, "Complete cluster status
monitors update.");
}
return null;
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index eefecec..7d9e047 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import org.apache.helix.HelixException;
@@ -36,6 +37,8 @@ import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
/**
@@ -79,6 +82,7 @@ public class Message extends HelixProperty {
TO_STATE,
STATE_MODEL_DEF,
CREATE_TIMESTAMP,
+ COMPLETION_DUE_TIMESTAMP,
READ_TIMESTAMP,
EXECUTE_START_TIMESTAMP,
MSG_TYPE,
@@ -117,6 +121,11 @@ public class Message extends HelixProperty {
// Currently, the field is only used for invalidating messages in
controller's message cache.
private boolean _expired = false;
+ // The expected period of time (in ms) within which a message should be completed.
+ // Defaults to 1 day; can be overridden with the system property named by
+ // SystemPropertyKeys.MESSAGE_EXPECTED_COMPLETION_PERIOD. Evaluated once, at class initialization.
+ public static final long MESSAGE_EXPECT_COMPLETION_PERIOD = HelixUtil
+
.getSystemPropertyAsLong(SystemPropertyKeys.MESSAGE_EXPECTED_COMPLETION_PERIOD,
+ TimeUnit.DAYS.toMillis(1));
+
/**
* Compares the creation time of two Messages
*/
@@ -167,13 +176,21 @@ public class Message extends HelixProperty {
/**
* Set the time that the message was created
- * @param timestamp a UNIX timestamp
+ * @param timestamp a UNIX timestamp (in ms)
*/
public void setCreateTimeStamp(long timestamp) {
_record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), timestamp);
}
/**
+ * Set the time that the message was expected to be completed
+ * @param timestamp a UNIX timestamp (in ms)
+ */
+ public void setCompletionDueTimeStamp(long timestamp) {
+ _record.setLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(),
timestamp);
+ }
+
+ /**
* Instantiate a message with a new id
* @param record a ZNRecord corresponding to a message
* @param id unique message identifier
@@ -490,7 +507,7 @@ public class Message extends HelixProperty {
/**
* Set the time that this message was read
- * @param time UNIX timestamp
+ * @param time UNIX timestamp (in ms)
*/
public void setReadTimeStamp(long time) {
_record.setLongField(Attributes.READ_TIMESTAMP.toString(), time);
@@ -498,7 +515,7 @@ public class Message extends HelixProperty {
/**
* Set the time that the instance executes tasks as instructed by this
message
- * @param time UNIX timestamp
+ * @param time UNIX timestamp (in ms)
*/
public void setExecuteStartTimeStamp(long time) {
_record.setLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(), time);
@@ -506,7 +523,7 @@ public class Message extends HelixProperty {
/**
* Get the time that this message was read
- * @return UNIX timestamp
+ * @return UNIX timestamp (in ms)
*/
public long getReadTimeStamp() {
return _record.getLongField(Attributes.READ_TIMESTAMP.toString(), 0L);
@@ -514,7 +531,7 @@ public class Message extends HelixProperty {
/**
* Get the time that execution occurred as a result of this message
- * @return UNIX timestamp
+ * @return UNIX timestamp (in ms)
*/
public long getExecuteStartTimeStamp() {
return _record.getLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(),
0L);
@@ -522,13 +539,26 @@ public class Message extends HelixProperty {
/**
* Get the time that this message was created
- * @return UNIX timestamp
+ * @return UNIX timestamp (in ms)
*/
public long getCreateTimeStamp() {
return _record.getLongField(Attributes.CREATE_TIMESTAMP.toString(), 0L);
}
/**
+ * Get the time that the message was expected to be completed
+ * @return UNIX timestamp (in ms)
+ */
+ public long getCompletionDueTimeStamp() {
+ long completionDue =
_record.getLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(), 0L);
+ // An absent COMPLETION_DUE_TIMESTAMP field reads as 0: derive the due time from the
+ // creation time plus the expected completion period shared by all messages.
+ // NOTE(review): this addition can overflow if the configured period is close to
+ // Long.MAX_VALUE, making every message look past due - consider saturating it.
+ if (completionDue == 0) {
+ completionDue = getCreateTimeStamp() + MESSAGE_EXPECT_COMPLETION_PERIOD;
+ }
+
+ return completionDue;
+ }
+
+ /**
* Set a unique identifier that others can use to refer to this message in
replies
* @param correlationId a unique identifier, usually randomly generated
*/
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index fc0b19d..39ef1e1 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -37,7 +37,6 @@ import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
@@ -82,7 +81,9 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
private Set<String> _disabledInstances = Collections.emptySet();
private Map<String, Map<String, List<String>>> _disabledPartitions =
Collections.emptyMap();
private Map<String, List<String>> _oldDisabledPartitions =
Collections.emptyMap();
- private Map<String, Long> _instanceMsgQueueSizes = Maps.newConcurrentMap();
+ private AtomicLong _totalMsgQueueSize = new AtomicLong(0L);
+ private AtomicLong _maxInstanceMsgQueueSize = new AtomicLong(0L);
+ private AtomicLong _totalPastDueMsgSize = new AtomicLong(0L);
private boolean _rebalanceFailure = false;
private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
@@ -175,22 +176,17 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
@Override
public long getMaxMessageQueueSizeGauge() {
- long maxQueueSize = 0;
- for (Long queueSize : _instanceMsgQueueSizes.values()) {
- if (queueSize > maxQueueSize) {
- maxQueueSize = queueSize;
- }
- }
- return maxQueueSize;
+ return _maxInstanceMsgQueueSize.get();
}
@Override
public long getInstanceMessageQueueBacklog() {
- long sum = 0;
- for (Long queueSize : _instanceMsgQueueSizes.values()) {
- sum += queueSize;
- }
- return sum;
+ return _totalMsgQueueSize.get();
+ }
+
+ @Override
+ public long getTotalPastDueMessageGauge() {
+ return _totalPastDueMsgSize.get();
}
private void register(Object bean, ObjectName name) {
@@ -228,10 +224,12 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
* @param disabledInstanceSet the current set of configured instances that
are disabled
* @param disabledPartitions a map of instance name to the set of partitions
disabled on it
* @param tags a map of instance name to the set of tags on it
+ * @param instanceMessageMap a map of pending messages from each live
instance
*/
public void setClusterInstanceStatus(Set<String> liveInstanceSet,
Set<String> instanceSet,
Set<String> disabledInstanceSet, Map<String, Map<String, List<String>>>
disabledPartitions,
- Map<String, List<String>> oldDisabledPartitions, Map<String,
Set<String>> tags) {
+ Map<String, List<String>> oldDisabledPartitions, Map<String,
Set<String>> tags,
+ Map<String, Set<Message>> instanceMessageMap) {
synchronized (_instanceMonitorMap) {
// Unregister beans for instances that are no longer configured
Set<String> toUnregister = Sets.newHashSet(_instanceMonitorMap.keySet());
@@ -254,6 +252,7 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
LOG.error("Failed to create instance monitor for instance: {}.",
instanceName);
}
}
+
try {
registerInstances(monitorsToRegister);
} catch (JMException e) {
@@ -267,6 +266,12 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
_disabledPartitions = disabledPartitions;
_oldDisabledPartitions = oldDisabledPartitions;
+ // message related counts
+ long totalMsgQueueSize = 0L;
+ long maxInstanceMsgQueueSize = 0L;
+ long totalPastDueMsgSize = 0L;
+ long now = System.currentTimeMillis();
+
// Update the instance MBeans
for (String instanceName : instanceSet) {
if (_instanceMonitorMap.containsKey(instanceName)) {
@@ -277,6 +282,24 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
oldDisabledPartitions.get(instanceName),
liveInstanceSet.contains(instanceName),
!disabledInstanceSet.contains(instanceName));
+ // Calculate and update instance-level message related gauges.
+ // Instances without an entry in instanceMessageMap (e.g. instances that are not live)
+ // are treated as having no pending messages, so their gauges are reset to 0 instead of
+ // keeping the stale values reported while they were live.
+ Set<Message> messages =
+ instanceMessageMap.getOrDefault(instanceName, Collections.emptySet());
+ long msgQueueSize = messages.size();
+ bean.updateMessageQueueSize(msgQueueSize);
+ totalMsgQueueSize += msgQueueSize;
+ maxInstanceMsgQueueSize = Math.max(maxInstanceMsgQueueSize, msgQueueSize);
+
+ // A message is past due once its completion due time is not after the snapshot time.
+ long pastDueMsgCount =
+ messages.stream().filter(m -> m.getCompletionDueTimeStamp() <= now).count();
+ bean.updatePastDueMessageGauge(pastDueMsgCount);
+ totalPastDueMsgSize += pastDueMsgCount;
+ LOG.debug("There are totally {} messages, {} are past due on instance {}", msgQueueSize,
+ pastDueMsgCount, instanceName);
+
+
// If the sensor name changed, re-register the bean so that
listeners won't miss it
String newSensorName = bean.getSensorName();
if (!oldSensorName.equals(newSensorName)) {
@@ -289,6 +312,11 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
}
}
}
+
+ // Update cluster level message related gauges
+ _maxInstanceMsgQueueSize.set(maxInstanceMsgQueueSize);
+ _totalMsgQueueSize.set(totalMsgQueueSize);
+ _totalPastDueMsgSize.set(totalPastDueMsgSize);
}
}
@@ -324,7 +352,7 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
}
/**
- * Update message count per instance and per resource
+ * Update the total count of messages that the controller has sent to each
instance and each resource so far
* @param messages a list of messages
*/
public void increaseMessageReceived(List<Message> messages) {
@@ -351,7 +379,7 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
}
}
- // Update message count per instance and per resource
+ // Update message count sent per instance and per resource
for (String instance : messageCountPerInstance.keySet()) {
InstanceMonitor instanceMonitor = _instanceMonitorMap.get(instance);
if (instanceMonitor != null) {
@@ -563,10 +591,6 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
}
}
- public void addMessageQueueSize(String instanceName, long msgQueueSize) {
- _instanceMsgQueueSizes.put(instanceName, msgQueueSize);
- }
-
public void active() {
LOG.info("Active ClusterStatusMonitor");
try {
@@ -580,7 +604,6 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
LOG.info("Reset ClusterStatusMonitor");
try {
unregisterAllResources();
- _instanceMsgQueueSizes.clear();
unregisterAllInstances();
unregisterAllPerInstanceResources();
unregister(getObjectName(clusterBeanName()));
@@ -588,7 +611,16 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
unregisterAllWorkflowsMonitor();
unregisterAllJobs();
+ // setClusterInstanceStatus() stores the caller's collections directly in these fields
+ // (e.g. _disabledPartitions = disabledPartitions), so the monitor does not own them.
+ // Calling clear() would mutate the caller's data, or throw on an unmodifiable collection
+ // and abort the rest of this reset. Drop the references instead.
+ _liveInstances = Collections.emptySet();
+ _instances = Collections.emptySet();
+ _disabledInstances = Collections.emptySet();
+ _disabledPartitions = Collections.emptyMap();
+ _oldDisabledPartitions = Collections.emptyMap();
_rebalanceFailure = false;
+ _maxInstanceMsgQueueSize.set(0L);
+ _totalPastDueMsgSize.set(0L);
+ _totalMsgQueueSize.set(0L);
+ _rebalanceFailureCount.set(0L);
} catch (Exception e) {
LOG.error("Fail to reset ClusterStatusMonitor, cluster: " +
_clusterName, e);
}
@@ -872,7 +904,7 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
return _resourceMonitorMap.get(resourceName);
}
- public String clusterBeanName() {
+ protected String clusterBeanName() {
return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
}
@@ -881,7 +913,7 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
* @param instanceName
* @return instance bean name
*/
- private String getInstanceBeanName(String instanceName) {
+ protected String getInstanceBeanName(String instanceName) {
return String.format("%s,%s=%s", clusterBeanName(), INSTANCE_DN_KEY,
instanceName);
}
@@ -890,7 +922,7 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
* @param resourceName
* @return resource bean name
*/
- private String getResourceBeanName(String resourceName) {
+ protected String getResourceBeanName(String resourceName) {
return String.format("%s,%s=%s", clusterBeanName(), RESOURCE_DN_KEY,
resourceName);
}
@@ -901,7 +933,7 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
* @param resourceName
* @return per-instance resource bean name
*/
- public String getPerInstanceResourceBeanName(String instanceName, String
resourceName) {
+ protected String getPerInstanceResourceBeanName(String instanceName, String
resourceName) {
return String.format("%s,%s", clusterBeanName(),
new PerInstanceResourceMonitor.BeanName(instanceName,
resourceName).toString());
}
@@ -912,7 +944,7 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
* @param workflowType The workflow type
* @return per workflow type bean name
*/
- public String getWorkflowBeanName(String workflowType) {
+ protected String getWorkflowBeanName(String workflowType) {
return String.format("%s, %s=%s", clusterBeanName(), WORKFLOW_TYPE_DN_KEY,
workflowType);
}
@@ -922,7 +954,7 @@ public class ClusterStatusMonitor implements
ClusterStatusMonitorMBean {
* @param jobType The job type
* @return per job type bean name
*/
- public String getJobBeanName(String jobType) {
+ protected String getJobBeanName(String jobType) {
return String.format("%s, %s=%s", clusterBeanName(), JOB_TYPE_DN_KEY,
jobType);
}
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 65f5a4e..ed94763 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -61,6 +61,13 @@ public interface ClusterStatusMonitorMBean extends
SensorNameProvider {
long getInstanceMessageQueueBacklog();
/**
+ * Total count of all messages that have not been completed
+ * after their expected completion time for instances in this cluster
+ * @return the number of past-due messages, summed over the instances of this cluster
+ */
+ long getTotalPastDueMessageGauge();
+
+ /**
* @return 1 if cluster is enabled, otherwise 0
*/
long getEnabled();
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index e0c0f89..86151e7 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -48,7 +48,9 @@ public class InstanceMonitor extends DynamicMBeanProvider {
ENABLED_STATUS_GAUGE("Enabled"),
ONLINE_STATUS_GAUGE("Online"),
DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
- MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");
+ MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge"),
+ MESSAGE_QUEUE_SIZE_GAUGE("MessageQueueSizeGauge"),
+ PASTDUE_MESSAGE_GAUGE("PastDueMessageGauge");
private final String metricName;
@@ -75,6 +77,8 @@ public class InstanceMonitor extends DynamicMBeanProvider {
private SimpleDynamicMetric<Long> _disabledPartitionsGauge;
private SimpleDynamicMetric<Long> _onlineStatusGauge;
private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
+ private SimpleDynamicMetric<Long> _messageQueueSizeGauge;
+ private SimpleDynamicMetric<Long> _pastDueMessageGauge;
// A map of dynamic capacity Gauges. The map's keys could change.
private final Map<String, SimpleDynamicMetric<Long>>
_dynamicCapacityMetricsMap;
@@ -108,6 +112,12 @@ public class InstanceMonitor extends DynamicMBeanProvider {
_maxCapacityUsageGauge =
new
SimpleDynamicMetric<>(InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName(),
0.0d);
+ _messageQueueSizeGauge =
+ new
SimpleDynamicMetric<>(InstanceMonitorMetric.MESSAGE_QUEUE_SIZE_GAUGE.metricName(),
+ 0L);
+ _pastDueMessageGauge =
+ new
SimpleDynamicMetric<>(InstanceMonitorMetric.PASTDUE_MESSAGE_GAUGE.metricName(),
+ 0L);
}
private List<DynamicMetric<?, ?>> buildAttributeList() {
@@ -116,7 +126,9 @@ public class InstanceMonitor extends DynamicMBeanProvider {
_disabledPartitionsGauge,
_enabledStatusGauge,
_onlineStatusGauge,
- _maxCapacityUsageGauge
+ _maxCapacityUsageGauge,
+ _messageQueueSizeGauge,
+ _pastDueMessageGauge
);
attributeList.addAll(_dynamicCapacityMetricsMap.values());
@@ -146,6 +158,10 @@ public class InstanceMonitor extends DynamicMBeanProvider {
return _disabledPartitionsGauge.getValue();
}
+ protected long getMessageQueueSizeGauge() { return
_messageQueueSizeGauge.getValue(); }
+
+ protected long getPastDueMessageGauge() { return
_pastDueMessageGauge.getValue(); }
+
/**
* Get the name of the monitored instance
* @return instance name as a string
@@ -210,6 +226,22 @@ public class InstanceMonitor extends DynamicMBeanProvider {
}
/**
+ * Updates the message queue size gauge for this instance.
+ * @param queueSize message queue size of this instance
+ */
+ public synchronized void updateMessageQueueSize(long queueSize) {
+ _messageQueueSizeGauge.updateValue(queueSize);
+ }
+
+ /**
+ * Updates the number of messages that have not been completed by their expected
+ * completion time for this instance.
+ * @param msgCount count of messages that have not been completed by their due completion time
+ */
+ public synchronized void updatePastDueMessageGauge(long msgCount) {
+ _pastDueMessageGauge.updateValue(msgCount);
+ }
+
+ /**
* Gets max capacity usage of this instance.
* @return Max capacity usage of this instance.
*/
diff --git
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index dee1f86..789992f 100644
---
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -29,6 +29,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
@@ -56,6 +58,8 @@ import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.Assert;
import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
public class TestClusterStatusMonitor {
private static final MBeanServerConnection _server =
ManagementFactory.getPlatformMBeanServer();
@@ -164,6 +168,84 @@ public class TestClusterStatusMonitor {
System.out.println("END " + clusterName + " at " + new
Date(System.currentTimeMillis()));
}
+ @Test()
+ public void testMessageMetrics() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 5;
+
+ System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+
+ ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+ monitor.active();
+ ObjectName clusterMonitorObjName =
monitor.getObjectName(monitor.clusterBeanName());
+
+ Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
+
+ Map<String, Set<Message>> instanceMessageMap = Maps.newHashMap();
+ Set<String> liveInstanceSet = Sets.newHashSet();
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ liveInstanceSet.add(instanceName);
+
+ long now = System.currentTimeMillis();
+ Set<Message> messages = Sets.newHashSet();
+ // add 10 regular messages to each instance
+ for (int j = 0; j < 10; j++) {
+ Message m = new Message(Message.MessageType.STATE_TRANSITION,
UUID.randomUUID().toString());
+ m.setTgtName(instanceName);
+ messages.add(m);
+ }
+
+ // add 10 past-due messages to each instance (using default completion
period)
+ for (int j = 0; j < 10; j++) {
+ Message m = new Message(Message.MessageType.STATE_TRANSITION,
UUID.randomUUID().toString());
+ m.setTgtName(instanceName);
+ m.setCreateTimeStamp(now - Message.MESSAGE_EXPECT_COMPLETION_PERIOD -
1000);
+ messages.add(m);
+ }
+
+ // add other 5 past-due messages to each instance (using explicitly set
COMPLETION time in message)
+ for (int j = 0; j < 5; j++) {
+ Message m = new Message(Message.MessageType.STATE_TRANSITION,
UUID.randomUUID().toString());
+ m.setTgtName(instanceName);
+ m.setCompletionDueTimeStamp(now - 1000);
+ messages.add(m);
+ }
+ instanceMessageMap.put(instanceName, messages);
+ }
+
+ monitor.setClusterInstanceStatus(liveInstanceSet, liveInstanceSet,
Collections.emptySet(),
+ Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap(), instanceMessageMap);
+
+ Assert.assertEquals(monitor.getInstanceMessageQueueBacklog(), 25 * n);
+ Assert.assertEquals(monitor.getTotalPastDueMessageGauge(), 15 * n);
+
+ Object totalMsgSize =
+ _server.getAttribute(clusterMonitorObjName,
"InstanceMessageQueueBacklog");
+ Assert.assertTrue(totalMsgSize instanceof Long);
+ Assert.assertEquals((long) totalMsgSize, 25 * n);
+
+ Object totalPastdueMsgCount =
+ _server.getAttribute(clusterMonitorObjName,
"TotalPastDueMessageGauge");
+ Assert.assertTrue(totalPastdueMsgCount instanceof Long);
+ Assert.assertEquals((long) totalPastdueMsgCount, 15 * n);
+
+ for (String instance : liveInstanceSet) {
+ ObjectName objName =
+ monitor.getObjectName(monitor.getInstanceBeanName(instance));
+ Object messageSize = _server.getAttribute(objName,
"MessageQueueSizeGauge");
+ Assert.assertTrue(messageSize instanceof Long);
+ Assert.assertEquals((long) messageSize, 25L);
+
+ Object pastdueMsgCount = _server.getAttribute(objName,
"PastDueMessageGauge");
+ Assert.assertTrue(pastdueMsgCount instanceof Long);
+ Assert.assertEquals((long) pastdueMsgCount, 15L);
+ }
+
+ System.out.println("END " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+ }
@Test
public void testResourceAggregation() throws JMException, IOException {
@@ -352,7 +434,7 @@ public class TestClusterStatusMonitor {
// Call setClusterInstanceStatus to register instance monitors.
monitor.setClusterInstanceStatus(maxUsageMap.keySet(),
maxUsageMap.keySet(),
Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
- Collections.emptyMap());
+ Collections.emptyMap(), Collections.emptyMap());
// Update instance capacity status.
for (Map.Entry<String, Double> usageEntry : maxUsageMap.entrySet()) {
@@ -435,4 +517,31 @@ public class TestClusterStatusMonitor {
}
}
}
+
+ // NOTE(review): despite its name, this helper verifies MaxCapacityUsageGauge and the
+ // per-capacity-key "<key>Gauge" attributes, not message metrics. It is also not called
+ // anywhere in the visible diff (testMessageMetrics checks the message gauges inline).
+ // Consider removing it, or rewriting it to check MessageQueueSizeGauge/PastDueMessageGauge.
+ private void verifyMessageMetrics(ClusterStatusMonitor monitor, Map<String,
Double> maxUsageMap,
+ Map<String, Map<String, Integer>> instanceCapacityMap)
+ throws MalformedObjectNameException, IOException,
AttributeNotFoundException, MBeanException,
+ ReflectionException, InstanceNotFoundException {
+ // Verify results.
+ for (Map.Entry<String, Map<String, Integer>> instanceEntry :
instanceCapacityMap.entrySet()) {
+ String instance = instanceEntry.getKey();
+ Map<String, Integer> capacityMap = instanceEntry.getValue();
+ String instanceBeanName = String
+ .format("%s,%s=%s", monitor.clusterBeanName(),
ClusterStatusMonitor.INSTANCE_DN_KEY,
+ instance);
+ ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+ Assert.assertTrue(_server.isRegistered(instanceObjectName));
+ Assert.assertEquals(_server.getAttribute(instanceObjectName,
+
InstanceMonitor.InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName()),
+ maxUsageMap.get(instance));
+
+ for (Map.Entry<String, Integer> capacityEntry : capacityMap.entrySet()) {
+ String capacityKey = capacityEntry.getKey();
+ String attributeName = capacityKey + "Gauge";
+ Assert.assertEquals((long) _server.getAttribute(instanceObjectName,
attributeName),
+ (long) instanceCapacityMap.get(instance).get(capacityKey));
+ }
+ }
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
index 609581b..7cac92e 100644
---
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
+++
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
@@ -59,6 +59,8 @@ public class TestInstanceMonitor {
monitor.updateMaxCapacityUsage(0.5d);
monitor.increaseMessageCount(10L);
monitor.updateInstance(tags, disabledPartitions, Collections.emptyList(),
true, true);
+ monitor.updateMessageQueueSize(100L);
+ monitor.updatePastDueMessageGauge(50L);
// Verify metrics.
Assert.assertEquals(monitor.getTotalMessageReceived(), 10L);
@@ -69,6 +71,8 @@ public class TestInstanceMonitor {
Assert.assertEquals(monitor.getEnabled(), 1L);
Assert.assertEquals(monitor.getDisabledPartitions(), 2L);
Assert.assertEquals(monitor.getMaxCapacityUsageGauge(), 0.5d);
+ Assert.assertEquals(monitor.getMessageQueueSizeGauge(), 100L);
+ Assert.assertEquals(monitor.getPastDueMessageGauge(), 50L);
monitor.unregister();
}