This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 50fdda9397 Implement dagProcessingEngine metrics (#3983)
50fdda9397 is described below

commit 50fdda93978d8dbc09485e5d5706f7d725bde6ef
Author: umustafi <[email protected]>
AuthorDate: Fri Jul 19 12:49:33 2024 -0700

    Implement dagProcessingEngine metrics (#3983)
    
    Implement dagProcessingEngine metrics
---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  23 +++
 .../runtime/DagActionStoreChangeMonitorTest.java   |   3 +-
 ...gManagementDagActionStoreChangeMonitorTest.java |   4 +-
 .../modules/core/GobblinServiceGuiceModule.java    |   3 +
 .../orchestration/DagManagementTaskStreamImpl.java |  24 ++-
 .../modules/orchestration/DagProcessingEngine.java |  17 +-
 .../MySqlDagManagementStateStore.java              |   3 +-
 .../modules/orchestration/MysqlDagActionStore.java |   6 +-
 .../modules/orchestration/proc/DagProc.java        |  23 ++-
 .../proc/DeadlineEnforcementDagProc.java           |  18 ++-
 .../proc/EnforceFlowFinishDeadlineDagProc.java     |   7 +-
 .../proc/EnforceJobStartDeadlineDagProc.java       |   9 +-
 .../modules/orchestration/proc/KillDagProc.java    |  12 +-
 .../modules/orchestration/proc/LaunchDagProc.java  |   8 +-
 .../orchestration/proc/ReevaluateDagProc.java      |  13 +-
 .../modules/orchestration/proc/ResumeDagProc.java  |  10 +-
 .../task/DagProcessingEngineMetrics.java           | 179 +++++++++++++++++++++
 .../modules/orchestration/task/DagTask.java        |   9 +-
 .../task/EnforceFlowFinishDeadlineDagTask.java     |   4 +-
 .../task/EnforceJobStartDeadlineDagTask.java       |   4 +-
 .../modules/orchestration/task/KillDagTask.java    |   4 +-
 .../modules/orchestration/task/LaunchDagTask.java  |   4 +-
 .../orchestration/task/ReevaluateDagTask.java      |   4 +-
 .../modules/orchestration/task/ResumeDagTask.java  |   4 +-
 .../monitoring/DagActionStoreChangeMonitor.java    |  10 +-
 .../DagActionStoreChangeMonitorFactory.java        |   8 +-
 .../DagManagementDagActionStoreChangeMonitor.java  |   8 +-
 ...nagementDagActionStoreChangeMonitorFactory.java |   7 +-
 .../DagManagementTaskStreamImplTest.java           |   5 +-
 .../modules/orchestration/DagManagerFlowTest.java  |   3 +-
 .../orchestration/DagProcessingEngineTest.java     |  16 +-
 .../orchestration/MysqlDagActionStoreTest.java     |   4 +-
 .../proc/EnforceDeadlineDagProcsTest.java          |  18 ++-
 .../orchestration/proc/KillDagProcTest.java        |  19 ++-
 .../orchestration/proc/LaunchDagProcTest.java      |  13 +-
 .../orchestration/proc/ReevaluateDagProcTest.java  |  22 ++-
 .../orchestration/proc/ResumeDagProcTest.java      |   7 +-
 .../orchestration/task/LaunchDagTaskTest.java      |   3 +-
 38 files changed, 425 insertions(+), 113 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 3af11c1f10..e2acdc595b 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -80,4 +80,27 @@ public class ServiceMetricNames {
   public static final String DAG_COUNT_FS_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount";
 
   public static final String DAG_PROCESSING_EXCEPTION_METER = 
"DagProcessingException";
+  /* DagProcessingEngine & Multi-active Execution Related Metrics
+  * Note: metrics ending with the delimiter '.' will be suffixed by the 
specific {@link DagActionType} type for finer
+  * grained monitoring of each dagAction type in addition to the aggregation 
of all types.
+   */
+  public static final String DAG_PROCESSING_ENGINE_PREFIX = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "DagProcEngine.";
+  public static final String DAG_ACTIONS_STORED = DAG_PROCESSING_ENGINE_PREFIX 
+ "dagActionsStored.";
+  public static final String DAG_ACTIONS_OBSERVED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsObserved.";
+  public static final String DAG_ACTIONS_LEASES_OBTAINED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsLeasesObtained.";
+  public static final String DAG_ACTIONS_NO_LONGER_LEASING = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsNoLongerLeasing.";
+  public static final String DAG_ACTIONS_LEASE_REMINDER_SCHEDULED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsLeaseReminderScheduled.";
+  public static final String DAG_ACTIONS_REMINDER_PROCESSED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsRemindersProcessed.";
+  // TODO: implement dropping reminder events after they exceed some time limit
+  public static final String DAG_ACTIONS_EXCEEDED_MAX_RETRY = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsExceededMaxRetry.";
+  public static final String DAG_ACTIONS_INITIALIZE_FAILED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsInitializeFailed.";
+  public static final String DAG_ACTIONS_INITIALIZE_SUCCEEDED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsInitializeSucceeded.";
+  public static final String DAG_ACTIONS_ACT_FAILED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed.";
+  public static final String DAG_ACTIONS_ACT_SUCCEEDED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded.";
+  public static final String DAG_ACTIONS_CONCLUDE_FAILED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed.";
+  public static final String DAG_ACTIONS_CONCLUDE_SUCCEEDED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeSucceeded.";
+  public static final String DAG_ACTIONS_DELETE_SUCCEEDED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded.";
+  public static final String DAG_ACTIONS_DELETE_FAILED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed.";
+  // TODO: implement this one
+  public static final String DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsAvgProcessingDelayMillis.";
 }
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index a5de624ced..a246a8473e 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime;
 
 import java.net.URI;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -90,7 +91,7 @@ public class DagActionStoreChangeMonitorTest {
     public MockDagActionStoreChangeMonitor(String topic, Config config, int 
numThreads, boolean isMultiActiveSchedulerEnabled,
         DagManagementStateStore dagManagementStateStore, DagManager 
dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) {
       super(topic, config, dagManager, numThreads, flowCatalog, orchestrator,
-          dagManagementStateStore, isMultiActiveSchedulerEnabled);
+          dagManagementStateStore, isMultiActiveSchedulerEnabled, 
mock(DagProcessingEngineMetrics.class));
     }
 
     protected void processMessageForTest(DecodeableKafkaRecord record) {
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
index 3dbf0a9612..261e686bd2 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.quartz.SchedulerException;
 import org.testng.annotations.BeforeClass;
@@ -75,7 +76,8 @@ public class DagManagementDagActionStoreChangeMonitorTest {
 
     public MockDagManagementDagActionStoreChangeMonitor(Config config, int 
numThreads, boolean isMultiActiveSchedulerEnabled) {
       super(config, numThreads, mock(FlowCatalog.class), 
mock(Orchestrator.class), mock(DagManagementStateStore.class),
-          isMultiActiveSchedulerEnabled, mock(DagManagement.class), 
dagActionReminderScheduler);
+          isMultiActiveSchedulerEnabled, mock(DagManagement.class), 
dagActionReminderScheduler,
+          mock(DagProcessingEngineMetrics.class));
     }
     protected void processMessageForTest(DecodeableKafkaRecord<String, 
DagActionStoreChangeEvent> record) {
       super.processMessage(record);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 6ed53ba00e..508dcb6d8e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -82,6 +82,7 @@ import 
org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
@@ -187,6 +188,8 @@ public class GobblinServiceGuiceModule implements Module {
       
binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
     }
 
+    binder.bind(DagProcessingEngineMetrics.class);
+
     /* Note that two instances of the same class can only be differentiated 
with an `annotatedWith` marker provided at
     binding time (optionally bound classes cannot have names associated with 
them), so both arbiters need to be
     explicitly bound to be differentiated. The scheduler lease arbiter is only 
used in single-active scheduler mode,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index c2507c5ebd..18672a99f1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -22,6 +22,7 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.quartz.SchedulerException;
 
 import com.google.inject.Inject;
@@ -82,13 +83,14 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
   private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
   private final BlockingQueue<DagActionStore.LeaseParams> leaseParamsQueue = 
new LinkedBlockingQueue<>();
   private final DagManagementStateStore dagManagementStateStore;
+  private final DagProcessingEngineMetrics dagProcEngineMetrics;
 
   @Inject
   public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> 
dagActionStore,
       @Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME) 
MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,
       Optional<DagActionReminderScheduler> dagActionReminderScheduler,
       @Named(InjectionNames.MULTI_ACTIVE_EXECUTION_ENABLED) boolean 
isMultiActiveExecutionEnabled,
-      DagManagementStateStore dagManagementStateStore) {
+      DagManagementStateStore dagManagementStateStore, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
     this.config = config;
     if (!dagActionStore.isPresent()) {
       /* DagActionStore is optional because there are other configurations 
that do not require it and it's initialized
@@ -105,6 +107,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
     this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
     this.dagManagementStateStore = dagManagementStateStore;
+    this.dagProcEngineMetrics = dagProcEngineMetrics;
   }
 
   public synchronized void addDagAction(DagActionStore.LeaseParams 
leaseParams) {
@@ -136,6 +139,10 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
           } else { // Handle original non-deadline dagActions as well as 
reminder events of all types
             LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(leaseParams);
             if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
+              
this.dagProcEngineMetrics.markDagActionsLeasedObtained(leaseParams);
+              if (leaseParams.isReminder()) {
+                
this.dagProcEngineMetrics.markDagActionsRemindersProcessed(leaseParams);
+              }
               return createDagTask(dagAction, 
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
             }
           }
@@ -198,6 +205,9 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
         */
     if (!(leaseAttemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus)) {
       scheduleReminderForEvent(leaseAttemptStatus);
+      
this.dagProcEngineMetrics.markDagActionsLeaseReminderScheduled(leaseParams);
+    } else {
+      this.dagProcEngineMetrics.markDagActionsNoLongerLeasing(leaseParams);
     }
     return leaseAttemptStatus;
   }
@@ -207,17 +217,17 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
 
     switch (dagActionType) {
       case ENFORCE_FLOW_FINISH_DEADLINE:
-        return new EnforceFlowFinishDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagManagementStateStore);
+        return new EnforceFlowFinishDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagManagementStateStore, dagProcEngineMetrics);
       case ENFORCE_JOB_START_DEADLINE:
-        return new EnforceJobStartDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagManagementStateStore);
+        return new EnforceJobStartDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagManagementStateStore, dagProcEngineMetrics);
       case KILL:
