This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ac5d5561c [GOBBLIN-2090] delete deadline triggers in all hosts (#3973)
ac5d5561c is described below

commit ac5d5561c507246f2820256d0ae2f1418ec761c9
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Jun 17 11:58:07 2024 -0700

    [GOBBLIN-2090] delete deadline triggers in all hosts (#3973)
    
    * delete deadline triggers in all hosts
    * fix tests
    * add more tests
    * address review comment
---
 .../apache/gobblin/metastore/MysqlStateStore.java  |   6 +-
 .../runtime/DagActionStoreChangeMonitorTest.java   |   2 +-
 ...gManagementDagActionStoreChangeMonitorTest.java | 149 ++++++++++++++++
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 197 +++++++++++++++------
 .../spec_executorInstance/MockedSpecExecutor.java  |  21 ++-
 .../modules/orchestration/DagActionStore.java      |   2 -
 .../MostlyMySqlDagManagementStateStore.java        |  12 +-
 .../modules/orchestration/proc/DagProcUtils.java   |  35 +++-
 .../orchestration/proc/ReevaluateDagProc.java      |  28 +--
 .../modules/orchestration/proc/ResumeDagProc.java  |   1 +
 .../service/modules/spec/JobExecutionPlan.java     |   1 +
 .../service/monitoring/ChangeMonitorUtils.java     |   1 -
 .../monitoring/DagActionStoreChangeMonitor.java    |  64 ++++---
 .../DagManagementDagActionStoreChangeMonitor.java  |  44 ++++-
 ...nagementDagActionStoreChangeMonitorFactory.java |   9 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  28 +--
 .../DagManagementTaskStreamImplTest.java           |  17 +-
 .../orchestration/DagProcessingEngineTest.java     |  16 +-
 .../MostlyMySqlDagManagementStateStoreTest.java    |  52 ++----
 .../orchestration/MysqlDagStateStoreTest.java      |  10 +-
 .../modules/orchestration/OrchestratorTest.java    |  15 +-
 .../proc/EnforceDeadlineDagProcsTest.java          |   8 +-
 .../orchestration/proc/KillDagProcTest.java        |  16 +-
 .../orchestration/proc/LaunchDagProcTest.java      |  54 ++++--
 .../orchestration/proc/ReevaluateDagProcTest.java  | 107 +++++------
 .../orchestration/proc/ResumeDagProcTest.java      |  46 ++---
 26 files changed, 600 insertions(+), 341 deletions(-)

diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 36ff51a56..aa8d00f5e 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -283,11 +283,7 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
       queryStatement.setString(++index, tableName);
 
       try (ResultSet rs = queryStatement.executeQuery()) {
-        if (rs.next()) {
-          return true;
-        } else {
-          return false;
-        }
+        return rs.next();
       }
     } catch (SQLException e) {
       throw new IOException("Failure checking existence of storeName " + 
storeName + " tableName " + tableName, e);
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 2627f55a5..40c930ea4 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -264,7 +264,7 @@ public class DagActionStoreChangeMonitorTest {
    * Form a key for events using the flow identifiers
    * @return a key formed by adding an '_' delimiter between the flow 
identifiers
    */
-  private String getKeyForFlow(String flowGroup, String flowName, long 
flowExecutionId) {
+  public static String getKeyForFlow(String flowGroup, String flowName, long 
flowExecutionId) {
     return flowGroup + "_" + flowName + "_" + flowExecutionId;
   }
 
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
new file mode 100644
index 000000000..b93d7147f
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.quartz.SchedulerException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.DagActionValue;
+import 
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.OperationType;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests the main functionality of {@link 
DagManagementDagActionStoreChangeMonitor} to process {@link 
DagActionStoreChangeEvent} type
+ * events stored in a {@link 
org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
+ * processMessage(DecodeableKafkaRecord message) function should be able to 
gracefully process a variety of message
+ * types, even with undesired formats, without throwing exceptions.
+ */
+@Slf4j
+public class DagManagementDagActionStoreChangeMonitorTest {
+  public static final String TOPIC = 
DagActionStoreChangeEvent.class.getSimpleName();
+  private final int PARTITION = 1;
+  private final int OFFSET = 1;
+  private final String FLOW_GROUP = "flowGroup";
+  private final String FLOW_NAME = "flowName";
+  private final long FLOW_EXECUTION_ID = 123L;
+  private final String JOB_NAME = "jobName";
+  private MockDagManagementDagActionStoreChangeMonitor 
mockDagManagementDagActionStoreChangeMonitor;
+  private int txidCounter = 0;
+
+  private static final DagActionReminderScheduler dagActionReminderScheduler = 
mock(DagActionReminderScheduler.class);
+
+  /**
+   * Note: The class methods are wrapped in a test specific method because the 
original methods are package protected
+   * and cannot be accessed by this class.
+   */
+  static class MockDagManagementDagActionStoreChangeMonitor extends 
DagManagementDagActionStoreChangeMonitor {
+
+    public MockDagManagementDagActionStoreChangeMonitor(Config config, int 
numThreads, boolean isMultiActiveSchedulerEnabled) {
+      super(config, numThreads, mock(FlowCatalog.class), 
mock(Orchestrator.class), mock(DagManagementStateStore.class),
+          isMultiActiveSchedulerEnabled, mock(DagManagement.class), 
dagActionReminderScheduler);
+    }
+    protected void processMessageForTest(DecodeableKafkaRecord<String, 
DagActionStoreChangeEvent> record) {
+      super.processMessage(record);
+    }
+  }
+
+  MockDagManagementDagActionStoreChangeMonitor 
createMockDagManagementDagActionStoreChangeMonitor() {
+    Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
+        
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
+    return new MockDagManagementDagActionStoreChangeMonitor(config, 5, true);
+  }
+
+  // Called at start of every test so the count of each method being called is 
reset to 0
+  @BeforeMethod
+  public void setupMockMonitor() {
+     mockDagManagementDagActionStoreChangeMonitor = 
createMockDagManagementDagActionStoreChangeMonitor();
+  }
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    doNothing().when(dagActionReminderScheduler).unscheduleReminderJob(any());
+
+  }
+
+  /**
+   * Tests process message with a DELETE type message.
+   */
+  @Test
+  public void testProcessMessageWithDelete() throws SchedulerException {
+    Kafka09ConsumerClient.Kafka09ConsumerRecord<String, 
DagActionStoreChangeEvent> consumerRecord =
+        wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME, 
DagActionValue.ENFORCE_JOB_START_DEADLINE);
+    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME,
+        DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+    
mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+    
verify(mockDagManagementDagActionStoreChangeMonitor.getDagActionReminderScheduler(),
 times(1))
+        .unscheduleReminderJob(eq(dagAction));
+  }
+
+  /**
+   * Util to create a general DagActionStoreChange type event
+   */
+  private DagActionStoreChangeEvent 
createDagActionStoreChangeEvent(OperationType operationType,
+      String flowGroup, String flowName, long flowExecutionId, String jobName, 
DagActionValue dagAction) {
+    String key = DagActionStoreChangeMonitorTest.getKeyForFlow(flowGroup, 
flowName, flowExecutionId);
+    GenericStoreChangeEvent genericStoreChangeEvent =
+        new GenericStoreChangeEvent(key, String.valueOf(txidCounter), 
System.currentTimeMillis(), operationType);
+    txidCounter++;
+    return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, 
flowName, String.valueOf(flowExecutionId),
+        jobName, dagAction);
+  }
+
+  /**
+   * Util to create wrapper around DagActionStoreChangeEvent
+   */
+  private Kafka09ConsumerClient.Kafka09ConsumerRecord<String, 
DagActionStoreChangeEvent> wrapDagActionStoreChangeEvent(
+      OperationType operationType, String flowGroup, String flowName, long 
flowExecutionId, String jobName, DagActionValue dagAction) {
+    DagActionStoreChangeEvent eventToProcess = null;
+    try {
+      eventToProcess =
+          createDagActionStoreChangeEvent(operationType, flowGroup, flowName, 
flowExecutionId, jobName, dagAction);
+    } catch (Exception e) {
+      log.error("Exception while creating event ", e);
+    }
+    // TODO: handle partition and offset values better
+    ConsumerRecord<String, DagActionStoreChangeEvent> consumerRecord = new 
ConsumerRecord<>(TOPIC, PARTITION, OFFSET,
+        DagActionStoreChangeMonitorTest.getKeyForFlow(flowGroup, flowName, 
flowExecutionId), eventToProcess);
+    return new Kafka09ConsumerClient.Kafka09ConsumerRecord<>(consumerRecord);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index b1496e5d4..f03a7ff4b 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -29,9 +29,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -72,9 +72,10 @@ import 
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueReposi
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
-import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
@@ -84,6 +85,9 @@ import 
org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProduc
 import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 
 
@@ -91,18 +95,17 @@ public class KafkaAvroJobStatusMonitorTest {
   public static final String TOPIC = 
KafkaAvroJobStatusMonitorTest.class.getSimpleName();
 
   private KafkaTestBase kafkaTestHelper;
-  private String flowGroup = "myFlowGroup";
-  private String flowName = "myFlowName";
-  private String jobGroup = "myJobGroup";
-  private String jobName = "myJobName";
-  private long flowExecutionId = 1234L;
-  private String jobExecutionId = "1111";
-  private String message = "https://myServer:8143/1234/1111";;
-  private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
+  private final String flowGroup = "myFlowGroup";
+  private final String flowName = "myFlowName";
+  private final String jobGroup = "myJobGroup";
+  private final String jobName = "myJobName";
+  private final long flowExecutionId = 1234L;
+  private final String jobExecutionId = "1111";
+  private final String message = "https://myServer:8143/1234/1111";;
+  private final String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
   private MetricContext context;
   private KafkaAvroEventKeyValueReporter.Builder<?> builder;
-  private DagManagementStateStore dagManagementStateStore;
-  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+  DagActionStore.DagAction enforceJobStartDeadlineDagAction;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -112,7 +115,7 @@ public class KafkaAvroJobStatusMonitorTest {
     kafkaTestHelper.provisionTopic(TOPIC);
 
     // Create KeyValueProducerPusher instance.
-    Pusher pusher = new KafkaKeyValueProducerPusher<byte[], 
byte[]>("localhost:dummy", TOPIC,
+    Pusher<Pair<byte[], byte[]>> pusher = new 
KafkaKeyValueProducerPusher<>("localhost:dummy", TOPIC,
         Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
             ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + 
this.kafkaTestHelper.getKafkaServerPort()))));
 
@@ -121,12 +124,14 @@ public class KafkaAvroJobStatusMonitorTest {
     builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
     builder = 
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
         TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
-    this.dagManagementStateStore = mock(DagManagementStateStore.class);
-    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class));
+
+    this.enforceJobStartDeadlineDagAction = new 
DagActionStore.DagAction(this.flowGroup, this.flowName, this.flowExecutionId, 
this.jobName,
+        DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
   }
 
   @Test
   public void testProcessMessageForSuccessfulFlow() throws IOException, 
ReflectiveOperationException {
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic1");
 
     //Submit GobblinTrackingEvents to Kafka
@@ -148,24 +153,36 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
-      new NoopGaaSJobObservabilityEventProducer());
+      new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
-    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.RUNNING.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+        anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     // (per above, is a 'dummy' event)
     Assert.assertNull(jobStatusMonitor.parseJobStatus(
@@ -180,6 +197,7 @@ public class KafkaAvroJobStatusMonitorTest {
 
   @Test (dependsOnMethods = "testProcessMessageForSuccessfulFlow")
   public void testProcessMessageForFailedFlow() throws IOException, 
ReflectiveOperationException {
+    DagManagementStateStore dagManagementStateStore = 
mock(MostlyMySqlDagManagementStateStore.class);
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic2");
 
     //Submit GobblinTrackingEvents to Kafka
@@ -206,7 +224,8 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+        ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(), 
dagManagementStateStore);
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -215,15 +234,15 @@ public class KafkaAvroJobStatusMonitorTest {
     // Verify undecodeable message is skipped
     byte[] undecodeableMessage = Arrays.copyOf(messageAndMetadata.message(),
         messageAndMetadata.message().length - 1);
-    ConsumerRecord undecodeableRecord = new ConsumerRecord<>(TOPIC, 
messageAndMetadata.partition(),
+    ConsumerRecord<byte[], byte[]> undecodeableRecord = new 
ConsumerRecord<>(TOPIC, messageAndMetadata.partition(),
         messageAndMetadata.offset(), messageAndMetadata.key(), 
undecodeableMessage);
     Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(), 
0L);
-    jobStatusMonitor.processMessage(new 
Kafka09ConsumerClient.Kafka09ConsumerRecord(undecodeableRecord));
+    jobStatusMonitor.processMessage(new 
Kafka09ConsumerClient.Kafka09ConsumerRecord<>(undecodeableRecord));
     Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(), 
1L);
     // Re-test when properly encoded, as expected for a normal event
     
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
-    StateStore stateStore = jobStatusMonitor.getStateStore();
+    StateStore<State> stateStore = jobStatusMonitor.getStateStore();
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
     String tableName = 
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
     List<State> stateList  = stateStore.getAll(storeName, tableName);
@@ -231,40 +250,59 @@ public class KafkaAvroJobStatusMonitorTest {
     State state = stateList.get(0);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
 
-    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
         iterator,
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.RUNNING.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
 
     //Because the maximum attempt is set to 2, so the state is set to 
PENDING_RETRY after the first failure
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Because the maximum attempt is set to 2, so the state is set to 
PENDING_RETRY after the first failure
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.RUNNING.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Because the maximum attempt is set to 2, so the state is set to Failed 
after trying twice
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.FAILED.name());
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(false));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+        anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     jobStatusMonitor.shutDown();
   }
 
   @Test (dependsOnMethods = "testProcessMessageForFailedFlow")
   public void testProcessMessageForSkippedFlow() throws IOException, 
ReflectiveOperationException {
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic2");
 
     //Submit GobblinTrackingEvents to Kafka
@@ -284,7 +322,7 @@ public class KafkaAvroJobStatusMonitorTest {
     }
 
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
-        new NoopGaaSJobObservabilityEventProducer());
+        new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
     jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -293,15 +331,15 @@ public class KafkaAvroJobStatusMonitorTest {
     // Verify undecodeable message is skipped
     byte[] undecodeableMessage = Arrays.copyOf(messageAndMetadata.message(),
         messageAndMetadata.message().length - 1);
-    ConsumerRecord undecodeableRecord = new ConsumerRecord<>(TOPIC, 
messageAndMetadata.partition(),
+    ConsumerRecord<byte[], byte[]> undecodeableRecord = new 
ConsumerRecord<>(TOPIC, messageAndMetadata.partition(),
         messageAndMetadata.offset(), messageAndMetadata.key(), 
undecodeableMessage);
     Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(), 
0L);
-    jobStatusMonitor.processMessage(new 
Kafka09ConsumerClient.Kafka09ConsumerRecord(undecodeableRecord));
+    jobStatusMonitor.processMessage(new 
Kafka09ConsumerClient.Kafka09ConsumerRecord<>(undecodeableRecord));
     Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(), 
1L);
     // Re-test when properly encoded, as expected for a normal event
     
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
-    StateStore stateStore = jobStatusMonitor.getStateStore();
+    StateStore<State> stateStore = jobStatusMonitor.getStateStore();
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
     String tableName = 
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
     List<State> stateList  = stateStore.getAll(storeName, tableName);
@@ -309,20 +347,27 @@ public class KafkaAvroJobStatusMonitorTest {
     State state = stateList.get(0);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
 
-    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
         iterator,
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+        anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
     jobStatusMonitor.shutDown();
   }
 
   @Test (dependsOnMethods = "testProcessMessageForSkippedFlow")
   public void testProcessingRetriedForApparentlyTransientErrors() throws 
IOException, ReflectiveOperationException {
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic3");
 
     //Submit GobblinTrackingEvents to Kafka
@@ -344,10 +389,10 @@ public class KafkaAvroJobStatusMonitorTest {
     Config conf = ConfigFactory.empty().withValue(
         KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." + 
RETRY_MULTIPLIER, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle,
 conf,
-        new NoopGaaSJobObservabilityEventProducer());
+        new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
 
     jobStatusMonitor.buildMetricsContextAndMetrics();
-    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
@@ -373,12 +418,17 @@ public class KafkaAvroJobStatusMonitorTest {
             jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus()));
 
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+
     toggleManagementExecutor.shutdownNow();
     jobStatusMonitor.shutDown();
   }
 
   @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
   public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
 
     //Submit GobblinTrackingEvents to Kafka
@@ -402,43 +452,66 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+        ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(), 
dagManagementStateStore);
     jobStatusMonitor.buildMetricsContextAndMetrics();
-    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     // Received kill flow event, should not retry the flow even though there 
is 1 pending attempt left
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(false));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+        anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     jobStatusMonitor.shutDown();
   }
 
   @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
   public void testProcessMessageForFlowPendingResume() throws IOException, 
ReflectiveOperationException {
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
 
     //Submit GobblinTrackingEvents to Kafka
@@ -461,34 +534,56 @@ public class KafkaAvroJobStatusMonitorTest {
       Thread.currentThread().interrupt();
     }
 
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer());
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+        ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(), 
dagManagementStateStore);
     jobStatusMonitor.buildMetricsContextAndMetrics();
