This is an automated email from the ASF dual-hosted git repository.

lxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b29a6e  Add Cluster and instance level metrics to report the number 
of messages that have not been completed after their expected completion time
     new dc2e8bb  Merge pull request #1111 from lei-xia/master
9b29a6e is described below

commit 9b29a6e2f13344fa43f81d2bb21845cb65a83c7d
Author: Lei Xia <[email protected]>
AuthorDate: Sat Jun 20 11:04:26 2020 -0700

    Add Cluster and instance level metrics to report the number of messages 
that have not been completed after their expected completion time
    
    Changes in this commit:
    1) Add an expected completion time field to Message, and set the default expected 
completion period to 1 day (configurable via the system property 
"helix.controller.message.ExpectMessageCompletionPeriod").
    2) Add a past-due incomplete message gauge to both the cluster and instance 
monitors.
---
 .../java/org/apache/helix/SystemPropertyKeys.java  |   4 +
 .../helix/controller/GenericHelixController.java   |   4 -
 .../controller/stages/ReadClusterDataStage.java    |   8 +-
 .../main/java/org/apache/helix/model/Message.java  |  42 ++++++--
 .../monitoring/mbeans/ClusterStatusMonitor.java    |  88 ++++++++++------
 .../mbeans/ClusterStatusMonitorMBean.java          |   7 ++
 .../helix/monitoring/mbeans/InstanceMonitor.java   |  36 ++++++-
 .../mbeans/TestClusterStatusMonitor.java           | 111 ++++++++++++++++++++-
 .../monitoring/mbeans/TestInstanceMonitor.java     |   4 +
 9 files changed, 262 insertions(+), 42 deletions(-)