-        return new KillDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
+        return new KillDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore, dagProcEngineMetrics);
       case LAUNCH:
-        return new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
+        return new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore, dagProcEngineMetrics);
       case REEVALUATE:
-        return new ReevaluateDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
+        return new ReevaluateDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore, dagProcEngineMetrics);
       case RESUME:
-        return new ResumeDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
+        return new ResumeDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore, dagProcEngineMetrics);
       default:
         throw new UnsupportedOperationException(dagActionType + " not yet 
implemented");
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 449b9cf994..813c81b3d9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
@@ -61,13 +62,15 @@ public class DagProcessingEngine extends 
AbstractIdleService {
   private final Config config;
   private final Optional<DagProcFactory> dagProcFactory;
   private ScheduledExecutorService scheduledExecutorPool;
+  private final DagProcessingEngineMetrics dagProcEngineMetrics;
   private static final Integer TERMINATION_TIMEOUT = 30;
   public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = 
"defaultJobStartDeadlineTimeMillis";
   @Getter static long defaultJobStartSlaTimeMillis;
 
   @Inject
   public DagProcessingEngine(Config config, Optional<DagTaskStream> 
dagTaskStream, Optional<DagProcFactory> dagProcFactory,
-      Optional<DagManagementStateStore> dagManagementStateStore, 
@Named(DEFAULT_JOB_START_DEADLINE_TIME_MS) long deadlineTimeMs) {
+      Optional<DagManagementStateStore> dagManagementStateStore,
+      @Named(DEFAULT_JOB_START_DEADLINE_TIME_MS) long deadlineTimeMs, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
     this.config = config;
     this.dagProcFactory = dagProcFactory;
     this.dagTaskStream = dagTaskStream;
@@ -79,6 +82,7 @@ public class DagProcessingEngine extends AbstractIdleService {
           this.dagProcFactory.isPresent() ? "present" : "MISSING",
           this.dagManagementStateStore.isPresent() ? "present" : "MISSING"));
     }
+    this.dagProcEngineMetrics = dagProcEngineMetrics;
     log.info("DagProcessingEngine initialized.");
     setDefaultJobStartDeadlineTimeMs(deadlineTimeMs);
   }
@@ -98,7 +102,7 @@ public class DagProcessingEngine extends AbstractIdleService 
{
     for (int i=0; i < numThreads; i++) {
       // todo - set metrics for count of active DagProcEngineThread
       DagProcEngineThread dagProcEngineThread = new 
DagProcEngineThread(dagTaskStream.get(), dagProcFactory.get(),
-          dagManagementStateStore.get(), i);
+          dagManagementStateStore.get(), dagProcEngineMetrics, i);
       this.scheduledExecutorPool.submit(dagProcEngineThread);
     }
   }