-    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+        anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     // Job for flow pending resume status after it was cancelled or failed
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", 
"NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RESUME.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+        anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+        anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.RUNNING.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+        anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).addJobDagAction(any(), any(),
+        anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     jobStatusMonitor.shutDown();
   }
@@ -512,9 +607,9 @@ public class KafkaAvroJobStatusMonitorTest {
     }
 
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
-        new NoopGaaSJobObservabilityEventProducer());
+        new NoopGaaSJobObservabilityEventProducer(), 
mock(DagManagementStateStore.class));
     jobStatusMonitor.buildMetricsContextAndMetrics();
-    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
@@ -544,10 +639,10 @@ public class KafkaAvroJobStatusMonitorTest {
     MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
     MockGaaSJobObservabilityEventProducer mockEventProducer = new 
MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
         issueRepository, false);
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
-        mockEventProducer);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+        ConfigFactory.empty(), mockEventProducer, 
mock(DagManagementStateStore.class));
     jobStatusMonitor.buildMetricsContextAndMetrics();
-    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
@@ -591,10 +686,10 @@ public class KafkaAvroJobStatusMonitorTest {
     MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
     MockGaaSJobObservabilityEventProducer mockEventProducer = new 
MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
         issueRepository, false);
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
-        mockEventProducer);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
+        ConfigFactory.empty(), mockEventProducer, 
mock(DagManagementStateStore.class));
     jobStatusMonitor.buildMetricsContextAndMetrics();
