This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 557f255  [GOBBLIN-1452] Add meters for successful/failed dags in total and by flowGroup
557f255 is described below

commit 557f255d973255e50420f821b512f7c805552ae1
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Jun 1 16:16:16 2021 -0700

    [GOBBLIN-1452] Add meters for successful/failed dags in total and by flowGroup
    
    Closes #3290 from jack-moseley/flow-meters
---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  2 ++
 .../service/modules/orchestration/DagManager.java  | 37 +++++++++++++++++++---
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 269e5bc..7e6bb55 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -40,6 +40,8 @@ public class ServiceMetricNames {
   public static final String CREATE_FLOW_METER = "CreateFlow";
   public static final String DELETE_FLOW_METER = "DeleteFlow";
   public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
+  public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
+  public static final String FAILED_FLOW_METER = "FailedFlows";
 
   public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
   public static final String SERVICE_USERS = "ServiceUsers";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 727c438..37632d7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -59,6 +59,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
@@ -431,6 +432,8 @@ public class DagManager extends AbstractIdleService {
     private final Map<String, Integer> perUserQuota;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
     private static Map<String, FlowState> flowGauges = Maps.newHashMap();
+    private ContextAwareMeter allSuccessfulMeter;
+    private ContextAwareMeter allFailedMeter;
 
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
@@ -460,6 +463,10 @@ public class DagManager extends AbstractIdleService {
         this.jobStatusPolledTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
         ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
             () -> orchestrationDelay.get());
+        this.allSuccessfulMeter = metricContext.contextAwareMeter(
+            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
ServiceMetricNames.SUCCESSFUL_FLOW_METER));
+        this.allFailedMeter = metricContext.contextAwareMeter(
+            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
ServiceMetricNames.FAILED_FLOW_METER));
         this.metricContext.register(orchestrationDelayMetric);
       } else {
         this.metricContext = null;
@@ -1133,6 +1140,12 @@ public class DagManager extends AbstractIdleService {
 
       return counters;
     }
+
+    private ContextAwareMeter getGroupMeterForDag(String dagId, String 
meterName) {
+      String flowGroup = 
DagManagerUtils.getFlowId(this.dags.get(dagId)).getFlowGroup();
+      return 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
 flowGroup, meterName));
+    }
+
     /**
      * Perform clean up. Remove a dag from the dagstore if the dag is complete 
and update internal state.
      */
@@ -1140,7 +1153,6 @@ public class DagManager extends AbstractIdleService {
       List<String> dagIdstoClean = new ArrayList<>();
       //Clean up failed dags
       for (String dagId : this.failedDagIdsFinishRunning) {
-        addFailedDag(dagId);
         //Skip monitoring of any other jobs of the failed dag.
         LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
         while (!dagNodeList.isEmpty()) {
@@ -1148,7 +1160,7 @@ public class DagManager extends AbstractIdleService {
           deleteJobState(dagId, dagNode);
         }
         log.info("Dag {} has finished with status FAILED; Cleaning up dag from 
the state store.", dagId);
-        
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.FAILED);
+        onFlowFailure(dagId);
         // send an event before cleaning up dag
         DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
         dagIdstoClean.add(dagId);
@@ -1159,12 +1171,12 @@ public class DagManager extends AbstractIdleService {
         if (!hasRunningJobs(dagId) && 
!this.failedDagIdsFinishRunning.contains(dagId)) {
           String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
           if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
-            addFailedDag(dagId);
+            onFlowFailure(dagId);
             status = TimingEvent.FlowTimings.FLOW_FAILED;
             this.failedDagIdsFinishAllPossible.remove(dagId);
             
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.FAILED);
           } else {
-            
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.SUCCESSFUL);
+            onFlowSuccess(dagId);
           }
           log.info("Dag {} has finished with status {}; Cleaning up dag from 
the state store.", dagId, status);
           // send an event before cleaning up dag
@@ -1178,6 +1190,23 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
+    private void onFlowSuccess(String dagId) {
+      if (this.metricContext != null) {
+        
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.SUCCESSFUL);
+        this.allSuccessfulMeter.mark();
+        getGroupMeterForDag(dagId, 
ServiceMetricNames.SUCCESSFUL_FLOW_METER).mark();
+      }
+    }
+
+    private void onFlowFailure(String dagId) {
+      addFailedDag(dagId);
+      if (this.metricContext != null) {
+        
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.FAILED);
+        this.allFailedMeter.mark();
+        getGroupMeterForDag(dagId, 
ServiceMetricNames.FAILED_FLOW_METER).mark();
+      }
+    }
+
     /**
      * Add a dag to failed dag state store
      */

Reply via email to