@@ -114,9 +118,10 @@ public class DagProcessingEngine extends 
AbstractIdleService {
   @AllArgsConstructor
   @VisibleForTesting
   static class DagProcEngineThread implements Runnable {
-    private DagTaskStream dagTaskStream;
-    private DagProcFactory dagProcFactory;
-    private DagManagementStateStore dagManagementStateStore;
+    private final DagTaskStream dagTaskStream;
+    private final DagProcFactory dagProcFactory;
+    private final DagManagementStateStore dagManagementStateStore;
+    private final DagProcessingEngineMetrics dagProcEngineMetrics;
     private final int threadID;
 
     @Override
@@ -131,7 +136,7 @@ public class DagProcessingEngine extends 
AbstractIdleService {
         }
         DagProc<?> dagProc = dagTask.host(dagProcFactory);
         try {
-          dagProc.process(dagManagementStateStore);
+          dagProc.process(dagManagementStateStore, dagProcEngineMetrics);
           dagTask.conclude();
         } catch (Exception e) {
           log.error("DagProcEngineThread encountered exception while 
processing dag " + dagProc.getDagId(), e);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index 285700b845..c0984f835b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -183,8 +183,7 @@ public class MySqlDagManagementStateStore implements 
DagManagementStateStore {
 
   @Override
   public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId 
dagId) throws IOException {
-    return this.dagStateStore.getDagNodes(dagId);
-  }
+    return this.dagStateStore.getDagNodes(dagId);}
 
   @Override
   public void tryAcquireQuota(Collection<Dag.DagNode<JobExecutionPlan>> 
dagNodes) throws IOException {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index c96478c79d..f5b8aa277a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -36,6 +36,7 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.DBStatementExecutor;
 
@@ -63,9 +64,10 @@ public class MysqlDagActionStore implements DagActionStore {
 
   // Deletes rows older than retention time period (in seconds) to prevent 
this table from growing unbounded.
   private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE 
modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %s SECOND)";
+  private final DagProcessingEngineMetrics dagProcessingEngineMetrics;
 
   @Inject
-  public MysqlDagActionStore(Config config) throws IOException {
+  public MysqlDagActionStore(Config config, DagProcessingEngineMetrics 
dagProcessingEngineMetrics) throws IOException {
     if (config.hasPath(CONFIG_PREFIX)) {
       config = config.getConfig(CONFIG_PREFIX).withFallback(config);
     } else {
@@ -89,6 +91,7 @@ public class MysqlDagActionStore implements DagActionStore {
     String thisTableRetentionStatement = String.format(RETENTION_STATEMENT, 
this.tableName, retentionPeriodSeconds);
     // Periodically deletes all rows in the table last_modified before the 
retention period defined by config.
     
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 6L, TimeUnit.HOURS);
+    this.dagProcessingEngineMetrics = dagProcessingEngineMetrics;
   }
 
   @Override
@@ -121,6 +124,7 @@ public class MysqlDagActionStore implements DagActionStore {
       throw new IOException(String.format("Failure adding action for 
DagAction: %s in table %s",
           new DagAction(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType), tableName), e);
     }}, true);
+    this.dagProcessingEngineMetrics.markDagActionsStored(dagActionType);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index effe61c360..3c8aadc6eb 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -29,9 +29,11 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 
 
@@ -63,13 +65,26 @@ public abstract class DagProc<T> {
     this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
   }
 
-  public final void process(DagManagementStateStore dagManagementStateStore) 
throws IOException {
-    T state = initialize(dagManagementStateStore);
-    act(dagManagementStateStore, state);
+  public final void process(DagManagementStateStore dagManagementStateStore,
+      DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
+    T state;
+    try {
+      state = initialize(dagManagementStateStore);
+      dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), true);
+    } catch (Exception e) {
+      dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
+      throw e;
+    }
+    act(dagManagementStateStore, state, dagProcEngineMetrics);
     log.info("{} concluded processing for dagId : {}", 
getClass().getSimpleName(), this.dagId);
   }
 
   protected abstract T initialize(DagManagementStateStore 
dagManagementStateStore) throws IOException;
 
-  protected abstract void act(DagManagementStateStore dagManagementStateStore, 
T state) throws IOException;
+  protected abstract void act(DagManagementStateStore dagManagementStateStore, 
T state,
+      DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException;
+
+  public DagActionStore.DagActionType getDagActionType() {
+    return this.dagTask.getDagAction().getDagActionType();
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
index 032a26df6d..0d3369a3fd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
@@ -45,19 +46,21 @@ abstract public class DeadlineEnforcementDagProc extends 
DagProc<Optional<Dag<Jo
   }
 
   @Override
-  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
-      throws IOException {
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag,
+      DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
     if (validate(dag, dagManagementStateStore)) {
-      enforceDeadline(dagManagementStateStore, dag.get());
-    }
+      enforceDeadline(dagManagementStateStore, dag.get(), 
dagProcEngineMetrics);
+    } else {
+    dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
+  }
   }
 
-  private boolean validate(Optional<Dag<JobExecutionPlan>> dag, 
DagManagementStateStore dagManagementStateStore) throws IOException {
+  protected boolean validate(Optional<Dag<JobExecutionPlan>> dag, 
DagManagementStateStore dagManagementStateStore)
+      throws IOException {
     log.info("Request to enforce deadlines for dag {}", getDagId());
     DagActionStore.DagAction dagAction = getDagTask().getDagAction();
 
     if (!dag.isPresent()) {
-      // todo - add a metric here
       log.error("Dag not present when validating {}. It may already have 
cancelled/finished. Dag {}",
           getDagId(), dagAction);
       return false;
@@ -72,5 +75,6 @@ abstract public class DeadlineEnforcementDagProc extends 
DagProc<Optional<Dag<Jo
     return true;
   }
 
-  abstract void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException;
+  abstract void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+      DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException;
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
index e5aa4125dc..b9b5d179e9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -26,6 +26,7 @@ import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
@@ -41,8 +42,8 @@ public class EnforceFlowFinishDeadlineDagProc extends 
DeadlineEnforcementDagProc
     super(enforceFlowFinishDeadlineDagTask);
   }
 
-  protected void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag)
-      throws IOException {
+  protected void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+      DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
     Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
     long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
     long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
@@ -58,7 +59,9 @@ public class EnforceFlowFinishDeadlineDagProc extends 
DeadlineEnforcementDagProc
 
       dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
       dag.setMessage("Flow killed due to exceeding SLA of " + 
flowFinishDeadline + " ms");
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
     } else {
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
       log.error("EnforceFlowFinishDeadline dagAction received before due time. 
flowStartTime {}, flowFinishDeadline {} ", flowStartTime, flowFinishDeadline);
     }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
index 5eb2236f8f..6eccc1ab1a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -30,6 +30,7 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
@@ -48,13 +49,13 @@ public class EnforceJobStartDeadlineDagProc extends 
DeadlineEnforcementDagProc {
     super(enforceJobStartDeadlineDagTask);
   }
 
-  protected void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag)
-      throws IOException {
+  protected void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+      DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
     Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
         dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
     if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
       // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store
-      // todo - add metrics
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
       log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc", 
getDagNodeId());
       return;
     }
@@ -63,6 +64,7 @@ public class EnforceJobStartDeadlineDagProc extends 
DeadlineEnforcementDagProc {
     long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
     Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus = 
dagNodeToCheckDeadline.getRight();
     if (!jobStatus.isPresent()) {
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
       log.error("Some job status should be present for dag node {} that this 
EnforceJobStartDeadlineDagProc belongs.", getDagNodeId());
       return;
     }
@@ -78,5 +80,6 @@ public class EnforceJobStartDeadlineDagProc extends 
DeadlineEnforcementDagProc {
       dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
       dag.setMessage("Flow killed because no update received for " + 
timeOutForJobStart + " ms after orchestration");
     }
+    dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index ab68d3e7b1..c429d754ab 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -26,6 +26,7 @@ import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
@@ -46,16 +47,16 @@ public class KillDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-   return dagManagementStateStore.getDag(getDagId());
+      return dagManagementStateStore.getDag(getDagId());
   }
 
   @Override
-  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
-      throws IOException {
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag,
+      DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
     log.info("Request to kill dag {} (node: {})", getDagId(), 
shouldKillSpecificJob ? getDagNodeId() : "<<all>>");
 
     if (!dag.isPresent()) {
-      // todo - add a metric here
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
       log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.", getDagId());
       return;
     }
@@ -68,11 +69,12 @@ public class KillDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
       if (dagNodeToCancel.isPresent()) {
         DagProcUtils.cancelDagNode(dagNodeToCancel.get(), 
dagManagementStateStore);
       } else {
-        // todo - add a metric here
+        dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
         log.error("Did not find Dag node with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.", getDagNodeId());
       }
     } else {
       DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
     }
+    dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 88f3249f1c..c72c1fcf96 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -28,6 +28,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@@ -66,14 +67,15 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
   }
 
   @Override
-  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
-      throws IOException {
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag,
+      DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
     if (!dag.isPresent()) {
       log.warn("Dag with id " + getDagId() + " could not be compiled.");
-      // todo - add metrics
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
     } else {
       DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), 
getDagId());
       
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagTask().getDagAction());
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
     }
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 27525c057c..f4e02b8475 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -29,6 +29,7 @@ import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -48,19 +49,19 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
   }
 
   @Override