diff --git 
a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java 
b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index c71d5c6..e9d8fcb 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -69,6 +69,10 @@ public class SystemPropertyKeys {
   // Controller
   public static final String CONTROLLER_MESSAGE_PURGE_DELAY = 
"helix.controller.stages.MessageGenerationPhase.messagePurgeDelay";
 
+  // Message
+  public static final String MESSAGE_EXPECTED_COMPLETION_PERIOD = 
"helix.controller.message.ExpectMessageCompletionPeriod";
+
+
   // MBean monitor for helix.
   public static final String HELIX_MONITOR_TIME_WINDOW_LENGTH_MS = 
"helix.monitor.slidingTimeWindow.ms";
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 683845c..238242f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -942,10 +942,6 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
     pushToEventQueues(ClusterEventType.MessageChange, changeContext,
         Collections.<String, 
Object>singletonMap(AttributeName.instanceName.name(), instanceName));
 
-    if (_isMonitoring && messages != null) {
-      _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
-    }
-
     logger.info("END: GenericClusterController.onMessage() for cluster " + 
_clusterName);
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index d284c8e..535fadf 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -19,6 +19,8 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,6 +39,7 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +80,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
             Map<String, List<String>> oldDisabledPartitions = 
Maps.newHashMap();
             Map<String, Set<String>> tags = Maps.newHashMap();
             Map<String, LiveInstance> liveInstanceMap = 
dataProvider.getLiveInstances();
+            Map<String, Set<Message>> instanceMessageMap = Maps.newHashMap();
             for (Map.Entry<String, InstanceConfig> e : 
dataProvider.getInstanceConfigMap()
                 .entrySet()) {
               String instanceName = e.getKey();
@@ -84,6 +88,8 @@ public class ReadClusterDataStage extends AbstractBaseStage {
               instanceSet.add(instanceName);
               if (liveInstanceMap.containsKey(instanceName)) {
                 liveInstanceSet.add(instanceName);
+                instanceMessageMap.put(instanceName,
+                    
Sets.newHashSet(dataProvider.getMessages(instanceName).values()));
               }
               if (!config.getInstanceEnabled() || 
(clusterConfig.getDisabledInstances() != null
                   && 
clusterConfig.getDisabledInstances().containsKey(instanceName))) {
@@ -99,7 +105,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
             }
             clusterStatusMonitor
                 .setClusterInstanceStatus(liveInstanceSet, instanceSet, 
disabledInstanceSet,
-                    disabledPartitions, oldDisabledPartitions, tags);
+                    disabledPartitions, oldDisabledPartitions, tags, 
instanceMessageMap);
             LogUtil.logDebug(logger, _eventId, "Complete cluster status 
monitors update.");
           }
           return null;
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java 
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index eefecec..7d9e047 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
 import org.apache.helix.HelixException;
@@ -36,6 +37,8 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
@@ -79,6 +82,7 @@ public class Message extends HelixProperty {
     TO_STATE,
     STATE_MODEL_DEF,
     CREATE_TIMESTAMP,
+    COMPLETION_DUE_TIMESTAMP,
     READ_TIMESTAMP,
     EXECUTE_START_TIMESTAMP,
     MSG_TYPE,
@@ -117,6 +121,11 @@ public class Message extends HelixProperty {
   // Currently, the field is only used for invalidating messages in 
controller's message cache.
   private boolean _expired = false;
 
+  // The expect period of time (in ms) that a message should be completed, 
default 1 day
+  public static final long MESSAGE_EXPECT_COMPLETION_PERIOD = HelixUtil
+      
.getSystemPropertyAsLong(SystemPropertyKeys.MESSAGE_EXPECTED_COMPLETION_PERIOD,
+          TimeUnit.DAYS.toMillis(1));
+
   /**
    * Compares the creation time of two Messages
    */
@@ -167,13 +176,21 @@ public class Message extends HelixProperty {
 
   /**
    * Set the time that the message was created
-   * @param timestamp a UNIX timestamp
+   * @param timestamp a UNIX timestamp (in ms)
    */
   public void setCreateTimeStamp(long timestamp) {
     _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), timestamp);
   }
 
   /**
+   * Set the time that the message was expected to be completed
+   * @param timestamp a UNIX timestamp (in ms)
+   */
+  public void setCompletionDueTimeStamp(long timestamp) {
+    _record.setLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(), 
timestamp);
+  }
+
+  /**
    * Instantiate a message with a new id
    * @param record a ZNRecord corresponding to a message
    * @param id unique message identifier
@@ -490,7 +507,7 @@ public class Message extends HelixProperty {
 
   /**
    * Set the time that this message was read
-   * @param time UNIX timestamp
+   * @param time UNIX timestamp (in ms)
    */
   public void setReadTimeStamp(long time) {
     _record.setLongField(Attributes.READ_TIMESTAMP.toString(), time);
@@ -498,7 +515,7 @@ public class Message extends HelixProperty {
 
   /**
    * Set the time that the instance executes tasks as instructed by this 
message
-   * @param time UNIX timestamp
+   * @param time UNIX timestamp (in ms)
    */
   public void setExecuteStartTimeStamp(long time) {
     _record.setLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(), time);
@@ -506,7 +523,7 @@ public class Message extends HelixProperty {
 
   /**
    * Get the time that this message was read
-   * @return UNIX timestamp
+   * @return UNIX timestamp (in ms)
    */
   public long getReadTimeStamp() {
     return _record.getLongField(Attributes.READ_TIMESTAMP.toString(), 0L);
@@ -514,7 +531,7 @@ public class Message extends HelixProperty {
 
   /**
    * Get the time that execution occurred as a result of this message
-   * @return UNIX timestamp
+   * @return UNIX timestamp (in ms)
    */
   public long getExecuteStartTimeStamp() {
     return _record.getLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(), 
0L);
@@ -522,13 +539,26 @@ public class Message extends HelixProperty {
 
   /**
    * Get the time that this message was created
-   * @return UNIX timestamp
+   * @return UNIX timestamp (in ms)
    */
   public long getCreateTimeStamp() {
     return _record.getLongField(Attributes.CREATE_TIMESTAMP.toString(), 0L);
   }
 
   /**
+   * Get the time that the message was expected to be completed
+   * @return UNIX timestamp (in ms)
+   */
+  public long getCompletionDueTimeStamp() {
+    long completionDue = 
_record.getLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(), 0L);
+    if (completionDue == 0) {
+      completionDue = getCreateTimeStamp() + MESSAGE_EXPECT_COMPLETION_PERIOD;
+    }
+
+    return completionDue;
+  }
+
+  /**
    * Set a unique identifier that others can use to refer to this message in 
replies
    * @param correlationId a unique identifier, usually randomly generated
    */
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index fc0b19d..39ef1e1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -37,7 +37,6 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
@@ -82,7 +81,9 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   private Set<String> _disabledInstances = Collections.emptySet();
   private Map<String, Map<String, List<String>>> _disabledPartitions = 
Collections.emptyMap();
   private Map<String, List<String>> _oldDisabledPartitions = 
Collections.emptyMap();
-  private Map<String, Long> _instanceMsgQueueSizes = Maps.newConcurrentMap();
+  private AtomicLong _totalMsgQueueSize = new AtomicLong(0L);
+  private AtomicLong _maxInstanceMsgQueueSize = new AtomicLong(0L);
+  private AtomicLong _totalPastDueMsgSize = new AtomicLong(0L);
   private boolean _rebalanceFailure = false;
   private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
 
@@ -175,22 +176,17 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
 
   @Override
   public long getMaxMessageQueueSizeGauge() {
-    long maxQueueSize = 0;
-    for (Long queueSize : _instanceMsgQueueSizes.values()) {
-      if (queueSize > maxQueueSize) {
-        maxQueueSize = queueSize;
-      }
-    }
-    return maxQueueSize;
+    return _maxInstanceMsgQueueSize.get();
   }
 
   @Override
   public long getInstanceMessageQueueBacklog() {
-    long sum = 0;
-    for (Long queueSize : _instanceMsgQueueSizes.values()) {
-      sum += queueSize;
-    }
-    return sum;
+    return _totalMsgQueueSize.get();
+  }
+
+  @Override
+  public long getTotalPastDueMessageGauge() {
+    return _totalPastDueMsgSize.get();
   }
 
   private void register(Object bean, ObjectName name) {
@@ -228,10 +224,12 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
    * @param disabledInstanceSet the current set of configured instances that 
are disabled
    * @param disabledPartitions a map of instance name to the set of partitions 
disabled on it
    * @param tags a map of instance name to the set of tags on it
+   * @param instanceMessageMap a map of pending messages from each live 
instance
    */
   public void setClusterInstanceStatus(Set<String> liveInstanceSet, 
Set<String> instanceSet,
       Set<String> disabledInstanceSet, Map<String, Map<String, List<String>>> 
disabledPartitions,
-      Map<String, List<String>> oldDisabledPartitions, Map<String, 
Set<String>> tags) {
+      Map<String, List<String>> oldDisabledPartitions, Map<String, 
Set<String>> tags,
+      Map<String, Set<Message>> instanceMessageMap) {
     synchronized (_instanceMonitorMap) {
       // Unregister beans for instances that are no longer configured
       Set<String> toUnregister = Sets.newHashSet(_instanceMonitorMap.keySet());
@@ -254,6 +252,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
           LOG.error("Failed to create instance monitor for instance: {}.", 
instanceName);
         }
       }
+
       try {
         registerInstances(monitorsToRegister);
       } catch (JMException e) {
@@ -267,6 +266,12 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
       _disabledPartitions = disabledPartitions;
       _oldDisabledPartitions = oldDisabledPartitions;
 
+      // message related counts
+      long totalMsgQueueSize = 0L;
+      long maxInstanceMsgQueueSize = 0L;
+      long totalPastDueMsgSize = 0L;
+      long now = System.currentTimeMillis();
+
       // Update the instance MBeans
       for (String instanceName : instanceSet) {
         if (_instanceMonitorMap.containsKey(instanceName)) {
@@ -277,6 +282,24 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
               oldDisabledPartitions.get(instanceName), 
liveInstanceSet.contains(instanceName),
               !disabledInstanceSet.contains(instanceName));
 
+          // calculate and update instance level message related gauges
+          Set<Message> messages = instanceMessageMap.get(instanceName);
+          if (messages != null) {
+            long msgQueueSize = messages.size();
+            bean.updateMessageQueueSize(msgQueueSize);
+            totalMsgQueueSize += msgQueueSize;
+            if (msgQueueSize > maxInstanceMsgQueueSize) {
+              maxInstanceMsgQueueSize = msgQueueSize;
+            }
+
+            long pastDueMsgCount =
+                messages.stream().filter(m -> (m.getCompletionDueTimeStamp() 
<= now)).count();
+            bean.updatePastDueMessageGauge(pastDueMsgCount);
+            totalPastDueMsgSize += pastDueMsgCount;
+            LOG.debug("There are totally {} messages, {} are past due on 
instance {}", msgQueueSize,
+                pastDueMsgCount, instanceName);
+          }
+
           // If the sensor name changed, re-register the bean so that 
listeners won't miss it
           String newSensorName = bean.getSensorName();
           if (!oldSensorName.equals(newSensorName)) {
@@ -289,6 +312,11 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
           }
         }
       }
+
+      // Update cluster level message related gauges
+      _maxInstanceMsgQueueSize.set(maxInstanceMsgQueueSize);
+      _totalMsgQueueSize.set(totalMsgQueueSize);
+      _totalPastDueMsgSize.set(totalPastDueMsgSize);
     }
   }
 
@@ -324,7 +352,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   }
 
   /**
-   * Update message count per instance and per resource
+   * Update the total count of messages that the controller has sent to each 
instance and each resource so far
    * @param messages a list of messages
    */
   public void increaseMessageReceived(List<Message> messages) {
@@ -351,7 +379,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
       }
     }
 
-    // Update message count per instance and per resource
+    // Update message count sent per instance and per resource
     for (String instance : messageCountPerInstance.keySet()) {
       InstanceMonitor instanceMonitor = _instanceMonitorMap.get(instance);
       if (instanceMonitor != null) {
@@ -563,10 +591,6 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     }
   }
 
-  public void addMessageQueueSize(String instanceName, long msgQueueSize) {
-    _instanceMsgQueueSizes.put(instanceName, msgQueueSize);
-  }
-
   public void active() {
     LOG.info("Active ClusterStatusMonitor");
     try {
@@ -580,7 +604,6 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     LOG.info("Reset ClusterStatusMonitor");
     try {
       unregisterAllResources();
-      _instanceMsgQueueSizes.clear();
       unregisterAllInstances();
       unregisterAllPerInstanceResources();
       unregister(getObjectName(clusterBeanName()));
@@ -588,7 +611,16 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
       unregisterAllWorkflowsMonitor();
       unregisterAllJobs();
 
+      _liveInstances.clear();
+      _instances.clear();
+      _disabledInstances.clear();
+      _disabledPartitions.clear();
+      _oldDisabledPartitions.clear();
       _rebalanceFailure = false;
+      _maxInstanceMsgQueueSize.set(0L);
+      _totalPastDueMsgSize.set(0L);
+      _totalMsgQueueSize.set(0L);
+      _rebalanceFailureCount.set(0L);
     } catch (Exception e) {
       LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + 
_clusterName, e);
     }
@@ -872,7 +904,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     return _resourceMonitorMap.get(resourceName);
   }
 
-  public String clusterBeanName() {
+  protected String clusterBeanName() {
     return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
   }
 
@@ -881,7 +913,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
    * @param instanceName
    * @return instance bean name
    */
-  private String getInstanceBeanName(String instanceName) {
+  protected String getInstanceBeanName(String instanceName) {
     return String.format("%s,%s=%s", clusterBeanName(), INSTANCE_DN_KEY, 
instanceName);
   }
 
@@ -890,7 +922,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
    * @param resourceName
    * @return resource bean name
    */
-  private String getResourceBeanName(String resourceName) {
+  protected String getResourceBeanName(String resourceName) {
     return String.format("%s,%s=%s", clusterBeanName(), RESOURCE_DN_KEY, 
resourceName);
   }
 
@@ -901,7 +933,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
    * @param resourceName
    * @return per-instance resource bean name
    */
-  public String getPerInstanceResourceBeanName(String instanceName, String 
resourceName) {
+  protected String getPerInstanceResourceBeanName(String instanceName, String 
resourceName) {
     return String.format("%s,%s", clusterBeanName(),
         new PerInstanceResourceMonitor.BeanName(instanceName, 
resourceName).toString());
   }
@@ -912,7 +944,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
    * @param workflowType The workflow type
    * @return per workflow type bean name
    */
-  public String getWorkflowBeanName(String workflowType) {
+  protected String getWorkflowBeanName(String workflowType) {
     return String.format("%s, %s=%s", clusterBeanName(), WORKFLOW_TYPE_DN_KEY, 
workflowType);
   }
 
@@ -922,7 +954,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
    * @param jobType The job type
    * @return per job type bean name
    */
-  public String getJobBeanName(String jobType) {
+  protected String getJobBeanName(String jobType) {
     return String.format("%s, %s=%s", clusterBeanName(), JOB_TYPE_DN_KEY, 
jobType);
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 65f5a4e..ed94763 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -61,6 +61,13 @@ public interface ClusterStatusMonitorMBean extends 
SensorNameProvider {
   long getInstanceMessageQueueBacklog();
 
   /**
+   * Total count of all messages that have not been completed
+   * after their expected completion time for instances in this cluster
+   * @return
+   */
+  long getTotalPastDueMessageGauge();
+
+  /**
    * @return 1 if cluster is enabled, otherwise 0
    */
   long getEnabled();
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index e0c0f89..86151e7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -48,7 +48,9 @@ public class InstanceMonitor extends DynamicMBeanProvider {
     ENABLED_STATUS_GAUGE("Enabled"),
     ONLINE_STATUS_GAUGE("Online"),
     DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
-    MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");
+    MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge"),
+    MESSAGE_QUEUE_SIZE_GAUGE("MessageQueueSizeGauge"),
+    PASTDUE_MESSAGE_GAUGE("PastDueMessageGauge");
 
     private final String metricName;
 
@@ -75,6 +77,8 @@ public class InstanceMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _disabledPartitionsGauge;
   private SimpleDynamicMetric<Long> _onlineStatusGauge;
   private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
+  private SimpleDynamicMetric<Long> _messageQueueSizeGauge;
+  private SimpleDynamicMetric<Long> _pastDueMessageGauge;
 
   // A map of dynamic capacity Gauges. The map's keys could change.
   private final Map<String, SimpleDynamicMetric<Long>> 
_dynamicCapacityMetricsMap;
@@ -108,6 +112,12 @@ public class InstanceMonitor extends DynamicMBeanProvider {
     _maxCapacityUsageGauge =
         new 
SimpleDynamicMetric<>(InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName(),
             0.0d);
+    _messageQueueSizeGauge =
+        new 
SimpleDynamicMetric<>(InstanceMonitorMetric.MESSAGE_QUEUE_SIZE_GAUGE.metricName(),
+            0L);
+    _pastDueMessageGauge =
+        new 
SimpleDynamicMetric<>(InstanceMonitorMetric.PASTDUE_MESSAGE_GAUGE.metricName(),
+            0L);
   }
 
   private List<DynamicMetric<?, ?>> buildAttributeList() {
@@ -116,7 +126,9 @@ public class InstanceMonitor extends DynamicMBeanProvider {
         _disabledPartitionsGauge,
         _enabledStatusGauge,
         _onlineStatusGauge,
-        _maxCapacityUsageGauge
+        _maxCapacityUsageGauge,
+        _messageQueueSizeGauge,
+        _pastDueMessageGauge
     );
 
     attributeList.addAll(_dynamicCapacityMetricsMap.values());
@@ -146,6 +158,10 @@ public class InstanceMonitor extends DynamicMBeanProvider {
     return _disabledPartitionsGauge.getValue();
   }
 
+  protected long getMessageQueueSizeGauge() { return 
_messageQueueSizeGauge.getValue(); }
+
+  protected long getPastDueMessageGauge() { return 
_pastDueMessageGauge.getValue(); }
+
   /**
    * Get the name of the monitored instance
    * @return instance name as a string
@@ -210,6 +226,22 @@ public class InstanceMonitor extends DynamicMBeanProvider {
   }
 
   /**
+   * Updates message queue size for this instance.
+   * @param queueSize message queue size of this instance
+   */
+  public synchronized void updateMessageQueueSize(long queueSize) {
+    _messageQueueSizeGauge.updateValue(queueSize);
+  }
+
+  /**
+   * Updates number of messages that has not been completed after its expected 
completion time for this instance.
+   * @param msgCount count of messages that has not been completed after its 
due completion time
+   */
+  public synchronized void updatePastDueMessageGauge(long msgCount) {
+    _pastDueMessageGauge.updateValue(msgCount);
+  }
+
+  /**
    * Gets max capacity usage of this instance.
    * @return Max capacity usage of this instance.
    */
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index dee1f86..789992f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -29,6 +29,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
 import javax.management.AttributeNotFoundException;
 import javax.management.InstanceNotFoundException;
 import javax.management.JMException;
@@ -56,6 +58,8 @@ import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
 
 public class TestClusterStatusMonitor {
   private static final MBeanServerConnection _server = 
ManagementFactory.getPlatformMBeanServer();
@@ -164,6 +168,84 @@ public class TestClusterStatusMonitor {
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
 
+  @Test()
+  public void testMessageMetrics() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 5;
+
+    System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    monitor.active();
+    ObjectName clusterMonitorObjName = 
monitor.getObjectName(monitor.clusterBeanName());
+
+    Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
+
+    Map<String, Set<Message>> instanceMessageMap = Maps.newHashMap();
+    Set<String> liveInstanceSet = Sets.newHashSet();
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      liveInstanceSet.add(instanceName);
+
+      long now = System.currentTimeMillis();
+      Set<Message> messages = Sets.newHashSet();
+      // add 10 regular messages to each instance
+      for (int j = 0; j < 10; j++) {
+        Message m = new Message(Message.MessageType.STATE_TRANSITION, 
UUID.randomUUID().toString());
+        m.setTgtName(instanceName);
+        messages.add(m);
+      }
+
+      // add 10 past-due messages to each instance (using default completion 
period)
+      for (int j = 0; j < 10; j++) {
+        Message m = new Message(Message.MessageType.STATE_TRANSITION, 
UUID.randomUUID().toString());
+        m.setTgtName(instanceName);
+        m.setCreateTimeStamp(now - Message.MESSAGE_EXPECT_COMPLETION_PERIOD - 
1000);
+        messages.add(m);
+      }
+
+      // add other 5 past-due messages to each instance (using explicitly set 
COMPLETION time in message)
+      for (int j = 0; j < 5; j++) {
+        Message m = new Message(Message.MessageType.STATE_TRANSITION, 
UUID.randomUUID().toString());
+        m.setTgtName(instanceName);
+        m.setCompletionDueTimeStamp(now - 1000);
+        messages.add(m);
+      }
+      instanceMessageMap.put(instanceName, messages);
+    }
+
+    monitor.setClusterInstanceStatus(liveInstanceSet, liveInstanceSet, 
Collections.emptySet(),
+        Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap(), instanceMessageMap);
+
+    Assert.assertEquals(monitor.getInstanceMessageQueueBacklog(), 25 * n);
+    Assert.assertEquals(monitor.getTotalPastDueMessageGauge(), 15 * n);
+
+    Object totalMsgSize =
+        _server.getAttribute(clusterMonitorObjName, 
"InstanceMessageQueueBacklog");
+    Assert.assertTrue(totalMsgSize instanceof Long);
+    Assert.assertEquals((long) totalMsgSize, 25 * n);
+
+    Object totalPastdueMsgCount =
+        _server.getAttribute(clusterMonitorObjName, 
"TotalPastDueMessageGauge");
+    Assert.assertTrue(totalPastdueMsgCount instanceof Long);
+    Assert.assertEquals((long) totalPastdueMsgCount, 15 * n);
+
+    for (String instance : liveInstanceSet) {
+      ObjectName objName =
+          monitor.getObjectName(monitor.getInstanceBeanName(instance));
+      Object messageSize = _server.getAttribute(objName, 
"MessageQueueSizeGauge");
+      Assert.assertTrue(messageSize instanceof Long);
+      Assert.assertEquals((long) messageSize, 25L);
+
+      Object pastdueMsgCount = _server.getAttribute(objName, 
"PastDueMessageGauge");
+      Assert.assertTrue(pastdueMsgCount instanceof Long);
+      Assert.assertEquals((long) pastdueMsgCount, 15L);
+    }
+
+    System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+  }
 
   @Test
   public void testResourceAggregation() throws JMException, IOException {
@@ -352,7 +434,7 @@ public class TestClusterStatusMonitor {
     // Call setClusterInstanceStatus to register instance monitors.
     monitor.setClusterInstanceStatus(maxUsageMap.keySet(), 
maxUsageMap.keySet(),
         Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
-        Collections.emptyMap());
+        Collections.emptyMap(), Collections.emptyMap());
 
     // Update instance capacity status.
     for (Map.Entry<String, Double> usageEntry : maxUsageMap.entrySet()) {
@@ -435,4 +517,31 @@ public class TestClusterStatusMonitor {
       }
     }
   }
+
+  private void verifyMessageMetrics(ClusterStatusMonitor monitor, Map<String, 
Double> maxUsageMap,
+      Map<String, Map<String, Integer>> instanceCapacityMap)
+      throws MalformedObjectNameException, IOException, 
AttributeNotFoundException, MBeanException,
+             ReflectionException, InstanceNotFoundException {
+    // Verify results.
+    for (Map.Entry<String, Map<String, Integer>> instanceEntry : 
instanceCapacityMap.entrySet()) {
+      String instance = instanceEntry.getKey();
+      Map<String, Integer> capacityMap = instanceEntry.getValue();
+      String instanceBeanName = String
+          .format("%s,%s=%s", monitor.clusterBeanName(), 
ClusterStatusMonitor.INSTANCE_DN_KEY,
+              instance);
+      ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+      Assert.assertTrue(_server.isRegistered(instanceObjectName));
+      Assert.assertEquals(_server.getAttribute(instanceObjectName,
+          
InstanceMonitor.InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName()),
+          maxUsageMap.get(instance));
+
+      for (Map.Entry<String, Integer> capacityEntry : capacityMap.entrySet()) {
+        String capacityKey = capacityEntry.getKey();
+        String attributeName = capacityKey + "Gauge";
+        Assert.assertEquals((long) _server.getAttribute(instanceObjectName, 
attributeName),
+            (long) instanceCapacityMap.get(instance).get(capacityKey));
+      }
+    }
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
index 609581b..7cac92e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
@@ -59,6 +59,8 @@ public class TestInstanceMonitor {
     monitor.updateMaxCapacityUsage(0.5d);
     monitor.increaseMessageCount(10L);
     monitor.updateInstance(tags, disabledPartitions, Collections.emptyList(), 
true, true);
+    monitor.updateMessageQueueSize(100L);
+    monitor.updatePastDueMessageGauge(50L);
 
     // Verify metrics.
     Assert.assertEquals(monitor.getTotalMessageReceived(), 10L);
@@ -69,6 +71,8 @@ public class TestInstanceMonitor {
     Assert.assertEquals(monitor.getEnabled(), 1L);
     Assert.assertEquals(monitor.getDisabledPartitions(), 2L);
     Assert.assertEquals(monitor.getMaxCapacityUsageGauge(), 0.5d);
+    Assert.assertEquals(monitor.getMessageQueueSizeGauge(), 100L);
+    Assert.assertEquals(monitor.getPastDueMessageGauge(), 50L);
 
     monitor.unregister();
   }

Reply via email to