HELIX-614: add gauge for failed workflow

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/62752b2c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/62752b2c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/62752b2c

Branch: refs/heads/master
Commit: 62752b2cfc7841ca234ae5071964b65efe38569d
Parents: 79c96ba
Author: hrzhang <[email protected]>
Authored: Mon Nov 13 13:39:32 2017 -0800
Committer: Junkai Xue <[email protected]>
Committed: Wed Jan 24 18:31:26 2018 -0800

----------------------------------------------------------------------
 .../monitoring/mbeans/WorkflowMonitor.java      |  10 ++
 .../monitoring/mbeans/WorkflowMonitorMBean.java |   5 +
 .../helix/monitoring/TestWorkflowMonitor.java   | 148 +++++++++++++++++++
 3 files changed, 163 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/62752b2c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
index 5789677..00f75d4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
@@ -29,6 +29,7 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
 
   private long _successfulWorkflowCount;
   private long _failedWorkflowCount;
+  private long _failedWorkflowGauge;
   private long _existingWorkflowGauge;
   private long _queuedWorkflowGauge;
   private long _runningWorkflowGauge;
@@ -39,6 +40,7 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
     _workflowType = workflowType;
     _successfulWorkflowCount = 0L;
     _failedWorkflowCount = 0L;
+    _failedWorkflowGauge = 0L;
     _existingWorkflowGauge = 0L;
     _queuedWorkflowGauge = 0L;
     _runningWorkflowGauge = 0L;
@@ -55,6 +57,11 @@ public class WorkflowMonitor implements WorkflowMonitorMBean 
{
   }
 
   @Override
+  public long getFailedWorkflowGauge() {
+    return _failedWorkflowGauge;
+  }
+
+  @Override
   public long getExistingWorkflowGauge() {
     return _existingWorkflowGauge;
   }