-  protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
initialize(DagManagementStateStore dagManagementStateStore)
-      throws IOException {
-    return dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
+  protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
initialize(
+      DagManagementStateStore dagManagementStateStore) throws IOException {
+      return dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
   }
 
   @Override
   protected void act(DagManagementStateStore dagManagementStateStore, 
Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
-      Optional<JobStatus>> dagNodeWithJobStatus) throws IOException {
+      Optional<JobStatus>> dagNodeWithJobStatus, DagProcessingEngineMetrics 
dagProcEngineMetrics) throws IOException {
     if (!dagNodeWithJobStatus.getLeft().isPresent()) {
       // one of the reason this could arise is when the MALA leasing doesn't 
work cleanly and another DagProc::process
       // has cleaned up the Dag, yet did not complete the lease before this 
current one acquired its own
       log.error("DagNode or its job status not found for a Reevaluate 
DagAction with dag node id {}", this.dagNodeId);
-      // todo - add metrics to count such occurrences
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
       return;
     }
 
@@ -72,6 +73,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
       // dag actions for each of those parallel job and in this scenario there 
is no job status available.
       // If the job status is not present, this job was never launched, submit 
it now.
       DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
getDagId());
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
       return;
     }
 
@@ -118,6 +120,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
 
       DagProcUtils.removeFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagId());
     }
+    dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 3447035cc4..8a94c4ef12 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -31,6 +31,7 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
@@ -52,16 +53,16 @@ public class ResumeDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-   return dagManagementStateStore.getFailedDag(getDagId());
+      return dagManagementStateStore.getFailedDag(getDagId());
   }
 
   @Override
