This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f11d68f  [GOBBLIN-1421] Add running status gauge in DagManager
f11d68f is described below

commit f11d68ff07bb64f29024cb09ec2b7bf217bc0ba5
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Apr 19 11:55:04 2021 -0700

    [GOBBLIN-1421] Add running status gauge in DagManager
    
    Closes #3257 from jack-moseley/status-gauge
---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  1 +
 .../service/modules/orchestration/DagManager.java  | 27 ++++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 89d62a4..269e5bc 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -44,6 +44,7 @@ public class ServiceMetricNames {
   public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
   public static final String SERVICE_USERS = "ServiceUsers";
   public static final String COMPILED = "Compiled";
+  public static final String RUNNING_STATUS = "RunningStatus";
 
   public static final String HELIX_LEADER_STATE = "HelixLeaderState";
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2b8872e..195d9be 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -59,6 +59,7 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -427,6 +428,7 @@ public class DagManager extends AbstractIdleService {
     private final int defaultQuota;
     private final Map<String, Integer> perUserQuota;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
+    private static final Map<String, FlowState> flowGauges = Maps.newConcurrentMap(); // static: shared by all instances and read by gauge suppliers, so use a concurrent map
 
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
@@ -650,6 +652,15 @@ public class DagManager extends AbstractIdleService {
         }
       }
 
+      String flowId = DagManagerUtils.getFlowId(dag).toString();
+      // Always mark this execution RUNNING (clearing a FAILED/SUCCESSFUL left by an earlier run); put() returns null only on first sight of flowId.
+      if (flowGauges.put(flowId, FlowState.RUNNING) == null) {
+        String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId, ServiceMetricNames.RUNNING_STATUS);
+        ContextAwareGauge<Integer> gauge = RootMetricContext
+            .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId).value);
+        RootMetricContext.get().register(flowStateGaugeName, gauge);
+      }
+
       log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
       //Determine the next set of jobs to run and submit them for execution
       Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
@@ -1136,6 +1147,7 @@ public class DagManager extends AbstractIdleService {
           deleteJobState(dagId, dagNode);
         }
         log.info("Dag {} has finished with status FAILED; Cleaning up dag from 
the state store.", dagId);
+        
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.FAILED);
         // send an event before cleaning up dag
         DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
         dagIdstoClean.add(dagId);
@@ -1149,6 +1161,9 @@ public class DagManager extends AbstractIdleService {
             addFailedDag(dagId);
             status = TimingEvent.FlowTimings.FLOW_FAILED;
             this.failedDagIdsFinishAllPossible.remove(dagId);
+            
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.FAILED);
+          } else {
+            
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.SUCCESSFUL);
           }
           log.info("Dag {} has finished with status {}; Cleaning up dag from 
the state store.", dagId, status);
           // send an event before cleaning up dag
@@ -1192,6 +1207,18 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
+  private enum FlowState {   // value reported by the per-flow RunningStatus gauge
+    FAILED(-1),              // stored when a dag finishes with status FAILED
+    RUNNING(0),              // stored before the dag's jobs are submitted
+    SUCCESSFUL(1);           // stored when a dag finishes without failure
+
+    public final int value;  // final: enum constants are shared singletons and must not be mutated
+
+    FlowState(int value) {
+      this.value = value;
+    }
+  }
+
   /**
    * Thread that runs retention on failed dags based on their original start 
time (which is the flow execution ID).
    */

Reply via email to