-    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
         this.kafkaTestHelper.getIteratorForTopic(TOPIC),
         this::convertMessageAndMetadataToDecodableKafkaRecord);
 
@@ -619,10 +714,10 @@ public class KafkaAvroJobStatusMonitorTest {
     jobStatusMonitor.shutDown();
   }
 
-  private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor 
jobStatusMonitor, Iterator<DecodeableKafkaRecord> recordIterator,
+  private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor 
jobStatusMonitor, Iterator<DecodeableKafkaRecord<byte[], byte[]>> 
recordIterator,
       String jobGroup, String jobName) throws IOException {
     jobStatusMonitor.processMessage(recordIterator.next());
-    StateStore stateStore = jobStatusMonitor.getStateStore();
+    StateStore<State> stateStore = jobStatusMonitor.getStateStore();
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
     String tableName = 
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, jobGroup, 
jobName);
     List<State> stateList  = stateStore.getAll(storeName, tableName);
@@ -719,13 +814,15 @@ public class KafkaAvroJobStatusMonitorTest {
   }
 
   MockKafkaAvroJobStatusMonitor 
createMockKafkaAvroJobStatusMonitor(AtomicBoolean 
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
-      GaaSJobObservabilityEventProducer eventProducer) throws IOException, 
ReflectiveOperationException {
+      GaaSJobObservabilityEventProducer eventProducer, DagManagementStateStore 
dagManagementStateStore) throws IOException, ReflectiveOperationException {
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
         
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
         .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
         .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"))
+        .withValue(ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, 
ConfigValueFactory.fromAnyRef("true"))
         .withFallback(additionalConfig);
-    return new MockKafkaAvroJobStatusMonitor("test",config, 1, 
shouldThrowFakeExceptionInParseJobStatusToggle, eventProducer);
+    return new MockKafkaAvroJobStatusMonitor("test", config, 1, 
shouldThrowFakeExceptionInParseJobStatusToggle,
+        eventProducer, dagManagementStateStore);
   }
   /**
    *   Create a dummy event to test if it is filtered out by the consumer.
@@ -767,13 +864,12 @@ public class KafkaAvroJobStatusMonitorTest {
   public void tearDown() {
     try {
       this.kafkaTestHelper.close();
-      this.mockedGobblinServiceManager.close();
     } catch(Exception e) {
       System.err.println("Failed to close Kafka server.");
     }
   }
 
-  class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor {
+  static class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor 
{
     private final AtomicBoolean shouldThrowFakeExceptionInParseJobStatus;
     @Getter
     private volatile int numFakeExceptionsFromParseJobStatus = 0;
@@ -782,14 +878,15 @@ public class KafkaAvroJobStatusMonitorTest {
      * @param shouldThrowFakeExceptionInParseJobStatusToggle - pass (and 
retain) to dial whether `parseJobStatus` throws
      */
     public MockKafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads,
-        AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, 
GaaSJobObservabilityEventProducer producer)
+        AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, 
GaaSJobObservabilityEventProducer producer,
+        DagManagementStateStore dagManagementStateStore)
         throws IOException, ReflectiveOperationException {
       super(topic, config, numThreads, mock(JobIssueEventHandler.class), 
producer, dagManagementStateStore);
       shouldThrowFakeExceptionInParseJobStatus = 
shouldThrowFakeExceptionInParseJobStatusToggle;
     }
 
     @Override