-  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> failedDag)
-      throws IOException {
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> failedDag,
+      DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
     log.info("Request to resume dag {}", getDagId());
 
     if (!failedDag.isPresent()) {
-      // todo - add a metric here
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
       log.error("Dag " + dagId + " was not found in dag state store");
       return;
     }
@@ -95,5 +96,6 @@ public class ResumeDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
     DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(), 
getDagId());
 
     
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagTask().getDagAction());
+    dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
new file mode 100644
index 0000000000..1011eae473
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.MetricTagNames;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+
+/**
+ * Used to track all metrics relating to processing dagActions when DagProcessingEngine is enabled. The metrics can be
+ * used to trace the number of dagActions (which can be further broken down by type) at various points of the system,
+ * starting from addition to the DagActionStore to observation in the DagActionChangeMonitor through the
+ * DagProcessingEngine pipeline (DagManagement -> DagTaskStreamImpl -> MySqlMultiActiveLeaseArbiter -> DagProc).
+ *
+ * Both constructors register every meter, so any mark* method may be called as soon as an instance exists.
+ */
+@Slf4j
+public class DagProcessingEngineMetrics {
+  MetricContext metricContext;
+  /*
+   Declare map of dagActionType to a ContextAwareMeter for each metric. ContextAwareMeters are thread safe, so it will
+   handle concurrent mark requests correctly. ConcurrentMap is not needed since the mappings are populated once by
+   registerAllMetrics() during construction and only read afterwards.
+  */
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsStoredMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsObservedMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsLeasesObtainedMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsNoLongerLeasingMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsLeaseReminderScheduledMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsReminderProcessedMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsExceededMaxRetryMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsInitializeFailedMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsInitializeSucceededMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsActFailedMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsActSucceededMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsConcludeFailedMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsConcludeSucceededMeterByDagActionType = new HashMap<>();
+  // Declared for upcoming metrics; registerAllMetrics() does not populate these three maps and nothing marks them yet.
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsRemovedFromStoreMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsFailingRemovalMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionAverageProcessingDelayMillisMeterByDagActionType = new HashMap<>();
+
+  /**
+   * Creates the metrics holder on top of a caller-supplied context and registers every meter on it.
+   * @param metricContext context in which the meters are created
+   */
+  public DagProcessingEngineMetrics(MetricContext metricContext) {
+    this.metricContext = metricContext;
+    registerAllMetrics();
+  }
+
+  /**
+   * Constructor used for injection: builds a dedicated metric context and registers every meter on it.
+   */
+  @Inject
+  public DagProcessingEngineMetrics() {
+    // Create a new metric context for the DagProcessingEngineMetrics tagged appropriately
+    List<Tag<?>> tags = new ArrayList<>();
+    tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, GobblinMetrics.MetricType.COUNTER));
+    this.metricContext = Instrumented.getMetricContext(new State(), this.getClass(), tags);
+    // Without this call every map stays empty and each mark* method throws "No meter exists for dagActionType".
+    registerAllMetrics();
+  }
+
+  /**
+   * Creates and stores one meter per dagActionType for each metric that is currently tracked. Both constructors call
+   * this; NOTE(review): calling it again relies on contextAwareMeter() tolerating an already-registered name - confirm.
+   */
+  public void registerAllMetrics() {
+    registerMetricForEachDagActionType(this.dagActionsStoredMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_STORED);
+    registerMetricForEachDagActionType(this.dagActionsObservedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_OBSERVED);
+    registerMetricForEachDagActionType(this.dagActionsLeasesObtainedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_LEASES_OBTAINED);
+    registerMetricForEachDagActionType(this.dagActionsNoLongerLeasingMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_NO_LONGER_LEASING);
+    registerMetricForEachDagActionType(this.dagActionsLeaseReminderScheduledMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_LEASE_REMINDER_SCHEDULED);
+    registerMetricForEachDagActionType(this.dagActionsReminderProcessedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_REMINDER_PROCESSED);
+    registerMetricForEachDagActionType(this.dagActionsExceededMaxRetryMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_EXCEEDED_MAX_RETRY);
+    registerMetricForEachDagActionType(this.dagActionsInitializeFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_INITIALIZE_FAILED);
+    registerMetricForEachDagActionType(this.dagActionsInitializeSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_INITIALIZE_SUCCEEDED);
+    registerMetricForEachDagActionType(this.dagActionsActFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_ACT_FAILED);
+    registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED);
+    registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED);
+    registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED);
+  }
+
+  /**
+   * Create a meter of each dagActionType for the given metric, register it with the metric context, and store it in
+   * the provided map.
+   * @param metricMap map that receives one meter per dagActionType
+   * @param metricName prefix of the meter name; the dagActionType is appended to it
+   */
+  private void registerMetricForEachDagActionType(HashMap<DagActionStore.DagActionType, ContextAwareMeter> metricMap,
+      String metricName) {
+    for (DagActionStore.DagActionType dagActionType : DagActionStore.DagActionType.values()) {
+      metricMap.put(dagActionType, this.metricContext.contextAwareMeter(metricName + dagActionType));
+    }
+  }
+
+  /** Marks the DAG_ACTIONS_STORED meter of the given dagActionType. */
+  public void markDagActionsStored(DagActionStore.DagActionType dagActionType) {
+    updateMetricForDagActionType(this.dagActionsStoredMeterByDagActionType, dagActionType);
+  }
+
+  /** Marks the DAG_ACTIONS_OBSERVED meter of the given dagActionType. */
+  public void markDagActionsObserved(DagActionStore.DagActionType dagActionType) {
+    updateMetricForDagActionType(this.dagActionsObservedMeterByDagActionType, dagActionType);
+  }
+
+  /** Marks the DAG_ACTIONS_LEASES_OBTAINED meter for the type of the dagAction carried by leaseParams. */
+  public void markDagActionsLeasedObtained(DagActionStore.LeaseParams leaseParams) {
+    updateMetricForDagActionType(this.dagActionsLeasesObtainedMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  /** Marks the DAG_ACTIONS_NO_LONGER_LEASING meter for the type of the dagAction carried by leaseParams. */
+  public void markDagActionsNoLongerLeasing(DagActionStore.LeaseParams leaseParams) {
+    updateMetricForDagActionType(this.dagActionsNoLongerLeasingMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  /** Marks the DAG_ACTIONS_LEASE_REMINDER_SCHEDULED meter for the type of the dagAction carried by leaseParams. */
+  public void markDagActionsLeaseReminderScheduled(DagActionStore.LeaseParams leaseParams) {
+    updateMetricForDagActionType(this.dagActionsLeaseReminderScheduledMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  /** Marks the DAG_ACTIONS_REMINDER_PROCESSED meter for the type of the dagAction carried by leaseParams. */
+  public void markDagActionsRemindersProcessed(DagActionStore.LeaseParams leaseParams) {
+    updateMetricForDagActionType(this.dagActionsReminderProcessedMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  // TODO: implement evaluating max retries later
+  /** Marks the DAG_ACTIONS_EXCEEDED_MAX_RETRY meter of the given dagActionType. */
+  public void markDagActionsExceedingMaxRetry(DagActionStore.DagActionType dagActionType) {
+    updateMetricForDagActionType(this.dagActionsExceededMaxRetryMeterByDagActionType, dagActionType);
+  }
+
+  /** Marks DAG_ACTIONS_INITIALIZE_SUCCEEDED when succeeded is true, otherwise DAG_ACTIONS_INITIALIZE_FAILED. */
+  public void markDagActionsInitialize(DagActionStore.DagActionType dagActionType, boolean succeeded) {
+    if (succeeded) {
+      updateMetricForDagActionType(this.dagActionsInitializeSucceededMeterByDagActionType, dagActionType);
+    } else {
+      updateMetricForDagActionType(this.dagActionsInitializeFailedMeterByDagActionType, dagActionType);
+    }
+  }
+
+  /** Marks DAG_ACTIONS_ACT_SUCCEEDED when succeeded is true, otherwise DAG_ACTIONS_ACT_FAILED. */
+  public void markDagActionsAct(DagActionStore.DagActionType dagActionType, boolean succeeded) {
+    if (succeeded) {
+      updateMetricForDagActionType(this.dagActionsActSucceededMeterByDagActionType, dagActionType);
+    } else {
+      updateMetricForDagActionType(this.dagActionsActFailedMeterByDagActionType, dagActionType);
+    }
+  }
+
+  /** Marks DAG_ACTIONS_CONCLUDE_SUCCEEDED when succeeded is true, otherwise DAG_ACTIONS_CONCLUDE_FAILED. */
+  public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, boolean succeeded) {
+    if (succeeded) {
+      updateMetricForDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, dagActionType);
+    } else {
+      updateMetricForDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, dagActionType);
+    }
+  }
+
+  /**
+   * Generic helper used to increment the meter corresponding to the dagActionType in the provided map.
+   * @throws RuntimeException if the map holds no meter for the dagActionType
+   */
+  private void updateMetricForDagActionType(HashMap<DagActionStore.DagActionType, ContextAwareMeter> metricMap,
+      DagActionStore.DagActionType dagActionType) {
+    ContextAwareMeter meter = metricMap.get(dagActionType);
+    if (meter == null) {
+      throw new RuntimeException(String.format("No meter exists for dagActionType %s in metricsMap %s",
+          dagActionType, metricMap));
+    }
+    meter.mark();
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index df62eb971c..3fb6110e10 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -42,12 +42,14 @@ public abstract class DagTask {
   @Getter public final DagActionStore.DagAction dagAction;
   protected final DagManagementStateStore dagManagementStateStore;
   private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
+  private final DagProcessingEngineMetrics dagProcEngineMetrics;
 
   public DagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagManagementStateStore dagManagementStateStore) {
+      DagManagementStateStore dagManagementStateStore, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
     this.dagAction = dagAction;
     this.leaseObtainedStatus = leaseObtainedStatus;
     this.dagManagementStateStore = dagManagementStateStore;
+    this.dagProcEngineMetrics = dagProcEngineMetrics;
   }
 
   public abstract <T> T host(DagTaskVisitor<T> visitor);
@@ -60,8 +62,11 @@ public abstract class DagTask {
   public boolean conclude() {
     try {
       this.dagManagementStateStore.deleteDagAction(this.dagAction);
-      return this.leaseObtainedStatus.completeLease();
+      boolean completedLease = this.leaseObtainedStatus.completeLease();
+      
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 true);
+      return completedLease;
     } catch (IOException e) {
+      
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 false);
       // TODO: Decide appropriate exception to throw and add to the commit 
method's signature
       throw new RuntimeException(e);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
index bfd709fb9a..466006d117 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
@@ -30,8 +30,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 public class EnforceFlowFinishDeadlineDagTask extends DagTask {
   public EnforceFlowFinishDeadlineDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagManagementStateStore dagManagementStateStore) {
-    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+      DagManagementStateStore dagManagementStateStore, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore, 
dagProcEngineMetrics);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
index c1c270eb7b..821ff6dfba 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
@@ -30,8 +30,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 public class EnforceJobStartDeadlineDagTask extends DagTask {
   public EnforceJobStartDeadlineDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagManagementStateStore dagManagementStateStore) {
-    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+      DagManagementStateStore dagManagementStateStore, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore, 
dagProcEngineMetrics);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
index 3d7315378c..c194f89f23 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
@@ -29,8 +29,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 public class KillDagTask extends DagTask {
   public KillDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagManagementStateStore dagManagementStateStore) {
-    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+      DagManagementStateStore dagManagementStateStore, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore, 
dagProcEngineMetrics);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index 909763a0cd..eed110e07a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -36,8 +36,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 @Slf4j
 public class LaunchDagTask extends DagTask {
   public LaunchDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagManagementStateStore dagManagementStateStore) {
-    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+      DagManagementStateStore dagManagementStateStore, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore, 
dagProcEngineMetrics);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
index ea1b36e341..bd4aaa42c9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
@@ -29,8 +29,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 public class ReevaluateDagTask extends DagTask {
   public ReevaluateDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagManagementStateStore dagManagementStateStore) {
-    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+      DagManagementStateStore dagManagementStateStore, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore, 
dagProcEngineMetrics);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
index 4e5ed2657d..6b481212ec 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
@@ -28,8 +28,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
  */
 public class ResumeDagTask extends DagTask {
   public ResumeDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagManagementStateStore dagManagementStateStore) {
-    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+      DagManagementStateStore dagManagementStateStore, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore, 
dagProcEngineMetrics);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index d9d0913bcd..bb95d2481f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -51,6 +51,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 
 
 /**
@@ -100,6 +101,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagAc
   @VisibleForTesting
   protected FlowCatalog flowCatalog;
   protected DagManagementStateStore dagManagementStateStore;
+  protected final DagProcessingEngineMetrics dagProcEngineMetrics;
   @Getter
   private volatile boolean isActive;
 
@@ -107,7 +109,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagAc
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
   public DagActionStoreChangeMonitor(String topic, Config config, DagManager 
dagManager, int numThreads,
       FlowCatalog flowCatalog, Orchestrator orchestrator, 
DagManagementStateStore dagManagementStateStore,
-      boolean isMultiActiveSchedulerEnabled) {
+      boolean isMultiActiveSchedulerEnabled, DagProcessingEngineMetrics 
dagProcEngineMetrics) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
         ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + 
UUID.randomUUID().toString())),
@@ -117,6 +119,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagAc
     this.orchestrator = orchestrator;
     this.dagManagementStateStore = dagManagementStateStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+    this.dagProcEngineMetrics = dagProcEngineMetrics;
 
     /*
     Metrics need to be created before initializeMonitor() below is called (or 
more specifically handleDagAction() is
@@ -231,14 +234,15 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagAc
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
 
-  protected void handleDagAction(String operation, DagActionStore.DagAction 
dagAction, String flowGroup, String flowName,
-      long flowExecutionId, DagActionStore.DagActionType dagActionType) {
+  protected void handleDagAction(String operation, DagActionStore.DagAction 
dagAction, String flowGroup,
+      String flowName, long flowExecutionId, DagActionStore.DagActionType 
dagActionType) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be 
processed. DELETEs require no action.
     try {
       switch (operation) {
         case "INSERT":
           handleDagAction(dagAction, false);
+          this.dagProcEngineMetrics.markDagActionsObserved(dagActionType);
           break;
         case "UPDATE":
           // TODO: change this warning message and process updates if for 
launch or reevaluate type
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 5bac15089b..b392560251 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -31,6 +31,7 @@ import org.apache.gobblin.runtime.util.InjectionNames;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -47,17 +48,20 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
   private Orchestrator orchestrator;
   private DagManagementStateStore dagManagementStateStore;
   private boolean isMultiActiveSchedulerEnabled;
+  private DagProcessingEngineMetrics dagProcEngineMetrics;
 
   @Inject
   public DagActionStoreChangeMonitorFactory(Config config, DagManager 
dagManager, FlowCatalog flowCatalog,
       Orchestrator orchestrator, DagManagementStateStore 
dagManagementStateStore,
-      @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled) {
+      @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled,
+      DagProcessingEngineMetrics dagProcEngineMetrics) {
     this.config = Objects.requireNonNull(config);
     this.dagManager = dagManager;
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
     this.dagManagementStateStore = dagManagementStateStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+    this.dagProcEngineMetrics = dagProcEngineMetrics;
   }
 
   private DagActionStoreChangeMonitor createDagActionStoreMonitor() {
@@ -68,7 +72,7 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
     int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
     return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, 
this.dagManager, numThreads, flowCatalog,
-        orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled);
+        orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled, 
dagProcEngineMetrics);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index 35e4dafad8..7e2f87ed43 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -31,6 +31,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 
 
 /**
@@ -47,10 +48,12 @@ public class DagManagementDagActionStoreChangeMonitor 
extends DagActionStoreChan
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
   public DagManagementDagActionStoreChangeMonitor(Config config, int 
numThreads,
       FlowCatalog flowCatalog, Orchestrator orchestrator, 
DagManagementStateStore dagManagementStateStore,
-      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement, 
DagActionReminderScheduler dagActionReminderScheduler) {
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement,
+      DagActionReminderScheduler dagActionReminderScheduler, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
     // DagManager is only needed in the `handleDagAction` method of its parent 
class and not needed in this class,
     // so we are passing a null value for DagManager to its parent class.
-    super("", config, null, numThreads, flowCatalog, orchestrator, 
dagManagementStateStore, isMultiActiveSchedulerEnabled);
+    super("", config, null, numThreads, flowCatalog, orchestrator, 
dagManagementStateStore,
+        isMultiActiveSchedulerEnabled, dagProcEngineMetrics);
     this.dagManagement = dagManagement;
     this.dagActionReminderScheduler = dagActionReminderScheduler;
   }
@@ -64,6 +67,7 @@ public class DagManagementDagActionStoreChangeMonitor extends 
DagActionStoreChan
       switch (operation) {
         case "INSERT":
           handleDagAction(dagAction, false);
+          this.dagProcEngineMetrics.markDagActionsObserved(dagActionType);
           break;
         case "UPDATE":
           log.warn("Received an UPDATE action to the DagActionStore when 
values in this store are never supposed to be "
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index a55286e17d..fc0d028f60 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -33,6 +33,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManagement;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -50,12 +51,13 @@ public class 
DagManagementDagActionStoreChangeMonitorFactory implements Provider
   private final boolean isMultiActiveSchedulerEnabled;
   private final DagManagement dagManagement;
   private final DagActionReminderScheduler dagActionReminderScheduler;
+  private final DagProcessingEngineMetrics dagProcEngineMetrics;
 
   @Inject
   public DagManagementDagActionStoreChangeMonitorFactory(Config config, 
DagManager dagManager, FlowCatalog flowCatalog,
       Orchestrator orchestrator, DagManagementStateStore 
dagManagementStateStore, DagManagement dagManagement,
       @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled,
-      DagActionReminderScheduler dagActionReminderScheduler) {
+      DagActionReminderScheduler dagActionReminderScheduler, 
DagProcessingEngineMetrics dagProcEngineMetrics) {
     this.config = Objects.requireNonNull(config);
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
@@ -63,6 +65,7 @@ public class DagManagementDagActionStoreChangeMonitorFactory 
implements Provider
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
     this.dagManagement = dagManagement;
     this.dagActionReminderScheduler = dagActionReminderScheduler;
+    this.dagProcEngineMetrics = dagProcEngineMetrics;
   }
 
   private DagManagementDagActionStoreChangeMonitor 
createDagActionStoreMonitor() {
@@ -73,7 +76,7 @@ public class DagManagementDagActionStoreChangeMonitorFactory 
implements Provider
 
     return new 
DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig,
         numThreads, flowCatalog, orchestrator, dagManagementStateStore, 
isMultiActiveSchedulerEnabled, this.dagManagement,
-        this.dagActionReminderScheduler);
+        this.dagActionReminderScheduler, dagProcEngineMetrics);
   }
 
   @Override
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 5ce3b33789..af8acb7be9 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration;
 import java.io.IOException;
 import java.util.Optional;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.junit.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -68,10 +69,10 @@ public class DagManagementTaskStreamImplTest {
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
             mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)),
-            false, mock(DagManagementStateStore.class));
+            false, mock(DagManagementStateStore.class), 
mock(DagProcessingEngineMetrics.class));
     this.dagProcFactory = new DagProcFactory(null);
     this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
-        this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore, 0);
+        this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore, mock(DagProcessingEngineMetrics.class), 0);
   }
 
   @AfterClass(alwaysRun = true)
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 342b379f4a..008dafa1ea 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -89,7 +90,7 @@ public class DagManagerFlowTest {
         .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
         .build();
 
-    dagActionStore = new MysqlDagActionStore(config);
+    dagActionStore = new MysqlDagActionStore(config, 
Mockito.mock(DagProcessingEngineMetrics.class));
     dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.DagActionType.KILL);
     dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId_2, 
DagActionStore.DagActionType.RESUME);
     dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props));
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index baf4bfabbb..a5a4f168ec 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -22,6 +22,7 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.TimeoutException;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -56,6 +57,7 @@ public class DagProcessingEngineTest {
   private DagProcFactory dagProcFactory;
   // Field is static because it's used to instantiate every MockedDagTask
   private static MySqlDagManagementStateStore dagManagementStateStore;
+  private static DagProcessingEngineMetrics mockedDagProcEngineMetrics;
   private ITestMetastoreDatabase testMetastoreDatabase;
   static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
 
@@ -81,18 +83,19 @@ public class DagProcessingEngineTest {
     doReturn(true).when(dagActionStore).deleteDagAction(any());
     dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
-            mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)), false,
-            dagManagementStateStore);
+            mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)),
+            false, dagManagementStateStore, 
Mockito.mock(DagProcessingEngineMetrics.class));
     this.dagProcFactory = new DagProcFactory(null);
 
     DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
         new DagProcessingEngine.DagProcEngineThread(dagManagementTaskStream, 
this.dagProcFactory,
-            dagManagementStateStore, 0);
+            dagManagementStateStore, mock(DagProcessingEngineMetrics.class), 
0);
     this.dagTaskStream = spy(new MockedDagTaskStream());
     DagProcessingEngine dagProcessingEngine =
         new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream), 
Optional.ofNullable(this.dagProcFactory),
-            Optional.ofNullable(dagManagementStateStore), 100000L);
+            Optional.ofNullable(dagManagementStateStore), 100000L, 
mock(DagProcessingEngineMetrics.class));
     dagProcessingEngine.startAsync();
+    this.mockedDagProcEngineMetrics = mock(DagProcessingEngineMetrics.class);
   }
 
   @AfterClass(alwaysRun = true)
@@ -129,7 +132,7 @@ public class DagProcessingEngineTest {
     private final boolean isBad;
 
     public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
-      super(dagAction, leaseObtainedStatus, 
DagProcessingEngineTest.dagManagementStateStore);
+      super(dagAction, leaseObtainedStatus, 
DagProcessingEngineTest.dagManagementStateStore, mockedDagProcEngineMetrics);
       this.isBad = isBad;
     }
 
@@ -158,7 +161,8 @@ public class DagProcessingEngineTest {
     }
 
     @Override
-    protected void act(DagManagementStateStore dagManagementStateStore, Void 
state) {
+    protected void act(DagManagementStateStore dagManagementStateStore, Void 
state,
+        DagProcessingEngineMetrics dagProcEngineMetrics) {
       if (this.isBad) {
         throw new RuntimeException("Simulating an exception!");
       }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 6137f91d10..692876b1e6 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashSet;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -66,7 +68,7 @@ public class MysqlDagActionStoreTest {
         .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
         .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
         .build();
-    return new MysqlDagActionStore(config);
+    return new MysqlDagActionStore(config, 
Mockito.mock(DagProcessingEngineMetrics.class));
   }
 
   @Test
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index ac4cd3c49f..2e64b71c40 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -22,6 +22,7 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
@@ -57,10 +58,12 @@ import static org.mockito.Mockito.spy;
 public class EnforceDeadlineDagProcsTest {
   private ITestMetastoreDatabase testMetastoreDatabase;
   private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+  private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
 
   @BeforeClass
   public void setUp() throws Exception {
     this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    this.mockedDagProcEngineMetrics = 
Mockito.mock(DagProcessingEngineMetrics.class);
   }
 
   @AfterClass(alwaysRun = true)
@@ -103,8 +106,9 @@ public class EnforceDeadlineDagProcsTest {
 
     EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
         new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
-            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, dagManagementStateStore));
-    enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null,
+            dagManagementStateStore, mockedDagProcEngineMetrics));
+    enforceJobStartDeadlineDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
     Mockito.verify(specProducers.get(0), Mockito.times(1)).cancelJob(any(), 
any());
@@ -145,8 +149,9 @@ public class EnforceDeadlineDagProcsTest {
 
     EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
         new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
-            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, dagManagementStateStore));
-    enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null,
+            dagManagementStateStore, mockedDagProcEngineMetrics));
+    enforceJobStartDeadlineDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     // no job cancelled because we simulated (by not adding) missing dag action
     specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).cancelJob(any(), any()));
@@ -186,8 +191,9 @@ public class EnforceDeadlineDagProcsTest {
 
     EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new 
EnforceFlowFinishDeadlineDagProc(
         new EnforceFlowFinishDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
-            "job0", 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null, 
dagManagementStateStore));
-    enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
+            "job0", 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null,
+            dagManagementStateStore, mockedDagProcEngineMetrics));
+    enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.times(1)).cancelJob(any(), any()));
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index dbe17d3362..bc53139b31 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -63,12 +64,14 @@ import static org.mockito.Mockito.spy;
 public class KillDagProcTest {
   private MySqlDagManagementStateStore dagManagementStateStore;
   private ITestMetastoreDatabase testDb;
+  private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
 
   @BeforeClass
   public void setUp() throws Exception {
     this.testDb = TestMetastoreDatabaseFactory.get();
     this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
     LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
+    this.mockedDagProcEngineMetrics = 
Mockito.mock(DagProcessingEngineMetrics.class);
   }
 
   @AfterClass(alwaysRun = true)
@@ -93,8 +96,8 @@ public class KillDagProcTest {
 
     LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "flow1",
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.LAUNCH),
-        null, this.dagManagementStateStore), flowCompilationValidationHelper);
-    launchDagProc.process(this.dagManagementStateStore);
+        null, this.dagManagementStateStore, mockedDagProcEngineMetrics), 
flowCompilationValidationHelper);
+    launchDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
     List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
       try {
@@ -106,8 +109,8 @@ public class KillDagProcTest {
 
     KillDagProc killDagProc = new KillDagProc(new KillDagTask(new 
DagActionStore.DagAction("fg", "flow1",
        flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.KILL),
-        null, this.dagManagementStateStore));
-    killDagProc.process(this.dagManagementStateStore);
+        null, this.dagManagementStateStore, mockedDagProcEngineMetrics));
+    killDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
     long cancelJobCount = specProducers.stream()
         .mapToLong(p -> Mockito.mockingDetails(p)
@@ -139,8 +142,8 @@ public class KillDagProcTest {
 
     LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "flow2",
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.LAUNCH),
-        null, this.dagManagementStateStore), flowCompilationValidationHelper);
-    launchDagProc.process(this.dagManagementStateStore);
+        null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics), 
flowCompilationValidationHelper);
+    launchDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
     List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
       try {
@@ -152,8 +155,8 @@ public class KillDagProcTest {
 
     KillDagProc killDagProc = new KillDagProc(new KillDagTask(new 
DagActionStore.DagAction("fg", "flow2",
         flowExecutionId, "job2", DagActionStore.DagActionType.KILL),
-        null, this.dagManagementStateStore));
-    killDagProc.process(this.dagManagementStateStore);
+        null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics));
+    killDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
     long cancelJobCount = specProducers.stream()
         .mapToLong(p -> Mockito.mockingDetails(p)
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 16c1bf8ffe..4027a11c7e 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.hadoop.fs.Path;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
@@ -70,6 +71,7 @@ import static org.mockito.Mockito.spy;
 public class LaunchDagProcTest {
   private ITestMetastoreDatabase testMetastoreDatabase;
   private MySqlDagManagementStateStore dagManagementStateStore;
+  private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -83,6 +85,7 @@ public class LaunchDagProcTest {
   public void resetDMSS() throws Exception {
     this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     mockDMSSCommonBehavior(this.dagManagementStateStore);
+    this.mockedDagProcEngineMetrics = 
Mockito.mock(DagProcessingEngineMetrics.class);
   }
 
   @AfterClass(alwaysRun = true)
@@ -107,10 +110,11 @@ public class LaunchDagProcTest {
     List<SpecProducer<Spec>> specProducers = 
ReevaluateDagProcTest.getDagSpecProducers(dag);
     LaunchDagProc launchDagProc = new LaunchDagProc(
         new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, "job0",
-            DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore),
+            DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore,
+            this.mockedDagProcEngineMetrics),
         flowCompilationValidationHelper);
 
-    launchDagProc.process(this.dagManagementStateStore);
+    launchDagProc.process(this.dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     int numOfLaunchedJobs = 1; // = number of start nodes
     Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
@@ -137,10 +141,11 @@ public class LaunchDagProcTest {
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
     LaunchDagProc launchDagProc = new LaunchDagProc(
         new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId,
-            "jn", DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore),
+            "jn", DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore,
+            this.mockedDagProcEngineMetrics),
         flowCompilationValidationHelper);
 
-    launchDagProc.process(this.dagManagementStateStore);
+    launchDagProc.process(this.dagManagementStateStore, 
mockedDagProcEngineMetrics);
     int numOfLaunchedJobs = 3; // = number of start nodes
     // parallel jobs are launched through reevaluate dag action
     Mockito.verify(this.dagManagementStateStore, 
Mockito.times(numOfLaunchedJobs))
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 93f31d1410..9673381b26 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -65,6 +66,7 @@ public class ReevaluateDagProcTest {
   private ITestMetastoreDatabase testMetastoreDatabase;
   private DagManagementStateStore dagManagementStateStore;
   private MockedStatic<GobblinServiceManager> mockedGobblinServiceManager;
+  private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
 
   @BeforeClass
   public void setUpClass() throws Exception {
@@ -76,6 +78,7 @@ public class ReevaluateDagProcTest {
   public void setUp() throws Exception {
     this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
+    this.mockedDagProcEngineMetrics = 
Mockito.mock(DagProcessingEngineMetrics.class);
   }
 
   @AfterClass(alwaysRun = true)
@@ -114,8 +117,9 @@ public class ReevaluateDagProcTest {
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null, dagManagementStateStore));
-    reEvaluateDagProc.process(dagManagementStateStore);
+        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
+        dagManagementStateStore, mockedDagProcEngineMetrics));
+    reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
     // next job is sent to spec producer
     Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
     // there are two invocations, one after setting status and other after 
sending new job to specProducer
@@ -163,8 +167,9 @@ public class ReevaluateDagProcTest {
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null, dagManagementStateStore));
-    reEvaluateDagProc.process(dagManagementStateStore);
+        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
+        dagManagementStateStore, mockedDagProcEngineMetrics));
+    reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     // no new job to launch for this one job flow
     specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).addSpec(any()));
@@ -199,8 +204,8 @@ public class ReevaluateDagProcTest {
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
-        dagManagementStateStore));
-    reEvaluateDagProc.process(dagManagementStateStore);
+        dagManagementStateStore, mockedDagProcEngineMetrics));
+    reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     int numOfLaunchedJobs = 1; // only the current job
     // only the current job should have run
@@ -243,10 +248,11 @@ public class ReevaluateDagProcTest {
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        flowExecutionId, "job3", DagActionStore.DagActionType.REEVALUATE), 
null, dagManagementStateStore));
+        flowExecutionId, "job3", DagActionStore.DagActionType.REEVALUATE), 
null,
+        dagManagementStateStore, mockedDagProcEngineMetrics));
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
     // process 4th job
-    reEvaluateDagProc.process(dagManagementStateStore);
+    reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     int numOfLaunchedJobs = 2; // = number of jobs that should launch when 4th 
job passes, i.e. 5th and 6th job
     // parallel jobs are launched through reevaluate dag action
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index e342b29e77..4de37fad54 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 import java.io.IOException;
 import java.net.URISyntaxException;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -50,12 +51,14 @@ import static org.mockito.Mockito.spy;
 public class ResumeDagProcTest {
   private MySqlDagManagementStateStore dagManagementStateStore;
   private ITestMetastoreDatabase testDb;
+  private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
 
   @BeforeClass
   public void setUp() throws Exception {
     testDb = TestMetastoreDatabaseFactory.get();
     this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
     LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
+    this.mockedDagProcEngineMetrics = 
Mockito.mock(DagProcessingEngineMetrics.class);
   }
 
   @AfterClass(alwaysRun = true)
@@ -91,8 +94,8 @@ public class ResumeDagProcTest {
 
     ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.RESUME),
-        null, this.dagManagementStateStore));
-    resumeDagProc.process(this.dagManagementStateStore);
+        null, this.dagManagementStateStore, mockedDagProcEngineMetrics));
+    resumeDagProc.process(this.dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     int expectedNumOfResumedJobs = 1; // = number of resumed nodes
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
index 0a057b7457..3965f26dd5 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
@@ -65,7 +65,8 @@ public class LaunchDagTaskTest {
    */
   @Test
   public void concludeRemovesAdhocFlowSpec() throws IOException {
-    LaunchDagTask dagTask = new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
+    LaunchDagTask dagTask = new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore,
+        Mockito.mock(DagProcessingEngineMetrics.class));
     dagTask.conclude();
     Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(any());
     Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).removeFlowSpec(any(), any(), anyBoolean());

Reply via email to