This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2837df2d5 [GOBBLIN-2069] implement dag procs to enforce job start 
deadline and dag completion deadline (#3950)
2837df2d5 is described below

commit 2837df2d53d860baf461bfb3d542d704663079a6
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed May 22 17:52:46 2024 -0700

    [GOBBLIN-2069] implement dag procs to enforce job start deadline and dag 
completion deadline (#3950)
    
    * add EnforceStartDeadlineDagProc and EnforceFinishDeadlineDagProc dag procs
    * fix tests
    * address review comments
    * add a todo as a workaround for a bug
---
 .../gobblin/configuration/ConfigurationKeys.java   |   2 +-
 .../src/main/avro/DagActionStoreChangeEvent.avsc   |   4 +-
 gobblin-modules/gobblin-kafka-09/build.gradle      |   2 +-
 .../runtime/KafkaAvroJobStatusMonitorTest.java     |   9 ++
 .../service/monitoring/FlowStatusGenerator.java    |   3 +-
 gobblin-service/build.gradle                       |   2 +-
 .../modules/core/GobblinServiceGuiceModule.java    |  16 ++-
 .../modules/core/GobblinServiceManager.java        |  24 ++--
 .../orchestration/DagActionReminderScheduler.java  |  14 +-
 .../modules/orchestration/DagActionStore.java      |  22 +++-
 .../orchestration/DagManagementTaskStreamImpl.java |  73 +++++++++--
 .../service/modules/orchestration/DagManager.java  |   5 +-
 .../modules/orchestration/DagManagerUtils.java     |  19 +--
 .../modules/orchestration/DagProcFactory.java      |  15 +++
 .../modules/orchestration/DagProcessingEngine.java |  14 +-
 .../modules/orchestration/DagTaskVisitor.java      |   6 +-
 .../modules/orchestration/FlowLaunchHandler.java   |   9 +-
 .../MysqlMultiActiveLeaseArbiter.java              |  10 +-
 .../modules/orchestration/proc/DagProcUtils.java   |  75 +++++++++++
 .../proc/EnforceFlowFinishDeadlineDagProc.java     |  88 +++++++++++++
 .../proc/EnforceJobStartDeadlineDagProc.java       | 104 +++++++++++++++
 .../modules/orchestration/proc/KillDagProc.java    |  52 +-------
 .../orchestration/proc/ReevaluateDagProc.java      |  26 +++-
 .../task/EnforceFlowFinishDeadlineDagTask.java     |  39 ++++++
 .../task/EnforceJobStartDeadlineDagTask.java       |  39 ++++++
 .../service/modules/spec/JobExecutionPlan.java     |  28 ++++
 .../DagManagementDagActionStoreChangeMonitor.java  |   3 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  52 +++++++-
 .../gobblin/service/GobblinServiceManagerTest.java |  12 +-
 .../DagManagementTaskStreamImplTest.java           |   2 +-
 .../orchestration/DagProcessingEngineTest.java     |   5 +-
 .../proc/EnforceDeadlineDagProcsTest.java          | 144 +++++++++++++++++++++
 .../orchestration/proc/ReevaluateDagProcTest.java  |  25 ++++
 .../service/monitoring/JobStatusRetrieverTest.java |   2 +-
 .../monitoring/MysqlJobStatusRetrieverTest.java    |   4 +-
 gradle/scripts/dependencyDefinitions.gradle        |   1 +
 36 files changed, 815 insertions(+), 135 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 93acece84..309a451f1 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1072,7 +1072,7 @@ public class ConfigurationKeys {
   public static final String GOBBLIN_JOB_START_SLA_TIME = 
"gobblin.job.start.sla.time";
   public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = 
"gobblin.job.start.sla.timeunit";
   public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
-  public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT = 
"MINUTES";
+  public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT = 
TimeUnit.MINUTES.name();
   public static final String DATASET_SUBPATHS_KEY = 
"gobblin.flow.dataset.subPaths";
   public static final String DATASET_BASE_INPUT_PATH_KEY = 
"gobblin.flow.dataset.baseInputPath";
   public static final String DATASET_BASE_OUTPUT_PATH_KEY = 
"gobblin.flow.dataset.baseOutputPath";
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
index c6d94a628..84ca4cbc5 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -34,9 +34,11 @@
       "type": "enum",
       "name": "DagActionValue",
       "symbols": [
+        "ENFORCE_FLOW_FINISH_DEADLINE",
+        "ENFORCE_JOB_START_DEADLINE",
         "KILL",
-        "RESUME",
         "LAUNCH",
+        "RESUME",
         "REEVALUATE"
       ],
       "symbolDocs": {
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle 
b/gobblin-modules/gobblin-kafka-09/build.gradle
index 9dd1604d9..58cfb3bbb 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -56,7 +56,7 @@ dependencies {
   testCompile project(path: ":gobblin-metastore", configuration: 
"testFixtures")
   testCompile project(":gobblin-test-utils")
   testCompile externalDependency.jsonAssert
-  testCompile externalDependency.mockito
+  testCompile externalDependency.mockitoInline
   testCompile externalDependency.testng
   testCompile(externalDependency.kafka09Test){
     exclude group: "com.sun.jmx", module: "jmxri"
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 647b5cfe1..1d1520d5d 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -71,6 +73,9 @@ import 
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueReposi
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -99,6 +104,7 @@ public class KafkaAvroJobStatusMonitorTest {
   private MetricContext context;
   private KafkaAvroEventKeyValueReporter.Builder<?> builder;
   private MysqlDagActionStore mysqlDagActionStore;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -118,6 +124,8 @@ public class KafkaAvroJobStatusMonitorTest {
     builder = 
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
         TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
     this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class));
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
   }
 
   @Test
@@ -762,6 +770,7 @@ public class KafkaAvroJobStatusMonitorTest {
   public void tearDown() {
     try {
       this.kafkaTestHelper.close();
+      this.mockedGobblinServiceManager.close();
     } catch(Exception e) {
       System.err.println("Failed to close Kafka server.");
     }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 4755f8368..4307659b4 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -40,7 +40,8 @@ import org.apache.gobblin.service.ExecutionStatus;
  */
 @Slf4j
 public class FlowStatusGenerator {
-  public static final List<String> FINISHED_STATUSES = 
Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
+  public static final List<String> FINISHED_STATUSES = 
Lists.newArrayList(ExecutionStatus.FAILED.name(),
+      ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
   public static final int MAX_LOOKBACK = 100;
 
   private final JobStatusRetriever jobStatusRetriever;
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index af029ad5c..80b2d7e39 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -90,7 +90,7 @@ dependencies {
   testRuntime externalDependency.derby
   testCompile externalDependency.hamcrest
   testCompile externalDependency.jhyde
-  testCompile externalDependency.mockito
+  testCompile externalDependency.mockitoInline
   testCompile externalDependency.testContainers
   testCompile externalDependency.testContainersMysql
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index b4d58e35e..2f7066f90 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -39,12 +39,7 @@ import javax.inject.Singleton;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
-import 
org.apache.gobblin.service.modules.orchestration.DagActionProcessingMultiActiveLeaseArbiterFactory;
-import 
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import 
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
-import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
@@ -70,7 +65,9 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionProcessingMultiActiveLeaseArbiterFactory;
 import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
@@ -78,10 +75,14 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
 import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
 import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
+import 
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
@@ -323,6 +324,9 @@ public class GobblinServiceGuiceModule implements Module {
 
     binder.bind(D2Announcer.class).to(NoopD2Announcer.class);
 
+    
binder.bindConstant().annotatedWith(Names.named(DagProcessingEngine.DEFAULT_JOB_START_DEADLINE_TIME_MS)).to(
+        
DagProcUtils.getDefaultJobStartDeadline(serviceConfig.getInnerConfig()));
+
     LOGGER.info("Bindings configured");
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 0b8a09843..827ac961e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -117,7 +117,6 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   public static final String SERVICE_EVENT_BUS_NAME = 
"GobblinServiceManagerEventBus";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinServiceManager.class);
-  @Setter private static volatile GobblinServiceGuiceModule 
GOBBLIN_SERVICE_GUICE_MODULE;
 
   protected final ServiceBasedAppLauncher serviceLauncher;
   private volatile boolean stopInProgress = false;
@@ -168,6 +167,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   @Inject
   @Getter
   private Injector injector;
+  @Getter @Setter private volatile static Injector staticInjector;
 
   protected boolean flowCatalogLocalCommit;
 
@@ -257,27 +257,27 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
   /**
    * Uses the provided serviceConfiguration to create a new Guice module and 
obtain a new class associated with it.
-   * This method should only be called once per application.
    */
   public static GobblinServiceManager create(GobblinServiceConfiguration 
serviceConfiguration) {
-    GOBBLIN_SERVICE_GUICE_MODULE = new 
GobblinServiceGuiceModule(serviceConfiguration);
-    return getClass(GobblinServiceManager.class);
+    if (GobblinServiceManager.staticInjector == null) {
+        GobblinServiceManager.staticInjector = 
Guice.createInjector(Stage.DEVELOPMENT, new 
GobblinServiceGuiceModule(serviceConfiguration));
+    }
+    return getClass(GobblinServiceManager.staticInjector, 
GobblinServiceManager.class);
   }
 
   /**
-   * If {@link GobblinServiceManager} is created using guice, user should set 
{@link GobblinServiceManager#GOBBLIN_SERVICE_GUICE_MODULE}
-   * for this method to work.
+   * This method assumes that {@link GobblinServiceManager} is created using 
guice, which sets the injector.
+   * If it is created through other ways, caller should use {@link 
GobblinServiceManager#create(GobblinServiceConfiguration)}
+   * or provide the {@link Injector} using {@link 
GobblinServiceManager#getClass(Injector, Class)} method instead.
    * @param classToGet
    * @return a new object if the class type is not marked with @Singleton, 
otherwise the same instance of the class
    * @param <T>
    */
   public static <T> T getClass(Class<T> classToGet) {
-    if (GOBBLIN_SERVICE_GUICE_MODULE == null) {
-      throw new RuntimeException(String.format("getClass called to obtain %s 
without calling create method to "
-          + "initialize GobblinServiceGuiceModule.", classToGet));
-    }
-    // Use development stage to enable more verbose error messages and runtime 
checks
-    Injector injector = Guice.createInjector(Stage.DEVELOPMENT, 
GOBBLIN_SERVICE_GUICE_MODULE);
+    return getClass(getStaticInjector(), classToGet);
+  }
+
+  public static <T> T getClass(Injector injector, Class<T> classToGet) {
     return injector.getInstance(classToGet);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 909d19f37..588db58d6 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -33,6 +33,7 @@ import org.quartz.TriggerBuilder;
 import org.quartz.impl.StdSchedulerFactory;
 
 import javax.inject.Inject;
+import javax.inject.Singleton;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -44,6 +45,7 @@ import 
org.apache.gobblin.service.modules.core.GobblinServiceManager;
  * {#scheduleReminderJob} on a flow action that it failed to acquire a lease 
on but has not yet completed. The reminder
  * will fire once the previous lease owner's lease is expected to expire.
  */
+@Singleton
 public class DagActionReminderScheduler {
   public static final String DAG_ACTION_REMINDER_SCHEDULER_KEY = 
"DagActionReminderScheduler";
   private final Scheduler quartzScheduler;
@@ -86,18 +88,17 @@ public class DagActionReminderScheduler {
     @Override
     public void execute(JobExecutionContext context) {
       // Get properties from the trigger to create a dagAction
-      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      JobDataMap jobDataMap = context.getMergedJobDataMap();
       String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
       String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
-      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      String flowExecutionId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
       DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
 
       log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
-          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+          + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName 
+ ", dagActionType: " + dagActionType + ")");
 
-      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
-          dagActionType);
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType, true);
 
       try {
         DagManagement dagManagement = 
GobblinServiceManager.getClass(DagManagement.class);
@@ -141,10 +142,9 @@ public class DagActionReminderScheduler {
    */
   public static Trigger createReminderJobTrigger(DagActionStore.DagAction 
dagAction, long reminderDurationMillis,
       Supplier<Long> getCurrentTimeMillis) {
-    Trigger trigger = TriggerBuilder.newTrigger()
+    return TriggerBuilder.newTrigger()
         .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowGroup())
         .startAt(new Date(getCurrentTimeMillis.get() + reminderDurationMillis))
         .build();
-    return trigger;
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index a44f7b422..cdaf56c8a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
 import java.util.Collection;
 
 import lombok.Data;
+import lombok.RequiredArgsConstructor;
 
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
@@ -30,21 +31,29 @@ import 
org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 public interface DagActionStore {
   public static final String NO_JOB_NAME_DEFAULT = "";
   enum DagActionType {
+    CANCEL, // Invoked through DagManager if flow has been stuck in 
Orchestrated state for a while
+    ENFORCE_JOB_START_DEADLINE, // Enforce job start deadline
+    ENFORCE_FLOW_FINISH_DEADLINE, // Enforce flow finish deadline
     KILL, // Kill invoked through API call
-    RESUME, // Resume flow invoked through API call
     LAUNCH, // Launch new flow execution invoked adhoc or through scheduled 
trigger
+    REEVALUATE, // Re-evaluate what needs to be done upon receipt of a final 
job status
+    RESUME, // Resume flow invoked through API call
     RETRY, // Invoked through DagManager for flows configured to allow retries
-    CANCEL, // Invoked through DagManager if flow has been stuck in 
Orchestrated state for a while
-    REEVALUATE // Re-evaluate what needs to be done upon receipt of a final 
job status
   }
 
   @Data
+  @RequiredArgsConstructor
   class DagAction {
     final String flowGroup;
     final String flowName;
     final String flowExecutionId;
     final String jobName;
     final DagActionType dagActionType;
+    final boolean isReminder;
+
+    public DagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName, DagActionType dagActionType) {
+      this(flowGroup, flowName, flowExecutionId, jobName, dagActionType, 
false);
+    }
 
     public static DagAction forFlow(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType) {
       return new DagAction(flowGroup, flowName, flowExecutionId, 
NO_JOB_NAME_DEFAULT, dagActionType);
@@ -69,6 +78,13 @@ public interface DagActionStore {
       return new DagNodeId(this.flowGroup, this.flowName,
           Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
     }
+
+    /**
+     * Creates and returns a {@link DagManager.DagId} for this DagAction.
+     */
+    public DagManager.DagId getDagId() {
+      return new DagManager.DagId(this.flowGroup, this.flowName, 
this.flowExecutionId);
+    }
   }
 
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index e211906a6..1d32cde39 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -27,6 +27,7 @@ import org.quartz.SchedulerException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
 import com.typesafe.config.ConfigFactory;
 
 import javax.inject.Named;
@@ -39,11 +40,15 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -77,15 +82,16 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
   protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
   protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
   private final boolean isMultiActiveExecutionEnabled;
-  @Inject
   private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
   private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private final DagManagementStateStore dagManagementStateStore;
 
   @Inject
   public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> 
dagActionStore,
       @Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME) 
MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,
       Optional<DagActionReminderScheduler> dagActionReminderScheduler,
-      @Named(InjectionNames.MULTI_ACTIVE_EXECUTION_ENABLED) boolean 
isMultiActiveExecutionEnabled) {
+      @Named(InjectionNames.MULTI_ACTIVE_EXECUTION_ENABLED) boolean 
isMultiActiveExecutionEnabled,
+      DagManagementStateStore dagManagementStateStore) {
     this.config = config;
     if (!dagActionStore.isPresent()) {
       /* DagActionStore is optional because there are other configurations 
that do not require it and it's initialized
@@ -102,12 +108,13 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
     MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
     this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
+    this.dagManagementStateStore = dagManagementStateStore;
   }
 
   @Override
   public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
     // TODO: Used to track missing dag issue, remove later as needed
-    log.info("Add dagAction{}", dagAction);
+    log.info("Add dagAction {}", dagAction);
 
     if (!this.dagActionQueue.offer(dagAction)) {
       throw new RuntimeException("Could not add dag action " + dagAction + " 
to the queue");
@@ -124,9 +131,22 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       while (true) {
         try {
           DagActionStore.DagAction dagAction = this.dagActionQueue.take();
-          LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(dagAction);
-          if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
-            return createDagTask(dagAction, 
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+          /* Create triggers for original (non-reminder) dag actions of type 
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE.
+             Reminder triggers are used to inform hosts once the job start 
deadline and flow finish deadline are passed;
+             then only is lease arbitration done to enforce the deadline 
violation and fail the job or flow if needed */
+          if (!dagAction.isReminder() && dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
+            createJobStartDeadlineTrigger(dagAction);
+          } else if (!dagAction.isReminder() && dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
+            createFlowFinishDeadlineTrigger(dagAction);
+          } else if (!dagAction.isReminder
+              || dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE
+              || dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
+            // todo - fix bug of a reminder event getting a lease even when 
the first attempt succeeded.
+            // for now, avoid processing reminder events if they are not for 
deadline dag actions
+            LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(dagAction);
+            if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
+              return createDagTask(dagAction, 
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+            }
           }
         } catch (Exception e) {
           //TODO: need to handle exceptions gracefully
@@ -135,6 +155,37 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       }
   }
 
+  private void createJobStartDeadlineTrigger(DagActionStore.DagAction 
dagAction) throws SchedulerException, IOException {
+    long timeOutForJobStart = 
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
+        dagAction.getDagId()).get().getNodes().get(0), 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    // todo - this timestamp is just an approximation, the real job submission 
has happened in past, and that is when a
+    // ENFORCE_JOB_START_DEADLINE dag action was created; we are just 
processing that dag action here
+    long jobSubmissionTime = System.currentTimeMillis();
+    long reminderDuration = jobSubmissionTime + timeOutForJobStart - 
System.currentTimeMillis();
+
+    dagActionReminderScheduler.get().scheduleReminder(dagAction, 
reminderDuration);
+  }
+
+  private void createFlowFinishDeadlineTrigger(DagActionStore.DagAction 
dagAction) throws SchedulerException, IOException {
+    long timeOutForJobFinish;
+    Dag.DagNode<JobExecutionPlan> dagNode = 
this.dagManagementStateStore.getDag(dagAction.getDagId()).get().getNodes().get(0);
+
+    try {
+      timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
+    } catch (ConfigException e) {
+      log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid 
format, using default SLA of {}",
+          
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+          
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+          DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+      timeOutForJobFinish = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+    }
+
+    long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+    long reminderDuration = flowStartTime + timeOutForJobFinish - 
System.currentTimeMillis();
+
+    dagActionReminderScheduler.get().scheduleReminder(dagAction, 
reminderDuration);
+  }
+
   /**
    * Returns a {@link LeaseAttemptStatus} associated with the
    * `dagAction` by calling
@@ -148,7 +199,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       throws IOException, SchedulerException {
     // TODO: need to handle reminder events and flag them
     LeaseAttemptStatus leaseAttemptStatus = 
this.dagActionProcessingLeaseArbiter
-        .tryAcquireLease(dagAction, System.currentTimeMillis(), false, false);
+        .tryAcquireLease(dagAction, System.currentTimeMillis(), 
dagAction.isReminder, false);
         /* Schedule a reminder for the event unless the lease has been 
completed to safeguard against the case where even
         we, when we might become the lease owner still fail to complete 
processing
         */
@@ -162,12 +213,16 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     DagActionStore.DagActionType dagActionType = dagAction.getDagActionType();
 
     switch (dagActionType) {
+      case ENFORCE_FLOW_FINISH_DEADLINE:
+        return new EnforceFlowFinishDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagActionStore.get());
+      case ENFORCE_JOB_START_DEADLINE:
+        return new EnforceJobStartDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagActionStore.get());
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
       case LAUNCH:
         return new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
       case REEVALUATE:
         return new ReevaluateDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
-      case KILL:
-        return new KillDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
       case RESUME:
         return new ResumeDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
       default:
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index ce942cd7a..51a846cb8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -138,8 +138,9 @@ public class DagManager extends AbstractIdleService {
   public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
   public static final String DAG_MANAGER_HEARTBEAT = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"dagManager.heartbeat-%s";
   // Default job start SLA time if configured, measured in minutes. Default is 
10 minutes
-  private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
-  private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+  // todo - rename "sla" -> "deadline", and move them to DagProcUtils
+  public static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+  public static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
   private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
   private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index c4b62b5ec..e2fc6a179 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -96,7 +96,7 @@ public class DagManagerUtils {
     return getFlowExecId(dagNode.getValue().getJobSpec());
   }
 
-  static long getFlowExecId(JobSpec jobSpec) {
+  public static long getFlowExecId(JobSpec jobSpec) {
     return 
jobSpec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
   }
 
@@ -287,15 +287,8 @@ public class DagManagerUtils {
     
dagNode.getValue().setCurrentGeneration(dagNode.getValue().getCurrentGeneration()
 + 1);
   }
 
-  /**
-   * Flow start time is the same as the flow execution id which is the 
timestamp flow request was received, unless it
-   * is a resumed flow, in which case it is {@link 
JobExecutionPlan#getFlowStartTime()}
-   * @param dagNode dag node in context
-   * @return flow start time
-   */
-  static long getFlowStartTime(DagNode<JobExecutionPlan> dagNode) {
-    long flowStartTime = dagNode.getValue().getFlowStartTime();
-    return flowStartTime == 0L ? getFlowExecId(dagNode) : flowStartTime;
+  public static long getFlowStartTime(DagNode<JobExecutionPlan> dagNode) {
+    return dagNode.getValue().getFlowStartTime();
   }
 
   /**
@@ -304,7 +297,7 @@ public class DagManagerUtils {
    * @param dagNode dag node for which sla is to be retrieved
    * @return sla if it is provided, DEFAULT_FLOW_SLA_MILLIS otherwise
    */
-  static long getFlowSLA(DagNode<JobExecutionPlan> dagNode) {
+  public static long getFlowSLA(DagNode<JobExecutionPlan> dagNode) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
         jobConfig, ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, 
ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT));
@@ -320,7 +313,7 @@ public class DagManagerUtils {
    * @param dagNode dag node for which flow start sla is to be retrieved
    * @return job start sla in ms
    */
-  static long getJobStartSla(DagNode<JobExecutionPlan> dagNode, Long 
defaultJobStartSla) {
+  public static long getJobStartSla(DagNode<JobExecutionPlan> dagNode, Long 
defaultJobStartSla) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
         jobConfig, ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
@@ -331,6 +324,8 @@ public class DagManagerUtils {
         : defaultJobStartSla;
   }
 
+
+
   static int getDagQueueId(Dag<JobExecutionPlan> dag, int numThreads) {
     return getDagQueueId(DagManagerUtils.getFlowExecId(dag), numThreads);
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
index 0cc559541..51f3c6ad2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -22,11 +22,15 @@ import com.google.inject.Singleton;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import 
org.apache.gobblin.service.modules.orchestration.proc.EnforceFlowFinishDeadlineDagProc;
+import 
org.apache.gobblin.service.modules.orchestration.proc.EnforceJobStartDeadlineDagProc;
 import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
 import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
 import org.apache.gobblin.service.modules.orchestration.proc.ReevaluateDagProc;
 import org.apache.gobblin.service.modules.orchestration.proc.ResumeDagProc;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -51,6 +55,17 @@ public class DagProcFactory implements 
DagTaskVisitor<DagProc> {
     this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
+  @Override
+  public DagProc meet(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask) {
+    return new 
EnforceFlowFinishDeadlineDagProc(enforceFlowFinishDeadlineDagTask);
+  }
+
+  @Override
+  public DagProc meet(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
+    return new EnforceJobStartDeadlineDagProc(enforceJobStartDeadlineDagTask);
+  }
+
+
   @Override
   public LaunchDagProc meet(LaunchDagTask launchDagTask) {
     return new LaunchDagProc(launchDagTask, 
this.flowCompilationValidationHelper);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 464abdea1..611628e3f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.google.inject.name.Named;
 import com.typesafe.config.Config;
 
 import lombok.AllArgsConstructor;
@@ -61,10 +62,12 @@ public class DagProcessingEngine extends 
AbstractIdleService {
   private final Optional<DagProcFactory> dagProcFactory;
   private ScheduledExecutorService scheduledExecutorPool;
   private static final Integer TERMINATION_TIMEOUT = 30;
+  public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = 
"defaultJobStartDeadlineTimeMillis";
+  @Getter static long defaultJobStartSlaTimeMillis;
 
   @Inject
-  public DagProcessingEngine(Config config, Optional<DagTaskStream> 
dagTaskStream,
-      Optional<DagProcFactory> dagProcFactory, 
Optional<DagManagementStateStore> dagManagementStateStore) {
+  public DagProcessingEngine(Config config, Optional<DagTaskStream> 
dagTaskStream, Optional<DagProcFactory> dagProcFactory,
+      Optional<DagManagementStateStore> dagManagementStateStore, 
@Named(DEFAULT_JOB_START_DEADLINE_TIME_MS) long deadlineTimeMs) {
     this.config = config;
     this.dagProcFactory = dagProcFactory;
     this.dagTaskStream = dagTaskStream;
@@ -77,6 +80,11 @@ public class DagProcessingEngine extends AbstractIdleService 
{
           this.dagManagementStateStore.isPresent() ? "present" : "MISSING"));
     }
     log.info("DagProcessingEngine initialized.");
+    setDefaultJobStartDeadlineTimeMs(deadlineTimeMs);
+  }
+
+  private static void setDefaultJobStartDeadlineTimeMs(long deadlineTimeMs) {
+    defaultJobStartSlaTimeMillis = deadlineTimeMs;
   }
 
   @Override
@@ -113,8 +121,8 @@ public class DagProcessingEngine extends 
AbstractIdleService {
 
     @Override
     public void run() {
+      log.info("Starting DagProcEngineThread to process dag tasks. Thread id: 
{}", threadID);
       while (true) {
-        log.info("Starting DagProcEngineThread to process dag tasks. Thread 
id: {}", threadID);
         DagTask dagTask = dagTaskStream.next(); // blocking call
         if (dagTask == null) {
           //todo - add a metrics to count the times dagTask was null
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
index 9c36c8c11..0d91b8977 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -24,8 +26,10 @@ import 
org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
 
 
 public interface DagTaskVisitor<T> {
+  T meet(EnforceFlowFinishDeadlineDagTask enforceFlowFinishDeadlineDagTask);
+  T meet(EnforceJobStartDeadlineDagTask enforceJobStartDeadlineDagTask);
+  T meet(KillDagTask killDagTask);
   T meet(LaunchDagTask launchDagTask);
   T meet(ReevaluateDagTask reevaluateDagTask);
-  T meet(KillDagTask killDagTask);
   T meet(ResumeDagTask resumeDagTask);
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 1c37a404c..bb4eda416 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -50,6 +50,7 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -110,7 +111,7 @@ public class FlowLaunchHandler {
     LeaseAttemptStatus leaseAttempt = 
this.multiActiveLeaseArbiter.tryAcquireLease(
         dagAction, eventTimeMillis, isReminderEvent, 
adoptConsensusFlowExecutionId);
     if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
-        && persistDagAction((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt)) {
+        && persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt)) {
       log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", 
leaseAttempt.getConsensusDagAction(),
           ((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt).getEventTimeMillis());
     } else { // when NOT successfully `persistDagAction`, set a reminder to 
re-attempt handling (unless leasing finished)
@@ -136,9 +137,11 @@ public class FlowLaunchHandler {
    * Called after obtaining a lease to both persist to the {@link 
DagActionStore} and
    * {@link 
MultiActiveLeaseArbiter#recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus)}
    */
-  private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus 
leaseStatus) {
+  private boolean 
persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
+    DagActionStore.DagAction launchDagAction = 
leaseStatus.getConsensusDagAction();
     try {
-      this.dagActionStore.addDagAction(leaseStatus.getConsensusDagAction());
+      this.dagActionStore.addDagAction(launchDagAction);
+      DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(launchDagAction);
       this.numFlowsSubmitted.mark();
       // after successfully persisting, close the lease
       return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 2702a81d5..7543e20e3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -89,11 +89,11 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   private final int lingerMillis;
   private final long retentionPeriodMillis;
   private String thisTableRetentionStatement;
-  private String thisTableGetInfoStatement;
-  private String thisTableGetInfoStatementForReminder;
-  private String thisTableSelectAfterInsertStatement;
-  private String thisTableAcquireLeaseIfMatchingAllStatement;
-  private String thisTableAcquireLeaseIfFinishedStatement;
+  private final String thisTableGetInfoStatement;
+  private final String thisTableGetInfoStatementForReminder;
+  private final String thisTableSelectAfterInsertStatement;
+  private final String thisTableAcquireLeaseIfMatchingAllStatement;
+  private final String thisTableAcquireLeaseIfFinishedStatement;
 
   /*
     Notes:
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 9c87bb029..5396b0502 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -19,25 +19,35 @@ package 
org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
 
 
 /**
@@ -45,6 +55,7 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
  */
 @Slf4j
 public class DagProcUtils {
+  private static final DagActionStore dagActionStore = 
GobblinServiceManager.getClass(DagActionStore.class);
 
   /**
    * - submits a {@link JobSpec} to a {@link SpecExecutor}
@@ -82,6 +93,9 @@ public class DagProcUtils {
       // either successfully or unsuccessfully. To catch any exceptions in the 
job submission, the DagManagerThread
       // blocks (by calling Future#get()) until the submission is completed.
       dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
+
+      sendEnforceJobStartDeadlineDagAction(dagNode);
+
       Future<?> addSpecFuture = producer.addSpec(jobSpec);
       // todo - we should add future.get() instead of the complete future into 
the JobExecutionPlan
       
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
@@ -111,4 +125,65 @@ public class DagProcUtils {
       throw new RuntimeException(e);
     }
   }
+
+  public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws 
IOException {
+    Properties props = new Properties();
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+    if 
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
+      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+          
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    }
+
+    try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
+        sendCancellationEvent(dagNodeToCancel.getValue());
+      } else {
+        log.warn("No Job future when canceling DAG node (hence, not sending 
cancellation event) - {}",
+            dagNodeToCancel.getValue().getJobSpec().getUri());
+      }
+      
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 props).get();
+      // todo - why was it not being cleaned up in DagManager?
+      dagManagementStateStore.deleteDagNodeState(dagId, dagNodeToCancel);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static void cancelDag(Dag<JobExecutionPlan> dag, 
DagManagementStateStore dagManagementStateStore) throws IOException {
+    List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
+    log.info("Found {} DagNodes to cancel (DagId {}).", 
dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));
+
+    for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+      DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+    }
+  }
+
+  public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+    Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+    
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+    jobExecutionPlan.setExecutionStatus(CANCELLED);
+  }
+
+  private static void 
sendEnforceJobStartDeadlineDagAction(Dag.DagNode<JobExecutionPlan> dagNode)
+      throws IOException {
+    dagActionStore.addJobDagAction(dagNode.getValue().getFlowGroup(), 
dagNode.getValue().getFlowName(),
+        String.valueOf(dagNode.getValue().getFlowExecutionId()), 
dagNode.getValue().getJobName(),
+        DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+  }
+
+  public static void 
sendEnforceFlowFinishDeadlineDagAction(DagActionStore.DagAction launchDagAction)
+      throws IOException {
+    dagActionStore.addFlowDagAction(launchDagAction.getFlowGroup(), 
launchDagAction.getFlowName(),
+        launchDagAction.getFlowExecutionId(), 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
+  }
+
+  public static long getDefaultJobStartDeadline(Config config) {
+    TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
+        config, DagManager.JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+    return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, 
DagManager.JOB_START_SLA_TIME,
+        ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
new file mode 100644
index 000000000..3b1ac2886
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation for {@link DagProc} that kills all the jobs if the dag 
does not finish in
+ * {@link 
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+@Slf4j
+public class EnforceFlowFinishDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask) {
+    super(enforceFlowFinishDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceFlowFinishDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceFlowFinishDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+    long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
+    long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+
+    // note that this condition should be true because the triggered dag 
action has waited enough before reaching here
+    if (System.currentTimeMillis() > flowStartTime + flowFinishDeadline) {
+      List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = 
dag.get().getNodes();
+      log.info("Found {} DagNodes to cancel (DagId {}).", 
dagNodesToCancel.size(), getDagId());
+
+      for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+        DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+      }
+
+      
dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+      dag.get().setMessage("Flow killed due to exceeding SLA of " + 
flowFinishDeadline + " ms");
+      dagManagementStateStore.checkpointDag(dag.get());
+    } else {
+      log.error("EnforceFlowFinishDeadline dagAction received before due time. 
flowStartTime {}, flowFinishDeadline {} ", flowStartTime, flowFinishDeadline);
+    }
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
new file mode 100644
index 000000000..8cf01be6b
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
+    super(enforceJobStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceJobStartDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceJobStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store
+      // todo - add metrics
+      log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc", 
getDagNodeId());
+      return;
+    }
+
+    Dag.DagNode<JobExecutionPlan> dagNode = 
dagNodeToCheckDeadline.getLeft().get();
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus = 
dagNodeToCheckDeadline.getRight();
+    if (!jobStatus.isPresent()) {
+      log.error("Some job status should be present for dag node {} that this 
EnforceJobStartDeadlineDagProc belongs.", getDagNodeId());
+      return;
+    }
+
+    ExecutionStatus executionStatus = valueOf(jobStatus.get().getEventName());
+    long jobOrchestratedTime = jobStatus.get().getOrchestratedTime();
+    // note that second condition should be true because the triggered dag 
action has waited enough before reaching here
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() > 
jobOrchestratedTime + timeOutForJobStart) {
+      log.info("Job exceeded the job start deadline. Killing it now. Job - {}, 
jobOrchestratedTime - {}, timeOutForJobStart - {}",
+          DagManagerUtils.getJobName(dagNode), jobOrchestratedTime, 
timeOutForJobStart);
+      
dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode);
+      DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
+      
dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
+      dag.get().setMessage("Flow killed because no update received for " + 
timeOutForJobStart + " ms after orchestration");
+      dagManagementStateStore.checkpointDag(dag.get());
+    }
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index 5f1c7bb97..15448d62b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -18,28 +18,17 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-import com.google.common.collect.Maps;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
-import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
-import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
-
 
 /**
  * An implementation for {@link DagProc} that kills all the nodes of a dag.
@@ -73,55 +62,18 @@ public class KillDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
 
     dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
     dag.get().setMessage("Flow killed by request");
-
     dagManagementStateStore.checkpointDag(dag.get());
 
     if (this.shouldKillSpecificJob) {
       Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = 
dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
       if (dagNodeToCancel.isPresent()) {
-        cancelDagNode(dagNodeToCancel.get());
+        DagProcUtils.cancelDagNode(dagNodeToCancel.get(), 
dagManagementStateStore);
       } else {
         // todo - add a metric here
         log.error("Did not find Dag node with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.", getDagNodeId());
       }
     } else {
-      List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = 
dag.get().getNodes();
-      log.info("Found {} DagNodes to cancel (DagId {}).", 
dagNodesToCancel.size(), getDagId());
-
-      for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
-        cancelDagNode(dagNodeToCancel);
-        // todo - why was it not being cleaned up in DagManager?
-        dagManagementStateStore.deleteDagNodeState(getDagId(), 
dagNodeToCancel);
-      }
-    }
-  }
-
-  private void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) 
throws IOException {
-    Properties props = new Properties();
-    if 
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
-      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
-          
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+      DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
     }
-
-    try {
-      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
-        Future future = dagNodeToCancel.getValue().getJobFuture().get();
-        String serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
-        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
-        sendCancellationEvent(dagNodeToCancel.getValue());
-      } else {
-        log.warn("No Job future when canceling DAG node (hence, not sending 
cancellation event) - {}",
-            dagNodeToCancel.getValue().getJobSpec().getUri());
-      }
-      
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 props).get();
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
-    Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-    
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
-    jobExecutionPlan.setExecutionStatus(CANCELLED);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 6189a9b6e..10529b25e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -23,13 +23,17 @@ import java.util.Set;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.quartz.SchedulerException;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -39,7 +43,6 @@ import org.apache.gobblin.service.monitoring.JobStatus;
 
 
 /**
- * suggest:
  * A {@link DagProc} to launch any subsequent (dependent) job(s) once all 
pre-requisite job(s) in the Dag have succeeded.
  * When there are no more jobs to run and no more running, it cleans up the 
Dag.
  * (In future), if there are multiple new jobs to be launched, separate launch 
dag actions are created for each of them.
@@ -76,8 +79,8 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
   }
 
   @Override
-  protected void act(DagManagementStateStore dagManagementStateStore, 
Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
dagNodeWithJobStatus)
-      throws IOException {
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
+      Optional<JobStatus>> dagNodeWithJobStatus) throws IOException {
     if (!dagNodeWithJobStatus.getLeft().isPresent()) {
       // one of the reason this could arise is when the MALA leasing doesn't 
work cleanly and another DagProc::process
       // has cleaned up the Dag, yet did not complete the lease before this 
current one acquired its own
@@ -115,6 +118,8 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
       } else {
         dagManagementStateStore.markDagFailed(dag);
       }
+
+      removeFlowFinishDeadlineTriggerAndDagAction();
     }
   }
 
@@ -191,4 +196,19 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
   private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
     throw new UnsupportedOperationException("More than one start job is not 
allowed");
   }
+
+  private void removeFlowFinishDeadlineTriggerAndDagAction() {
+    DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = 
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
+        getDagNodeId().getFlowName(), 
String.valueOf(getDagNodeId().getFlowExecutionId()),
+        DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
+    log.info("Deleting reminder trigger and dag action {}", 
enforceFlowFinishDeadlineDagAction);
+    // todo - add metrics
+
+    try {
+      
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(getDagTask().getDagAction());
+      
GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceFlowFinishDeadlineDagAction);
+    } catch (SchedulerException | IOException e) {
+      log.warn("Failed to unschedule the reminder for {}", 
enforceFlowFinishDeadlineDagAction);
+    }
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
new file mode 100644
index 000000000..5eaf4ffa7
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing jobs if they have not finished in
+ * {@link 
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+
+public class EnforceFlowFinishDeadlineDagTask extends DagTask {
+  public EnforceFlowFinishDeadlineDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+      DagActionStore dagActionStore) {
+    super(dagAction, leaseObtainedStatus, dagActionStore);
+  }
+
+  public <T> T host(DagTaskVisitor<T> visitor) {
+    return visitor.meet(this);
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
new file mode 100644
index 000000000..7ad3867a5
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing jobs if they have not started in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}.
+ */
+
+public class EnforceJobStartDeadlineDagTask extends DagTask {
+  public EnforceJobStartDeadlineDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+      DagActionStore dagActionStore) {
+    super(dagAction, leaseObtainedStatus, dagActionStore);
+  }
+
+  public <T> T host(DagTaskVisitor<T> visitor) {
+    return visitor.meet(this);
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 97ca4fc45..7210c6725 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -239,6 +239,34 @@ public class JobExecutionPlan {
     this.id = DagManagerUtils.calcJobId(jobSpec.getConfig());
   }
 
+  /**
+   * Flow start time is the same as the flow execution id if it is not set, 
which is also the timestamp flow request was
+   * received. It is set to a non-zero number when it is a resumed flow.
+   */
+  public long getFlowStartTime() {
+    return this.flowStartTime == 0L ? 
DagManagerUtils.getFlowExecId(this.getJobSpec()) : flowStartTime;
+  }
+
+  public String getFlowGroup() {
+    return jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY);
+  }
+
+  public String getFlowName() {
+    return jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY);
+  }
+
+  public long getFlowExecutionId() {
+    return 
jobSpec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+  }
+
+  public String getJobGroup() {
+    return jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY);
+  }
+
+  public String getJobName() {
+    return jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
+  }
+
   /**
    * Render the JobSpec into a JSON string.
    * @return a valid JSON string representation of the JobSpec.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index c2065b7d5..eacecab94 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -58,8 +58,9 @@ public class DagManagementDagActionStoreChangeMonitor extends 
DagActionStoreChan
         dagAction.getDagActionType(), dagAction);
     LaunchSubmissionMetricProxy launchSubmissionMetricProxy = isStartup ? 
ON_STARTUP : POST_STARTUP;
     try {
-      // todo - add actions for other other type of dag actions
       switch (dagAction.getDagActionType()) {
+        case ENFORCE_FLOW_FINISH_DEADLINE:
+        case ENFORCE_JOB_START_DEADLINE:
         case KILL :
         case LAUNCH :
         case REEVALUATE :
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c72502340..9d707ffdc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.quartz.SchedulerException;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
@@ -63,6 +64,8 @@ import 
org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
@@ -151,6 +154,12 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         this.eventProducer = observabilityEventProducer;
   }
 
+  public enum NewState {
+    FINISHED,
+    RUNNING,
+    SAME_AS_PREVIOUS,
+  }
+
   @Override
   protected void startUp() {
     super.startUp();
@@ -203,7 +212,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         }
 
         try (Timer.Context context = 
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
-          Pair<org.apache.gobblin.configuration.State, Boolean> 
updatedJobStatus = recalcJobStatus(jobStatus, this.stateStore);
+          Pair<org.apache.gobblin.configuration.State, NewState> 
updatedJobStatus = recalcJobStatus(jobStatus, this.stateStore);
           jobStatus = updatedJobStatus.getLeft();
 
           String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
@@ -214,13 +223,14 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
           String storeName = jobStatusStoreName(flowGroup, flowName);
           String tableName = jobStatusTableName(flowExecutionId, jobGroup, 
jobName);
 
-          if (updatedJobStatus.getRight()) {
+          if (updatedJobStatus.getRight() == NewState.FINISHED) {
             this.eventProducer.emitObservabilityEvent(jobStatus);
-
             if (this.dagProcEngineEnabled) {
               // todo - retried/resumed jobs *may* not be handled here, we may 
want to create their dag action elsewhere
               this.dagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
             }
+          } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
+            removeStartDeadlineTriggerAndDagAction(flowGroup, flowName, 
flowExecutionId, jobName);
           }
 
           // update the state store after adding a dag action to guaranty 
at-least-once adding of dag action
@@ -246,6 +256,20 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     }
   }
 
+  private void removeStartDeadlineTriggerAndDagAction(String flowGroup, String 
flowName, String flowExecutionId, String jobName) {
+    DagActionStore.DagAction enforceStartDeadlineDagAction = new 
DagActionStore.DagAction(flowGroup, flowName,
+        String.valueOf(flowExecutionId), jobName, 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+    log.info("Deleting reminder trigger and dag action {}", 
enforceStartDeadlineDagAction);
+    // todo - add metrics
+
+    try {
+      
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceStartDeadlineDagAction);
+      
GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceStartDeadlineDagAction);
+    } catch (SchedulerException | IOException e) {
+      log.error("Failed to unschedule the reminder for {}", 
enforceStartDeadlineDagAction);
+    }
+  }
+
   /**
    * It fills missing fields in job status and also merge the fields with the 
existing job status in the state store.
    * Merging is required because we do not want to lose the information sent 
by other GobblinTrackingEvents.
@@ -254,7 +278,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
    * @throws IOException
    */
   @VisibleForTesting
-  static Pair<org.apache.gobblin.configuration.State, Boolean> 
recalcJobStatus(org.apache.gobblin.configuration.State jobStatus,
+  static Pair<org.apache.gobblin.configuration.State, NewState> 
recalcJobStatus(org.apache.gobblin.configuration.State jobStatus,
       StateStore<org.apache.gobblin.configuration.State> stateStore) throws 
IOException {
     try {
       if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
@@ -304,7 +328,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       }
 
       modifyStateIfRetryRequired(jobStatus);
-      return ImmutablePair.of(jobStatus, 
isNewStateTransitionToFinal(jobStatus, states));
+      return ImmutablePair.of(jobStatus, newState(jobStatus, states));
     } catch (Exception e) {
       log.warn("Meet exception when adding jobStatus to state store at "
           + e.getStackTrace()[0].getClassName() + "line number: " + 
e.getStackTrace()[0].getLineNumber(), e);
@@ -312,6 +336,16 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     }
   }
 
+  private static NewState newState(org.apache.gobblin.configuration.State 
jobStatus, List<org.apache.gobblin.configuration.State> states) {
+    if (isNewStateTransitionToFinal(jobStatus, states)) {
+      return NewState.FINISHED;
+    } else if (isNewStateTransitionToRunning(jobStatus, states)) {
+      return NewState.RUNNING;
+    } else {
+      return NewState.SAME_AS_PREVIOUS;
+    }
+  }
+
   private static boolean isFlowStatusAndPendingResume(String jobName, String 
jobGroup, String currentStatus) {
     return jobName != null && jobGroup != null && 
jobName.equals(JobStatusRetriever.NA_KEY) && 
jobGroup.equals(JobStatusRetriever.NA_KEY)
         && currentStatus.equals(ExecutionStatus.PENDING_RESUME.name());
@@ -340,6 +374,14 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         && 
!FlowStatusGenerator.FINISHED_STATUSES.contains(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));
   }
 
+  static boolean 
isNewStateTransitionToRunning(org.apache.gobblin.configuration.State 
currentState, List<org.apache.gobblin.configuration.State> prevStates) {
+    if (prevStates.isEmpty()) {
+      return 
ExecutionStatus.RUNNING.name().equals(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+    }
+    return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) && 
ExecutionStatus.RUNNING.name().equals(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
+        && 
!ExecutionStatus.RUNNING.name().equals(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+  }
+
   /**
    * Merge states based on precedence defined by {@link 
#ORDERED_EXECUTION_STATUSES}.
    * The state instance in the 1st argument reflects the more recent state of 
a job
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 670e40f80..b9861ede3 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -47,6 +47,9 @@ import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Stage;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
 import com.linkedin.restli.client.RestLiResponseException;
@@ -58,6 +61,8 @@ import 
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.core.GobblinServiceConfiguration;
+import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
 import 
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
@@ -218,8 +223,11 @@ public class GobblinServiceManagerTest {
 
   public static GobblinServiceManager 
createTestGobblinServiceManager(Properties serviceCoreProperties,
       String serviceName, String serviceId, String serviceWorkDir) {
-    GobblinServiceManager gobblinServiceManager = 
GobblinServiceManager.create(serviceName, serviceId,
-        ConfigUtils.propertiesToConfig(serviceCoreProperties), new 
Path(serviceWorkDir));
+    Injector testInjector = Guice.createInjector(Stage.DEVELOPMENT, new 
GobblinServiceGuiceModule(
+        new GobblinServiceConfiguration(serviceName, serviceId, 
ConfigUtils.propertiesToConfig(serviceCoreProperties),
+            new Path(serviceWorkDir))));
+    GobblinServiceManager gobblinServiceManager = 
GobblinServiceManager.getClass(testInjector, GobblinServiceManager.class);
+    gobblinServiceManager.setStaticInjector(testInjector);
 
     DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager());
     doNothing().when(spiedDagManager).setActive(anyBoolean());
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index f8c0f8296..070d6a85b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -79,7 +79,7 @@ public class DagManagementTaskStreamImplTest {
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
             mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)),
-            false);
+            false, mock(DagManagementStateStore.class));
     this.dagProcFactory = new DagProcFactory(null);
     this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
         this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore, 0);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 9718222a1..d1f1e736d 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -91,7 +91,8 @@ public class DagProcessingEngineTest {
     this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
-            mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)), false);
+            mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)), false,
+            this.dagManagementStateStore);
     this.dagProcFactory = new DagProcFactory(null);
 
     DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
@@ -100,7 +101,7 @@ public class DagProcessingEngineTest {
     this.dagTaskStream = spy(new MockedDagTaskStream());
     DagProcessingEngine dagProcessingEngine =
         new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream), 
Optional.ofNullable(this.dagProcFactory),
-            Optional.ofNullable(dagManagementStateStore));
+            Optional.ofNullable(dagManagementStateStore), 100000L);
     dagProcessingEngine.startAsync();
   }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
new file mode 100644
index 000000000..4f8f7b365
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  /*
+    This test simulate submitting a dag with a very short job start deadline 
that will definitely be breached,
+    resulting in the job requiring to be killed
+   */
+  @Test
+  public void enforceJobStartDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
+        new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, mock(DagActionStore.class)));
+    enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
+    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+  }
+
+  /*
+   This test simulate submitting a dag with a very short flow finish deadline 
that will definitely be breached,
+    resulting in the dag requiring to be killed
+ */
+  @Test
+  public void enforceFlowFinishDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    int numOfDagNodes = 5;
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        numOfDagNodes, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.RUNNING.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
is in running state
+
+    EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new 
EnforceFlowFinishDeadlineDagProc(
+        new EnforceFlowFinishDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, mock(DagActionStore.class)));
+    enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
+
+    Assert.assertEquals(numOfDagNodes,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index c9642d437..6a625e527 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -42,7 +43,9 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -66,16 +69,24 @@ public class ReevaluateDagProcTest {
 
   private ITestMetastoreDatabase testMetastoreDatabase;
   private DagManagementStateStore dagManagementStateStore;
+  private MockedStatic<GobblinServiceManager> mockedGobblinServiceManager;
+  private DagActionStore dagActionStore;
+  private DagActionReminderScheduler dagActionReminderScheduler;
 
   @BeforeClass
   public void setUpClass() throws Exception {
     this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    this.mockedGobblinServiceManager = 
Mockito.mockStatic(GobblinServiceManager.class);
   }
 
   @BeforeMethod
   public void setUp() throws Exception {
     this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     mockDMSSCommonBehavior(dagManagementStateStore);
+    this.dagActionStore = mock(DagActionStore.class);
+    this.dagActionReminderScheduler = mock(DagActionReminderScheduler.class);
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(this.dagActionStore);
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(this.dagActionReminderScheduler);
   }
 
   private void mockDMSSCommonBehavior(DagManagementStateStore 
dagManagementStateStore) throws IOException, SpecNotFoundException {
@@ -88,6 +99,7 @@ public class ReevaluateDagProcTest {
 
   @AfterClass(alwaysRun = true)
   public void tearDownClass() throws Exception {
+    this.mockedGobblinServiceManager.close();
     // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
     this.testMetastoreDatabase.close();
   }
@@ -134,6 +146,13 @@ public class ReevaluateDagProcTest {
     // current job's state is deleted
     
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
+        .filter(a -> 
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
+
+    // when there is no more job to run in re-evaluate dag proc, it deletes 
enforce_flow_finish_dag_action also
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+        .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
 
   // test when there does not exist a next job in the dag when the current 
job's reevaluate dag action is processed
@@ -185,5 +204,11 @@ public class ReevaluateDagProcTest {
     // dag is deleted because the only job in the dag is completed
     
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
         .filter(a -> a.getMethod().getName().equals("deleteDag")).count(), 1);
+
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
+        .filter(a -> 
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
+
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+        .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index fb216910d..ca505f73e 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -97,7 +97,7 @@ public abstract class JobStatusRetrieverTest {
       properties.setProperty(TimingEvent.JOB_ORCHESTRATED_TIME, 
String.valueOf(endTime));
     }
     State jobStatus = new State(properties);
-    Pair<State, Boolean> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
+    Pair<State, KafkaJobStatusMonitor.NewState> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
     jobStatus = updatedJobStatus.getLeft();
     this.jobStatusRetriever.getStateStore().put(
         KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 5a595d652..0b05beba2 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -141,7 +141,7 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
     properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
jobGroup);
     State jobStatus = new State(properties);
 
-    Pair<State, Boolean> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
+    Pair<State, KafkaJobStatusMonitor.NewState> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
     jobStatus = updatedJobStatus.getLeft();
     this.jobStatusRetriever.getStateStore().put(
         KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
@@ -172,7 +172,7 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
     State jobStatus = new State(properties);
 
     try {
-      Pair<State, Boolean> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
+      Pair<State, KafkaJobStatusMonitor.NewState> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
       jobStatus = updatedJobStatus.getLeft();
       this.jobStatusRetriever.getStateStore().put(
           KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index cecba15fd..d553c5edf 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -134,6 +134,7 @@ ext.externalDependency = [
     // dependency directly to mockito-inline
     // 
https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#unmockable
     "mockito": "org.mockito:mockito-core:4.11.0",
+    "mockitoInline": "org.mockito:mockito-inline:4.11.0",
     "salesforceWsc": "com.force.api:force-wsc:" + salesforceVersion,
     "salesforcePartner": "com.force.api:force-partner-api:" + 
salesforceVersion,
     "scala": "org.scala-lang:scala-library:2.11.8",

Reply via email to