@@ -93,6 +100,7 @@ public class WorkflowMonitor implements WorkflowMonitorMBean 
{
    * Reset gauges
    */
   public void resetGauges() {
+    _failedWorkflowGauge = 0L;
     _existingWorkflowGauge = 0L;
     _runningWorkflowGauge = 0L;
     _queuedWorkflowGauge = 0L;
@@ -107,6 +115,8 @@ public class WorkflowMonitor implements 
WorkflowMonitorMBean {
       _queuedWorkflowGauge++;
     } else if (current.equals(TaskState.IN_PROGRESS)) {
       _runningWorkflowGauge++;
+    } else if (current.equals(TaskState.FAILED)) {
+      _failedWorkflowGauge++;
     }
     _existingWorkflowGauge++;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/62752b2c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
index 8d7076c..dcd633d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
@@ -39,6 +39,11 @@ public interface WorkflowMonitorMBean extends 
SensorNameProvider {
   public long getFailedWorkflowCount();
 
   /**
+   * Get number of current failed workflows
+   */
+  public long getFailedWorkflowGauge();
+
+  /**
    * Get number of current existing workflows
    * @return
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/62752b2c/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java
new file mode 100644
index 0000000..978e574
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java
@@ -0,0 +1,148 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.WorkflowMonitor;
+import org.apache.helix.task.TaskState;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestWorkflowMonitor {
+  private static final String TEST_CLUSTER_NAME = "TestCluster";
+  private static final String TEST_WORKFLOW_TYPE = "WorkflowTestType";
+  private static final String TEST_WORKFLOW_MBEAN_NAME = String
+      .format("%s=%s, %s=%s", "cluster", TEST_CLUSTER_NAME, "workflowType", 
TEST_WORKFLOW_TYPE);
+  private static final MBeanServer beanServer = 
ManagementFactory.getPlatformMBeanServer();
+
+  @Test
+  public void testRun() throws Exception {
+    WorkflowMonitor wm = new WorkflowMonitor(TEST_CLUSTER_NAME, 
TEST_WORKFLOW_TYPE);
+    registerMbean(wm, getObjectName());
+    Set<ObjectInstance> existingInstances = beanServer.queryMBeans(
+        new ObjectName(MonitorDomainNames.ClusterStatus.name() + ":" + 
TEST_WORKFLOW_MBEAN_NAME),
+        null);
+    HashSet<String> expectedAttr = new HashSet<>(Arrays
+        .asList("SuccessfulWorkflowCount", "FailedWorkflowCount", 
"FailedWorkflowGauge",
+            "ExistingWorkflowGauge", "QueuedWorkflowGauge", 
"RunningWorkflowGauge"));
+    for (ObjectInstance i : existingInstances) {
+      for (MBeanAttributeInfo info : 
beanServer.getMBeanInfo(i.getObjectName()).getAttributes()) {
+        expectedAttr.remove(info.getName());
+      }
+    }
+    Assert.assertTrue(expectedAttr.isEmpty());
+
+    int successfulWfCnt = 10;
+    int failedWfCnt = 10;
+    int queuedWfCnt = 10;
+    int runningWfCnt = 10;
+
+    for (int i = 0; i < successfulWfCnt; i++) {
+      wm.updateWorkflowCounters(TaskState.COMPLETED);
+      wm.updateWorkflowGauges(TaskState.COMPLETED);
+    }
+
+    for (int i = 0; i < failedWfCnt; i++) {
+      wm.updateWorkflowCounters(TaskState.FAILED);
+      wm.updateWorkflowGauges(TaskState.FAILED);
+    }
+
+    for (int i = 0; i < queuedWfCnt; i++) {
+      wm.updateWorkflowGauges(TaskState.NOT_STARTED);
+    }
+
+    for (int i = 0; i < runningWfCnt; i++) {
+      wm.updateWorkflowGauges(TaskState.IN_PROGRESS);
+    }
+
+    // Test gauges
+    Assert.assertEquals(wm.getExistingWorkflowGauge(),
+        successfulWfCnt + failedWfCnt + queuedWfCnt + runningWfCnt);
+    Assert.assertEquals(wm.getFailedWorkflowGauge(), failedWfCnt);
+    Assert.assertEquals(wm.getQueuedWorkflowGauge(), queuedWfCnt);
+    Assert.assertEquals(wm.getRunningWorkflowGauge(), runningWfCnt);
+
+    // Test counts
+    Assert.assertEquals(wm.getFailedWorkflowCount(), failedWfCnt);
+    Assert.assertEquals(wm.getSuccessfulWorkflowCount(), successfulWfCnt);
+
+    wm.resetGauges();
+
+    for (int i = 0; i < successfulWfCnt; i++) {
+      wm.updateWorkflowCounters(TaskState.COMPLETED);
+      wm.updateWorkflowGauges(TaskState.COMPLETED);
+    }
+
+    for (int i = 0; i < failedWfCnt; i++) {
+      wm.updateWorkflowCounters(TaskState.FAILED);
+      wm.updateWorkflowGauges(TaskState.FAILED);
+    }
+
+    for (int i = 0; i < queuedWfCnt; i++) {
+      wm.updateWorkflowGauges(TaskState.NOT_STARTED);
+    }
+
+    for (int i = 0; i < runningWfCnt; i++) {
+      wm.updateWorkflowGauges(TaskState.IN_PROGRESS);
+    }
+
+    // After reset, counters should be accumulative, but gauges should be reset
+    Assert.assertEquals(wm.getExistingWorkflowGauge(),
+        successfulWfCnt + failedWfCnt + queuedWfCnt + runningWfCnt);
+    Assert.assertEquals(wm.getFailedWorkflowGauge(), failedWfCnt);
+    Assert.assertEquals(wm.getQueuedWorkflowGauge(), queuedWfCnt);
+    Assert.assertEquals(wm.getRunningWorkflowGauge(), runningWfCnt);
+    Assert.assertEquals(wm.getFailedWorkflowCount(), failedWfCnt * 2);
+    Assert.assertEquals(wm.getSuccessfulWorkflowCount(), successfulWfCnt * 2);
+
+  }
+
+  private ObjectName getObjectName() throws MalformedObjectNameException {
+    return new ObjectName(
+        String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), 
TEST_WORKFLOW_MBEAN_NAME));
+  }
+
+  private void registerMbean(Object bean, ObjectName name) {
+    try {
+      if (beanServer.isRegistered(name)) {
+        beanServer.unregisterMBean(name);
+      }
+    } catch (Exception e) {
+      // OK
+    }
+
+    try {
+      System.out.println("Register MBean: " + name);
+      beanServer.registerMBean(bean, name);
+    } catch (Exception e) {
+      System.out.println("Could not register MBean: " + name + e.toString());
+    }
+  }
+
+}

Reply via email to