-    protected void processMessage(DecodeableKafkaRecord record) {
+    protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> 
record) {
       super.processMessage(record);
     }
 
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 77d21c14d..7647b60e2 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -24,23 +24,28 @@ import java.util.concurrent.Future;
 import org.mockito.Mockito;
 
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+
+import lombok.EqualsAndHashCode;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.when;
 
 
+@EqualsAndHashCode(of = "config", callSuper=false)
 public class MockedSpecExecutor extends InMemorySpecExecutor {
-  private SpecProducer<Spec> mockedSpecProducer;
+  private final SpecProducer<Spec> mockedSpecProducer;
+  private final Config config;
 
   public MockedSpecExecutor(Config config) {
     super(config);
+    this.config = config;
     this.mockedSpecProducer = Mockito.mock(SpecProducer.class);
     when(mockedSpecProducer.addSpec(any())).thenReturn(new 
CompletedFuture(Boolean.TRUE, null));
     when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn("");
@@ -49,9 +54,17 @@ public class MockedSpecExecutor extends InMemorySpecExecutor 
{
     }
 
   public static SpecExecutor createDummySpecExecutor(URI uri) {
+    return new 
MockedSpecExecutor(makeDummyConfigsForSpecExecutor(uri.toString()));
+  }
+
+  public static Config makeDummyConfigsForSpecExecutor(String specUriInString) 
{
+    String specStoreDir = "/tmp/specStoreDir";
     Properties properties = new Properties();
-    properties.setProperty(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
uri.toString());
-    return new MockedSpecExecutor(ConfigFactory.parseProperties(properties));
+    properties.put("specStore.fs.dir", specStoreDir);
+    properties.put("specExecInstance.capabilities", "source:destination");
+    properties.put(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
specUriInString);
+    properties.put("uri", specUriInString);
+    return ConfigUtils.propertiesToConfig(properties);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 3e0735bd3..2d22aa3a2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -31,14 +31,12 @@ import 
org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 public interface DagActionStore {
   public static final String NO_JOB_NAME_DEFAULT = "";
   enum DagActionType {
-    CANCEL, // Invoked through DagManager if flow has been stuck in 
Orchestrated state for a while
     ENFORCE_JOB_START_DEADLINE, // Enforce job start deadline
     ENFORCE_FLOW_FINISH_DEADLINE, // Enforce flow finish deadline
     KILL, // Kill invoked through API call
     LAUNCH, // Launch new flow execution invoked adhoc or through scheduled 
trigger
     REEVALUATE, // Re-evaluate what needs to be done upon receipt of a final 
job status
     RESUME, // Resume flow invoked through API call
-    RETRY, // Invoked through DagManager for flows configured to allow retries
   }
 
   @Data
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 862c73f96..82504c729 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -74,7 +74,7 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   private final UserQuotaManager quotaManager;
   Map<URI, TopologySpec> topologySpecMap;
   private final Config config;
-  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  public static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
   public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
   FlowCatalog flowCatalog;
   @Getter
@@ -194,8 +194,7 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   @Override
   public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> 
dagNode, DagManager.DagId dagId)
       throws IOException {
-    Optional<Dag<JobExecutionPlan>> dag = getDag(dagId);
-    if (!dag.isPresent()) {
+    if (!containsDag(dagId)) {
       throw new RuntimeException("Dag " + dagId + " not found");
     }
     this.dagNodes.put(dagNode.getValue().getId(), dagNode);
@@ -264,6 +263,13 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
     }
   }
 
+  /* todo - this method works because when the jobs finish they are deleted 
from the DMSS -> if no more job is found, means
+   no more running jobs.
+   But DMSS still has dags and which still contains dag nodes. We need to 
revisit this method's logic when we change
+   DMSS to a fully mysql backed implementation. then we may want to consider 
this approach
+   return getDagNodes(dagId).stream()
+       .anyMatch(node -> 
!FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name()));
+  */
   @Override
   public boolean hasRunningJobs(DagManager.DagId dagId) {
     return !getDagNodes(dagId).isEmpty();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index b52f361dc..b11bb4c1c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -45,6 +45,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
@@ -160,7 +161,7 @@ public class DagProcUtils {
 
     try {
       if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
-        Future future = dagNodeToCancel.getValue().getJobFuture().get();
+        Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
         String serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
         props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
         sendCancellationEvent(dagNodeToCancel.getValue());
@@ -210,4 +211,36 @@ public class DagProcUtils {
     return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, 
DagManager.JOB_START_SLA_TIME,
         ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
   }
+
+  public static boolean isJobLevelStatus(String jobName) {
+    return !jobName.equals(JobStatusRetriever.NA_KEY);
+  }
+
+  public static void 
removeEnforceJobStartDeadlineDagAction(DagManagementStateStore 
dagManagementStateStore, String flowGroup,
+      String flowName, long flowExecutionId, String jobName) {
+    DagActionStore.DagAction enforceJobStartDeadlineDagAction = new 
DagActionStore.DagAction(flowGroup, flowName,
+        flowExecutionId, jobName, 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+    log.info("Deleting dag action {}", enforceJobStartDeadlineDagAction);
+    // todo - add metrics
+
+    try {
+      
dagManagementStateStore.deleteDagAction(enforceJobStartDeadlineDagAction);
+    } catch (IOException e) {
+      log.warn("Failed to delete dag action {}", 
enforceJobStartDeadlineDagAction);
+    }
+  }
+
+  public static void removeFlowFinishDeadlineDagAction(DagManagementStateStore 
dagManagementStateStore, DagManager.DagId dagId) {
+    DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = 
DagActionStore.DagAction.forFlow(dagId.getFlowGroup(),
+        dagId.getFlowName(), dagId.getFlowExecutionId(),
+        DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
+    log.info("Deleting dag action {}", enforceFlowFinishDeadlineDagAction);
+    // todo - add metrics
+
+    try {
+      
dagManagementStateStore.deleteDagAction(enforceFlowFinishDeadlineDagAction);
+    } catch (IOException e) {
+      log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
+    }
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 242290656..a57de4818 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -21,17 +21,13 @@ import java.io.IOException;
 import java.util.Optional;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.quartz.SchedulerException;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
-import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -83,7 +79,8 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
     Dag<JobExecutionPlan> dag = 
dagManagementStateStore.getDag(getDagId()).get();
     JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
     ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(jobStatus.getEventName());
-    setStatus(dagManagementStateStore, dagNode, executionStatus);
+    // pass dag, so that dag is updated too, updated information will be 
required in onJobFinish in finding next jobs to submit
+    setStatus(dagManagementStateStore, dag, getDagNodeId(), executionStatus);
 
     if 
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
       log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should 
have been created only for finished status - {}",
@@ -120,7 +117,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
         dagManagementStateStore.markDagFailed(dag);
       }
 
-      removeFlowFinishDeadlineTriggerAndDagAction(dagManagementStateStore);
+      DagProcUtils.removeFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagId());
     }
   }
 
@@ -129,9 +126,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
    * todo - DMSS should support this functionality like an atomic get-and-set 
operation.
    */
   private void setStatus(DagManagementStateStore dagManagementStateStore,
-      Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus) 
throws IOException {
-    Dag<JobExecutionPlan> dag = 
dagManagementStateStore.getDag(getDagId()).get();
-    DagNodeId dagNodeId = dagNode.getValue().getId();
+      Dag<JobExecutionPlan> dag, DagNodeId dagNodeId, ExecutionStatus 
executionStatus) throws IOException {
     for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
       if (node.getValue().getId().equals(dagNodeId)) {
         node.getValue().setExecutionStatus(executionStatus);
@@ -177,19 +172,4 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
     dagManagementStateStore.checkpointDag(dag);
     dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
   }
-
-  private void 
removeFlowFinishDeadlineTriggerAndDagAction(DagManagementStateStore 
dagManagementStateStore) {
-    DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = 
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
-        getDagNodeId().getFlowName(), getDagNodeId().getFlowExecutionId(),
-        DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
-    log.info("Deleting reminder trigger and dag action {}", 
enforceFlowFinishDeadlineDagAction);
-    // todo - add metrics
-
-    try {
-      
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(getDagTask().getDagAction());
-      
dagManagementStateStore.deleteDagAction(enforceFlowFinishDeadlineDagAction);
-    } catch (SchedulerException | IOException e) {
-      log.warn("Failed to unschedule the reminder for {}", 
enforceFlowFinishDeadlineDagAction);
-    }
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index ddc100ae8..68e02494e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -88,6 +88,7 @@ public class ResumeDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
     // these two statements effectively move the dag from failed dag store to 
(running) dag store.
     // to prevent loss in the unlikely event of failure between the two, we 
add first.
     dagManagementStateStore.checkpointDag(failedDag.get());
+
     // if it fails here, it will checkpoint the failed dag in the (running) 
dag store again, which is idempotent
     dagManagementStateStore.deleteFailedDag(failedDag.get());
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 72e545104..561ee2345 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -60,6 +60,7 @@ import static 
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
  */
 @Data
 @EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts", 
"jobFuture", "flowStartTime"})
+// todo - consider excluding SpecExecutor from EqualsAndHashCode or only 
including DagNodeId
 public class JobExecutionPlan {
   public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
   public static final String JOB_PROPS_KEY = "job.props";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
index 9e121c8b7..c74b3ffe0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
@@ -26,7 +26,6 @@ import org.apache.gobblin.metrics.ContextAwareMeter;
 @Slf4j
 public final class ChangeMonitorUtils {
   private ChangeMonitorUtils() {
-    return;
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 5706ad9cc..a179811a4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -59,7 +59,7 @@ import 
org.apache.gobblin.service.modules.orchestration.Orchestrator;
  * connector between the API and execution layers of GaaS.
  */
 @Slf4j
-public class DagActionStoreChangeMonitor extends HighLevelConsumer {
+public class DagActionStoreChangeMonitor extends HighLevelConsumer<String, 
DagActionStoreChangeEvent> {
   public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = 
"dagActionChangeStore";
 
   // Metrics
@@ -70,13 +70,13 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   private ContextAwareMeter successfulLaunchSubmissionsOnStartup;
   private ContextAwareMeter failedLaunchSubmissionsOnStartup;
   protected ContextAwareMeter unexpectedErrors;
-  private ContextAwareMeter messageProcessedMeter;
-  private ContextAwareMeter duplicateMessagesMeter;
-  private ContextAwareMeter heartbeatMessagesMeter;
-  private ContextAwareMeter nullDagActionTypeMessagesMeter;
+  protected ContextAwareMeter messageProcessedMeter;
+  protected ContextAwareMeter duplicateMessagesMeter;
+  protected ContextAwareMeter heartbeatMessagesMeter;
+  protected ContextAwareMeter nullDagActionTypeMessagesMeter;
   private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from 
all partitions in one gauge
 
-  private volatile Long produceToConsumeDelayValue = -1L;
+  protected volatile Long produceToConsumeDelayValue = -1L;
 
   protected LaunchSubmissionMetricProxy ON_STARTUP = new 
NullLaunchSubmissionMetricProxy();
   protected LaunchSubmissionMetricProxy POST_STARTUP = new 
NullLaunchSubmissionMetricProxy();
@@ -188,11 +188,11 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   This class is multithreaded and this method will be called by multiple 
threads, however any given message will be
   partitioned and processed by only one thread (and corresponding queue).
    */
-  protected void processMessage(DecodeableKafkaRecord message) {
+  protected void processMessage(DecodeableKafkaRecord<String, 
DagActionStoreChangeEvent> message) {
     // This will also include the healthCheck message so that we can rely on 
this to monitor the health of this Monitor
     messageProcessedMeter.mark();
-    String key = (String) message.getKey();
-    DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) 
message.getValue();
+    String key = message.getKey();
+    DagActionStoreChangeEvent value = message.getValue();
 
     String tid = value.getChangeEventIdentifier().getTxId();
     Long produceTimestamp = 
value.getChangeEventIdentifier().getProduceTimestampMillis();
@@ -224,31 +224,41 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
         dagActionType);
 
+    handleDagAction(operation, dagAction, flowGroup, flowName, 
flowExecutionId, dagActionType);
+
+    dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
+  }
+
+  protected void handleDagAction(String operation, DagActionStore.DagAction 
dagAction, String flowGroup, String flowName,
+      long flowExecutionId, DagActionStore.DagActionType dagActionType) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be 
processed. DELETEs require no action.
     try {
-      if (operation.equals("INSERT")) {
-        handleDagAction(dagAction, false);
-      } else if (operation.equals("UPDATE")) {
-        // TODO: change this warning message and process updates if for launch 
or reevaluate type
-        log.warn("Received an UPDATE action to the DagActionStore when values 
in this store are never supposed to be "
-            + "updated. Flow group: {} name {} executionId {} were updated to 
action {}", flowGroup, flowName,
-            flowExecutionId, dagActionType);
-        this.unexpectedErrors.mark();
-      } else if (operation.equals("DELETE")) {
-        log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
-      } else {
-        log.warn("Received unsupported change type of operation {}. Expected 
values to be in [INSERT, UPDATE, DELETE]",
-            operation);
-        this.unexpectedErrors.mark();
-        return;
+      switch (operation) {
+        case "INSERT":
+          handleDagAction(dagAction, false);
+          break;
+        case "UPDATE":
+          // TODO: change this warning message and process updates if for 
launch or reevaluate type
+          log.warn("Received an UPDATE action to the DagActionStore when 
values in this store are never supposed to be "
+                  + "updated. Flow group: {} name {} executionId {} were 
updated to action {}", flowGroup, flowName,
+              flowExecutionId, dagActionType);
+          this.unexpectedErrors.mark();
+          break;
+        case "DELETE":
+          log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
+          break;
+        default:
+          log.warn(
+              "Received unsupported change type of operation {}. Expected 
values to be in [INSERT, UPDATE, DELETE]",
+              operation);
+          this.unexpectedErrors.mark();
+          break;
       }
     } catch (Exception e) {
-      log.warn("Ran into unexpected error processing DagActionStore changes: 
{}", e);
+      log.warn("Ran into unexpected error processing DagActionStore changes: 
", e);
       this.unexpectedErrors.mark();
     }
-
-    dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index ab863974c..a411ae1d0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -19,11 +19,14 @@ package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
@@ -37,16 +40,55 @@ import 
org.apache.gobblin.service.modules.orchestration.Orchestrator;
 @Slf4j
 public class DagManagementDagActionStoreChangeMonitor extends 
DagActionStoreChangeMonitor {
   private final DagManagement dagManagement;
+  @VisibleForTesting @Getter
+  private final DagActionReminderScheduler dagActionReminderScheduler;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
   public DagManagementDagActionStoreChangeMonitor(Config config, int 
numThreads,
       FlowCatalog flowCatalog, Orchestrator orchestrator, 
DagManagementStateStore dagManagementStateStore,
-      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement, 
DagActionReminderScheduler dagActionReminderScheduler) {
     // DagManager is only needed in the `handleDagAction` method of its parent 
class and not needed in this class,
     // so we are passing a null value for DagManager to its parent class.
     super("", config, null, numThreads, flowCatalog, orchestrator, 
dagManagementStateStore, isMultiActiveSchedulerEnabled);
     this.dagManagement = dagManagement;
+    this.dagActionReminderScheduler = dagActionReminderScheduler;
+  }
+
+  @Override
+  protected void handleDagAction(String operation, DagActionStore.DagAction 
dagAction, String flowGroup, String flowName,
+      long flowExecutionId, DagActionStore.DagActionType dagActionType) {
+    // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
+    // {@link DagActionStore.FlowActionType} flow requests that have to be 
processed.
+    try {
+      switch (operation) {
+        case "INSERT":
+          handleDagAction(dagAction, false);
+          break;
+        case "UPDATE":
+          log.warn("Received an UPDATE action to the DagActionStore when 
values in this store are never supposed to be "
+                  + "updated. Flow group: {} name {} executionId {} were 
updated to action {}", flowGroup, flowName,
+              flowExecutionId, dagActionType);
+          this.unexpectedErrors.mark();
+          break;
+        case "DELETE":
+          log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
+          if (dagActionType == 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE
+              || dagActionType == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
+            this.dagActionReminderScheduler.unscheduleReminderJob(dagAction);
+          }
+          break;
+        default:
+          log.warn(
+              "Received unsupported change type of operation {}. Expected 
values to be in [INSERT, UPDATE, DELETE]",
+              operation);
+          this.unexpectedErrors.mark();
+          break;
+      }
+    } catch (Exception e) {
+      log.warn("Ran into unexpected error processing DagActionStore changes: 
", e);
+      this.unexpectedErrors.mark();
+    }
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index c5828575d..a55286e17 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.util.InjectionNames;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -48,17 +49,20 @@ public class 
DagManagementDagActionStoreChangeMonitorFactory implements Provider
   private final DagManagementStateStore dagManagementStateStore;
   private final boolean isMultiActiveSchedulerEnabled;
   private final DagManagement dagManagement;
+  private final DagActionReminderScheduler dagActionReminderScheduler;
 
   @Inject
   public DagManagementDagActionStoreChangeMonitorFactory(Config config, 
DagManager dagManager, FlowCatalog flowCatalog,
       Orchestrator orchestrator, DagManagementStateStore 
dagManagementStateStore, DagManagement dagManagement,
-      @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled) {
+      @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled,
+      DagActionReminderScheduler dagActionReminderScheduler) {
     this.config = Objects.requireNonNull(config);
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
     this.dagManagementStateStore = dagManagementStateStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
     this.dagManagement = dagManagement;
+    this.dagActionReminderScheduler = dagActionReminderScheduler;
   }
 
   private DagManagementDagActionStoreChangeMonitor 
createDagActionStoreMonitor() {
@@ -68,7 +72,8 @@ public class DagManagementDagActionStoreChangeMonitorFactory 
implements Provider
     int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
     return new 
DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig,
-        numThreads, flowCatalog, orchestrator, dagManagementStateStore, 
isMultiActiveSchedulerEnabled, this.dagManagement);
+        numThreads, flowCatalog, orchestrator, dagManagementStateStore, 
isMultiActiveSchedulerEnabled, this.dagManagement,
+        this.dagActionReminderScheduler);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 7caec9a4c..9b7e40695 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
-import org.quartz.SchedulerException;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
@@ -64,10 +63,9 @@ import 
org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
-import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.retry.RetryerFactory;
@@ -223,17 +221,22 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
           String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
           String storeName = jobStatusStoreName(flowGroup, flowName);
           String tableName = jobStatusTableName(flowExecutionId, jobGroup, 
jobName);
+          String status = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED) {
             this.eventProducer.emitObservabilityEvent(jobStatus);
           }
 
-          if (this.dagProcEngineEnabled) {
+          if (this.dagProcEngineEnabled && 
DagProcUtils.isJobLevelStatus(jobName)) {
             if (updatedJobStatus.getRight() == NewState.FINISHED) {
+              // todo - retried/resumed jobs *may* not be handled here; we may 
want to create their dag action elsewhere
               this.dagManagementStateStore.addJobDagAction(flowGroup, 
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
             } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
-              removeStartDeadlineTriggerAndDagAction(dagManagementStateStore, 
flowGroup, flowName, flowExecutionId, jobName);
+              
DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore, 
flowGroup, flowName, flowExecutionId, jobName);
+            }
+            // in case the job is cancelled before it started, we need to 
clean its enforceJobStartDeadlineDagAction
+            if (status != null && 
ExecutionStatus.valueOf(status).equals(ExecutionStatus.CANCELLED)) {
+              
DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore, 
flowGroup, flowName, flowExecutionId, jobName);
             }
           }
 
@@ -260,21 +263,6 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     }
   }
 
-  private void removeStartDeadlineTriggerAndDagAction(DagManagementStateStore 
dagManagementStateStore, String flowGroup,
-      String flowName, long flowExecutionId, String jobName) {
-    DagActionStore.DagAction enforceStartDeadlineDagAction = new 
DagActionStore.DagAction(flowGroup, flowName,
-        flowExecutionId, jobName, 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
-    log.info("Deleting reminder trigger and dag action {}", 
enforceStartDeadlineDagAction);
-    // todo - add metrics
-
-    try {
-      
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceStartDeadlineDagAction);
-      dagManagementStateStore.deleteDagAction(enforceStartDeadlineDagAction);
-    } catch (SchedulerException | IOException e) {
-      log.error("Failed to unschedule the reminder for {}", 
enforceStartDeadlineDagAction);
-    }
-  }
-
   /**
    * It fills missing fields in job status and also merge the fields with the 
existing job status in the state store.
    * Merging is required because we do not want to lose the information sent 
by other GobblinTrackingEvents.
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 205f02dbc..046366b24 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -18,9 +18,6 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 
 import org.junit.Assert;
@@ -34,7 +31,6 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -43,6 +39,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 
@@ -61,22 +58,14 @@ public class DagManagementTaskStreamImplTest {
     this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
     ConfigBuilder configBuilder = ConfigBuilder.create();
-    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 this.testMetastoreDatabase.getJdbcUrl())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
     Config config = configBuilder.build();
 
-    // Constructing TopologySpecMap.
-    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
-    String specExecInstance = "mySpecExecutor";
-    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
-    URI specExecURI = new URI(specExecInstance);
-    topologySpecMap.put(specExecURI, topologySpec);
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config,
-        null, null, null, mock(DagActionStore.class));
-    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
             mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index b3a924454..14dd338f6 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -18,9 +18,6 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.TimeoutException;
@@ -39,7 +36,6 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
@@ -73,22 +69,14 @@ public class DagProcessingEngineTest {
 
     Config config;
     ConfigBuilder configBuilder = ConfigBuilder.create();
-    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
     config = configBuilder.build();
 
-    // Constructing TopologySpecMap.
-    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
-    String specExecInstance = "mySpecExecutor";
-    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
-    URI specExecURI = new URI(specExecInstance);
-    topologySpecMap.put(specExecURI, topologySpec);
-    dagManagementStateStore = new MostlyMySqlDagManagementStateStore(config, 
null,
-        null, null, dagActionStore);
-    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
     doReturn(true).when(dagActionStore).deleteDagAction(any());
     dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 970309cef..aaabf6a86 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -29,19 +29,16 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
-import com.zaxxer.hikari.HikariDataSource;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metastore.MysqlStateStore;
-import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProcTest;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -62,7 +59,8 @@ public class MostlyMySqlDagManagementStateStoreTest {
   private static final String TEST_USER = "testUser";
   private static final String TEST_PASSWORD = "testPassword";
   private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
-  private static final String TEST_TABLE = "quotas";
+  private static final String TEST_TABLE = "table";
+  public static String TEST_SPEC_EXECUTOR_URI = "mySpecExecutor";
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -118,11 +116,11 @@ public class MostlyMySqlDagManagementStateStoreTest {
 
   public static MostlyMySqlDagManagementStateStore 
getDummyDMSS(ITestMetastoreDatabase testMetastoreDatabase) throws Exception {
     ConfigBuilder configBuilder = ConfigBuilder.create();
-    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 TestMysqlDagStateStore.class.getName())
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, 
testMetastoreDatabase.getJdbcUrl())
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE + 
1)
+        
.addPrimitive(MostlyMySqlDagManagementStateStore.FAILED_DAG_STATESTORE_PREFIX
+            + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE + 
2);
     Config config = configBuilder.build();
     JobStatusRetriever jobStatusRetriever = mock(JobStatusRetriever.class);
     JobStatus dummyJobStatus = 
JobStatus.builder().flowName("fn").flowGroup("fg").jobGroup("fg").jobName("job0")
@@ -132,41 +130,13 @@ public class MostlyMySqlDagManagementStateStoreTest {
 
     // Constructing TopologySpecMap.
     Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
-    String specExecInstance = "mySpecExecutor";
-    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
-    URI specExecURI = new URI(specExecInstance);
+
+    TopologySpec topologySpec = 
LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI);
+    URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
     topologySpecMap.put(specExecURI, topologySpec);
     MostlyMySqlDagManagementStateStore dagManagementStateStore =
         new MostlyMySqlDagManagementStateStore(config, null, null, 
jobStatusRetriever, mock(DagActionStore.class));
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     return dagManagementStateStore;
   }
-
-  /**
-   * Only overwrite {@link #createStateStore(Config)} method to directly 
return a mysqlStateStore
-   * backed by mocked db.
-   */
-  public static class TestMysqlDagStateStore extends MysqlDagStateStore {
-    public TestMysqlDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
-      super(config, topologySpecMap);
-    }
-
-    @Override
-    protected StateStore<State> createStateStore(Config config) {
-      try {
-        String jdbcUrl = 
config.getString(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
-        HikariDataSource dataSource = new HikariDataSource();
-
-        
dataSource.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
-        dataSource.setAutoCommit(false);
-        dataSource.setJdbcUrl(jdbcUrl);
-        dataSource.setUsername(TEST_USER);
-        dataSource.setPassword(TEST_PASSWORD);
-
-        return new MysqlStateStore<>(dataSource, TEST_DAG_STATE_STORE, false, 
State.class);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
index 59816742a..41a092c58 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -42,6 +42,7 @@ import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -161,7 +162,7 @@ public class MysqlDagStateStoreTest {
    * Only overwrite {@link #createStateStore(Config)} method to directly 
return a mysqlStateStore
    * backed by mocked db.
    */
-  public class TestMysqlDagStateStore extends MysqlDagStateStore {
+  public static class TestMysqlDagStateStore extends MysqlDagStateStore {
     public TestMysqlDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
       super(config, topologySpecMap);
     }
@@ -170,7 +171,10 @@ public class MysqlDagStateStoreTest {
     protected StateStore<State> createStateStore(Config config) {
       try {
         // Setting up mock DB
-        String jdbcUrl = MysqlDagStateStoreTest.testDb.getJdbcUrl();
+        String jdbcUrl = 
config.hasPath(ConfigurationKeys.STATE_STORE_DB_URL_KEY)
+            ? config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY)
+            : MysqlDagStateStoreTest.testDb.getJdbcUrl();
+        String tableName = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_DAG_STATE_STORE);
         HikariDataSource dataSource = new HikariDataSource();
 
         
dataSource.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
@@ -179,7 +183,7 @@ public class MysqlDagStateStoreTest {
         dataSource.setUsername(TEST_USER);
         dataSource.setPassword(TEST_PASSWORD);
 
-        return new MysqlStateStore<>(dataSource, TEST_DAG_STATE_STORE, false, 
State.class);
+        return new MysqlStateStore<>(dataSource, tableName, false, 
State.class);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index dc2a1b40f..a32a211fc 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -41,8 +41,8 @@ import com.google.common.base.Optional;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
-import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
@@ -67,6 +67,7 @@ import org.apache.gobblin.util.PathUtils;
 
 import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 
 
 public class OrchestratorTest {
@@ -130,20 +131,12 @@ public class OrchestratorTest {
     this.mockDagManager = mock(DagManager.class);
     Mockito.doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
 
-    Config config = ConfigBuilder.create()
-        
.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
-            
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 this.testMetastoreDatabase.getJdbcUrl())
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE).build();
-
     MostlyMySqlDagManagementStateStore dagManagementStateStore =
-        new MostlyMySqlDagManagementStateStore(config, null, null, null, 
mock(DagActionStore.class));
+        
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
 
     SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
 
-    FlowCompilationValidationHelper flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, 
mock(UserQuotaManager.class), mockFlowStatusGenerator);
+    FlowCompilationValidationHelper flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(ConfigFactory.empty(), 
sharedFlowMetricsSingleton, mock(UserQuotaManager.class), 
mockFlowStatusGenerator);
     this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
         this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockFlowStatusGenerator,
         Optional.absent(), sharedFlowMetricsSingleton, 
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 0fb261515..ff05be247 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -86,7 +86,9 @@ public class EnforceDeadlineDagProcsTest {
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
-            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
         message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
@@ -122,7 +124,9 @@ public class EnforceDeadlineDagProcsTest {
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
-            .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
         message("Test 
message").eventName(ExecutionStatus.RUNNING.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index 0976f6c59..64efb2ed5 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -37,7 +37,6 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
@@ -56,7 +55,6 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.monitoring.JobStatus;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -70,9 +68,7 @@ public class KillDagProcTest {
   public void setUp() throws Exception {
     this.testDb = TestMetastoreDatabaseFactory.get();
     this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
-    
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
-    doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
-    doNothing().when(this.dagManagementStateStore).addDagNodeState(any(), 
any());
+    LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
   }
 
   @AfterClass(alwaysRun = true)
@@ -87,7 +83,10 @@ public class KillDagProcTest {
   public void killDag() throws IOException, URISyntaxException, 
InterruptedException {
     long flowExecutionId = System.currentTimeMillis();
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
-        5, "user5", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("fg")));
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("fg"))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
@@ -125,7 +124,10 @@ public class KillDagProcTest {
   public void killDagNode() throws IOException, URISyntaxException, 
InterruptedException {
     long flowExecutionId = System.currentTimeMillis();
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("2", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
-        5, "user5", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("fg")));
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("fg"))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     JobStatus
         jobStatus = 
JobStatus.builder().flowName("job0").flowGroup("fg").jobGroup("fg").jobName("job0").flowExecutionId(flowExecutionId).
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 1e7580e7f..e57e09589 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -22,10 +22,9 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
+import org.apache.hadoop.fs.Path;
 import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -42,14 +41,16 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -75,9 +76,7 @@ public class LaunchDagProcTest {
   public void setUp() throws Exception {
     this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
     this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
-    
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
-    doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
-    doNothing().when(this.dagManagementStateStore).addDagNodeState(any(), 
any());
+    mockDMSSCommonBehavior(this.dagManagementStateStore);
   }
 
   @AfterClass(alwaysRun = true)
@@ -87,17 +86,18 @@ public class LaunchDagProcTest {
   }
 
   @Test
-  public void launchDag() throws IOException, InterruptedException, 
URISyntaxException, ExecutionException {
+  public void launchDag() throws IOException, InterruptedException, 
URISyntaxException {
     String flowGroup = "fg";
     String flowName = "fn";
     long flowExecutionId = 12345L;
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
         DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5", 
ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
-            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName)));
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+            MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
-    SpecProducer<Spec> specProducer = 
DagManagerUtils.getSpecProducer(dag.getNodes().get(0));
     List<SpecProducer<Spec>> specProducers = 
ReevaluateDagProcTest.getDagSpecProducers(dag);
     LaunchDagProc launchDagProc = new LaunchDagProc(
         new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, "job0",
@@ -107,15 +107,10 @@ public class LaunchDagProcTest {
     launchDagProc.process(this.dagManagementStateStore);
 
     int numOfLaunchedJobs = 1; // = number of start nodes
-    long addSpecCount = specProducers.stream()
-        .mapToLong(p -> Mockito.mockingDetails(p)
-            .getInvocations()
-            .stream()
-            .filter(a -> a.getMethod().getName().equals("addSpec"))
-            .count())
-        .sum();
-    Mockito.verify(specProducer, 
Mockito.times(numOfLaunchedJobs)).addSpec(any());
-    Assert.assertEquals(numOfLaunchedJobs, addSpecCount);
+    Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+
+    specProducers.stream().skip(numOfLaunchedJobs) // separately verified 
`specProducers.get(0)`
+        .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
 
     Mockito.verify(this.dagManagementStateStore, 
Mockito.times(numOfLaunchedJobs))
         .addFlowDagAction(any(), any(), anyLong(), 
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
@@ -129,7 +124,9 @@ public class LaunchDagProcTest {
     Dag<JobExecutionPlan> dag = 
buildDagWithMultipleNodesAtDifferentLevels("1", flowExecutionId,
         DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5", 
ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
-            .withValue(ConfigurationKeys.FLOW_NAME_KEY,  
ConfigValueFactory.fromAnyRef(flowName)));
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY,  
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
     LaunchDagProc launchDagProc = new LaunchDagProc(
@@ -180,4 +177,23 @@ public class LaunchDagProcTest {
     }
     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
+
+  public static void mockDMSSCommonBehavior(DagManagementStateStore 
dagManagementStateStore) throws IOException, SpecNotFoundException {
+    
doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+  }
+
+  public static TopologySpec buildNaiveTopologySpec(String specUriInString) {
+    Config specExecConfig = 
MockedSpecExecutor.makeDummyConfigsForSpecExecutor(specUriInString);
+    SpecExecutor specExecutorInstanceProducer = new 
MockedSpecExecutor(specExecConfig);
+    TopologySpec.Builder topologySpecBuilder = TopologySpec
+        .builder(new 
Path(specExecConfig.getString("specStore.fs.dir")).toUri())
+        .withConfig(specExecConfig)
+        .withDescription("test")
+        .withVersion("1")
+        .withSpecExecutor(specExecutorInstanceProducer);
+
+    return topologySpecBuilder.build();
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 057788173..bbf5874db 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration.proc;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -38,14 +37,11 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
-import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -59,20 +55,16 @@ import org.apache.gobblin.service.monitoring.JobStatus;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 
 public class ReevaluateDagProcTest {
   private final long flowExecutionId = System.currentTimeMillis();
   private final String flowGroup = "fg";
-
   private ITestMetastoreDatabase testMetastoreDatabase;
   private DagManagementStateStore dagManagementStateStore;
   private MockedStatic<GobblinServiceManager> mockedGobblinServiceManager;
-  private DagActionReminderScheduler dagActionReminderScheduler;
 
   @BeforeClass
   public void setUpClass() throws Exception {
@@ -83,17 +75,7 @@ public class ReevaluateDagProcTest {
   @BeforeMethod
   public void setUp() throws Exception {
     this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
-    mockDMSSCommonBehavior(dagManagementStateStore);
-    this.dagActionReminderScheduler = mock(DagActionReminderScheduler.class);
-    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(this.dagActionReminderScheduler);
-  }
-
-  private void mockDMSSCommonBehavior(DagManagementStateStore 
dagManagementStateStore) throws IOException, SpecNotFoundException {
-    
doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
-    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
-    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
-    doNothing().when(dagManagementStateStore).deleteDagNodeState(any(), any());
-    doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+    LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
   }
 
   @AfterClass(alwaysRun = true)
@@ -106,45 +88,43 @@ public class ReevaluateDagProcTest {
   @Test
   public void testOneNextJobToRun() throws Exception {
     String flowName = "fn";
+    DagManager.DagId dagId = new DagManager.DagId(flowGroup, flowName, 
flowExecutionId);
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
         2, "user5", ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
-    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
-        message("Test 
message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0")
+        .flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.COMPLETE.name())
+        
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    dagManagementStateStore.checkpointDag(dag);
 
-    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
-    doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    /*
+    We cannot verify addSpec calls on the spec producers directly, because the spec producer object is replaced
+    when the dag is written to and read back from the DMSS. So, in order to verify job submission through mock
+    invocations on the spec producers, we have to mock the dag (which includes the SpecProducer) read from the DMSS.
+    */
+    doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus)))
+        .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    doReturn(Optional.of(dag)). when(dagManagementStateStore).getDag(dagId);
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null, dagManagementStateStore));
     reEvaluateDagProc.process(dagManagementStateStore);
 
-    long addSpecCount = specProducers.stream()
-        .mapToLong(p -> Mockito.mockingDetails(p)
-            .getInvocations()
-            .stream()
-            .filter(a -> a.getMethod().getName().equals("addSpec"))
-            .count())
-        .sum();
-
     // next job is sent to spec producer
-    Assert.assertEquals(addSpecCount, 1L);
+    Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
+    // there are two invocations: one after setting the status and the other after sending the new job to the specProducer
+    Mockito.verify(this.dagManagementStateStore, 
Mockito.times(2)).addDagNodeState(any(), any());
 
     // current job's state is deleted
     
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
-
-    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
-        .filter(a -> 
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
-
-    // when there is no more job to run in re-evaluate dag proc, it deletes 
enforce_flow_finish_dag_action also
-    
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
-        .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
 
   // test when there does not exist a next job in the dag when the current 
job's reevaluate dag action is processed
@@ -156,31 +136,36 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
-    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
-        message("Test 
message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0")
+        .flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.COMPLETE.name())
+        
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    dagManagementStateStore.checkpointDag(dag);
 
-    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
-    doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-    doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+    Dag<JobExecutionPlan> mockedDag = DagManagerTest.buildDag("2", 
flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+    // mock getDagNodeWithJobStatus() to return a dagNode with status completed
+    
mockedDag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+    doReturn(new ImmutablePair<>(Optional.of(mockedDag.getNodes().get(0)), 
Optional.of(jobStatus)))
+        .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
 
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
 
-    long addSpecCount = specProducers.stream()
-        .mapToLong(p -> Mockito.mockingDetails(p)
-            .getInvocations()
-            .stream()
-            .filter(a -> a.getMethod().getName().equals("addSpec"))
-            .count())
-        .sum();
-
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null, dagManagementStateStore));
     reEvaluateDagProc.process(dagManagementStateStore);
 
     // no new job to launch for this one job flow
-    Assert.assertEquals(addSpecCount, 0L);
+    specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).addSpec(any()));
 
     // current job's state is deleted
     
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
@@ -190,9 +175,6 @@ public class ReevaluateDagProcTest {
     
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
         .filter(a -> a.getMethod().getName().equals("deleteDag")).count(), 1);
 
-    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
-        .filter(a -> 
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
-
     
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
@@ -205,12 +187,12 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
-    List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
-
-    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
-    doReturn(new ImmutablePair<>(Optional.of(startDagNodes.get(0)), 
Optional.empty()))
+    dagManagementStateStore.checkpointDag(dag);
+    doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), 
Optional.empty()))
         .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
 
     ReevaluateDagProc
@@ -231,7 +213,6 @@ public class ReevaluateDagProcTest {
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(any());
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
-    Mockito.verify(dagActionReminderScheduler, 
Mockito.never()).unscheduleReminderJob(any());
   }
 
   @Test
@@ -242,12 +223,14 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
         .jobName("job3").flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.COMPLETE.name())
         
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    dagManagementStateStore.checkpointDag(dag);
 
-    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus)))
         .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index c68bde24e..c84132c8a 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -19,10 +19,7 @@ package 
org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
 
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
@@ -35,15 +32,11 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
@@ -51,8 +44,6 @@ import 
org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
 
@@ -64,9 +55,7 @@ public class ResumeDagProcTest {
   public void setUp() throws Exception {
     testDb = TestMetastoreDatabaseFactory.get();
     this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
-    
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
-    doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
-    doNothing().when(this.dagManagementStateStore).addDagNodeState(any(), 
any());
+    LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
   }
 
   @AfterClass(alwaysRun = true)
@@ -80,38 +69,41 @@ public class ResumeDagProcTest {
   This test creates a failed dag and launches a resume dag proc for it. It 
then verifies that the next jobs are set to run.
    */
   @Test
-  public void resumeDag()
-      throws IOException, URISyntaxException, ExecutionException, 
InterruptedException {
+  public void resumeDag() throws IOException, URISyntaxException, 
ExecutionException, InterruptedException {
     long flowExecutionId = 12345L;
     String flowGroup = "fg";
     String flowName = "fn";
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
-        5, "user5", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
-            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName)));
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     // simulate a failed dag in store
     
dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
     
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED);
     
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
     
dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
-    
doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any());
+    this.dagManagementStateStore.checkpointDag(dag);
+    // simulate it as a failed dag
+    this.dagManagementStateStore.markDagFailed(dag);
 
     ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.RESUME),
         null, this.dagManagementStateStore));
     resumeDagProc.process(this.dagManagementStateStore);
 
-    SpecProducer<Spec> specProducer = 
DagManagerUtils.getSpecProducer(dag.getNodes().get(1));
-    List<SpecProducer<Spec>> otherSpecProducers = 
dag.getNodes().stream().map(node -> {
-      try {
-        return DagManagerUtils.getSpecProducer(node);
-      } catch (ExecutionException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }).filter(sp -> specProducer != sp).collect(Collectors.toList());
     int expectedNumOfResumedJobs = 1; // = number of resumed nodes
 
-    Mockito.verify(specProducer, 
Mockito.times(expectedNumOfResumedJobs)).addSpec(any());
+    /* Only the current job should have run.
+     We cannot check whether addSpec is called on the spec producers, because in resumeDagProc a dag is stored in the DMSS
+     and retrieved, hence it goes through serialization/deserialization; in this process the SpecProducer objects in
+     a dag node are recreated, and addSpec is called on the new objects.
+     Warning - in unit tests, a test dag is created in different ways (e.g. DagManagerTest.buildDag() or DagTestUtils.buildDag());
+     different methods create different spec executors (e.g. MockedSpecExecutor.createDummySpecExecutor() or
+     buildNaiveTopologySpec().getSpecExecutor(), respectively).
+     The result is that after serializing/deserializing the test dag, the spec executor (and producer) type may change. */
+
     Mockito.verify(this.dagManagementStateStore, 
Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any());
-    otherSpecProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).addSpec(any()));
   }
 }


Reply via email to