This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f0e53792f [GOBBLIN-2115] implement DagNodeStateStore (#3999)
2f0e53792f is described below

commit 2f0e53792fde7f852a808f73f9fc32a627fd2b71
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Jul 18 14:07:25 2024 -0700

    [GOBBLIN-2115] implement DagNodeStateStore (#3999)
    
    * implement DagNodeStateStore
    * address review comments and remove some unused code
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |   4 +
 .../apache/gobblin/metastore/MysqlStateStore.java  |   2 +-
 .../runtime/KafkaAvroJobStatusMonitorTest.java     |   3 +-
 .../modules/core/GobblinServiceGuiceModule.java    |   4 +-
 .../gobblin/service/modules/flowgraph/Dag.java     |   3 +
 .../orchestration/DagManagementStateStore.java     |  72 +-----
 .../orchestration/DagManagementTaskStreamImpl.java |   2 +-
 .../modules/orchestration/DagManagerUtils.java     |  14 --
 .../modules/orchestration/DagProcessingEngine.java |   1 -
 .../modules/orchestration/DagStateStore.java       |  14 ++
 .../orchestration/DagStateStoreWithDagNodes.java   |  56 +++++
 ...tore.java => MySqlDagManagementStateStore.java} | 129 +++--------
 .../modules/orchestration/MysqlDagActionStore.java |  43 +---
 .../modules/orchestration/MysqlDagStateStore.java  |   5 +-
 .../MysqlDagStateStoreWithDagNodes.java            | 241 +++++++++++++++++++++
 .../orchestration/MysqlUserQuotaManager.java       |   3 +-
 .../modules/orchestration/Orchestrator.java        |   2 +-
 .../modules/orchestration/proc/DagProc.java        |   4 +-
 .../modules/orchestration/proc/DagProcUtils.java   |   6 +-
 .../proc/EnforceFlowFinishDeadlineDagProc.java     |   1 -
 .../proc/EnforceJobStartDeadlineDagProc.java       |   1 -
 .../modules/orchestration/proc/KillDagProc.java    |   1 -
 .../modules/orchestration/proc/LaunchDagProc.java  |  16 +-
 .../orchestration/proc/ReevaluateDagProc.java      |  33 +--
 .../modules/orchestration/proc/ResumeDagProc.java  |   4 +-
 .../modules/spec/JobExecutionPlanDagFactory.java   |   7 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |   2 -
 .../DagManagementTaskStreamImplTest.java           |   4 +-
 .../orchestration/DagProcessingEngineTest.java     |   6 +-
 ....java => MySqlDagManagementStateStoreTest.java} |  64 +++---
 .../orchestration/MysqlDagActionStoreTest.java     |   3 +-
 .../MysqlDagStateStoreWithDagNodesTest.java        | 136 ++++++++++++
 .../modules/orchestration/OrchestratorTest.java    |   4 +-
 .../proc/EnforceDeadlineDagProcsTest.java          |  51 +++--
 .../orchestration/proc/KillDagProcTest.java        |  12 +-
 .../orchestration/proc/LaunchDagProcTest.java      |  12 +-
 .../orchestration/proc/ReevaluateDagProcTest.java  |  41 ++--
 .../orchestration/proc/ResumeDagProcTest.java      |  19 +-
 38 files changed, 656 insertions(+), 369 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index aeadf4dc61..c37a7f5e95 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -163,6 +163,10 @@ public class ServiceConfigKeys {
   public static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13; // length of 
flowExecutionId which is epoch timestamp
   public static final int MAX_JOB_NAME_LENGTH = 374;
   public static final int MAX_JOB_GROUP_LENGTH = 374;
+  public static final int MAX_DAG_NODE_ID_LENGTH = MAX_FLOW_NAME_LENGTH + 
MAX_FLOW_GROUP_LENGTH + MAX_FLOW_EXECUTION_ID_LENGTH +
+      MAX_JOB_NAME_LENGTH + MAX_JOB_GROUP_LENGTH + 4; // 4 to account for 
delimiters' length
+  public static final int MAX_DAG_ID_LENGTH = MAX_FLOW_NAME_LENGTH + 
MAX_FLOW_GROUP_LENGTH + MAX_FLOW_EXECUTION_ID_LENGTH
+      + 2; // 2 to account for delimiters' length
   public static final String STATE_STORE_TABLE_SUFFIX = "gst";
   public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
   public static final String DAG_STORE_KEY_SEPARATION_CHARACTER = "_";
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index aa8d00f5e1..af7203c69b 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -202,7 +202,7 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
    * @param config the properties used for datasource instantiation
    * @return
    */
-  static DataSource newDataSource(Config config) {
+  public static DataSource newDataSource(Config config) {
     HikariDataSource dataSource = new HikariDataSource();
     PasswordManager passwordManager = 
PasswordManager.getInstance(ConfigUtils.configToProperties(config));
 
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index f03a7ff4ba..e20cfd97fb 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -75,7 +75,6 @@ import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
@@ -197,7 +196,7 @@ public class KafkaAvroJobStatusMonitorTest {
 
   @Test (dependsOnMethods = "testProcessMessageForSuccessfulFlow")
   public void testProcessMessageForFailedFlow() throws IOException, 
ReflectiveOperationException {
-    DagManagementStateStore dagManagementStateStore = 
mock(MostlyMySqlDagManagementStateStore.class);
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic2");
 
     //Submit GobblinTrackingEvents to Kafka
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 6365239ec4..6ed53ba00e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -77,7 +77,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagTaskStream;
 import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
 import 
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
 import 
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
@@ -177,7 +177,7 @@ public class GobblinServiceGuiceModule implements Module {
     OptionalBinder.newOptionalBinder(binder, DagActionStore.class);
     if (serviceConfig.isWarmStandbyEnabled()) {
       binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
-      
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class);
+      
binder.bind(DagManagementStateStore.class).to(MySqlDagManagementStateStore.class);
       
binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
       
binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.class);
       
binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.class);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 50a4dcb5c2..29ffc485e0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -50,11 +50,14 @@ public class Dag<T> {
   private List<DagNode<T>> nodes;
 
   @Setter
+  @Deprecated // because this field is not persisted in mysql and contains 
information in very limited cases
   private String message;
   @Setter
+  @Deprecated // because this field is not persisted in mysql and contains 
information in very limited cases
   private String flowEvent;
   // Keep track of when the final flow status is emitted, in milliseconds to 
avoid many duplicate events
   @Setter @Getter
+  @Deprecated // because this field is not persisted in mysql and contains 
information in very limited cases
   private long eventEmittedTimeMillis = -1;
 
   public Dag(List<DagNode<T>> dagNodes) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 515036c3af..139082545b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -42,6 +41,8 @@ import org.apache.gobblin.service.monitoring.JobStatus;
  * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
  * and allows add/delete and other functions.
  */
+// todo - this should merge with DagStateStoreWithDagNodes interface and 
`addDagNodeState` and `addDagNode` should merge and rename to
+  // `updateDagNode`
 @Alpha
 public interface DagManagementStateStore {
   /**
@@ -57,34 +58,17 @@ public interface DagManagementStateStore {
   void removeFlowSpec(URI uri, Properties headers, boolean triggerListener);
 
   /**
-   * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
-   * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
-   * dag node's state, on resuming a dag, on receiving a new dag from 
orchestrator.
-   * Calling on a previously checkpointed Dag updates it.
+   * Adds a {@link Dag} to the store.
    * Opposite of this is {@link DagManagementStateStore#deleteDag} that 
removes the Dag from the store.
    * @param dag The dag to checkpoint
    */
-  void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException;
-
-  /**
-   @return whether `dagId` is currently known due to {@link 
DagManagementStateStore#checkpointDag} but not yet
-   {@link DagManagementStateStore#deleteDag}
-   */
-  boolean containsDag(DagManager.DagId dagId) throws IOException;
+  void addDag(Dag<JobExecutionPlan> dag) throws IOException;
 
   /**
    @return the {@link Dag}, if present
    */
   Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws 
IOException;
 
-  /**
-   * Delete the {@link Dag} from the backing store, typically called upon 
completion of execution.
-   * @param dag The dag completed/cancelled execution on {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
-   */
-  default void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
-    deleteDag(DagManagerUtils.generateDagId(dag));
-  }
-
   /**
    * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
    * @param dagId The ID of the dag to clean up.
@@ -94,38 +78,25 @@ public interface DagManagementStateStore {
   /**
    * This marks the dag as a failed one.
    * Failed dags are queried using {@link 
DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried.
-   * @param dag failing dag
+   * @param dagId failing dag's dagId
    */
-  void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException;
-
-  /**
-   * Return a list of all failed dags' IDs contained in the dag state store.
-   */
-  Set<String> getFailedDagIds() throws IOException;
+  void markDagFailed(DagManager.DagId dagId) throws IOException;
 
   /**
    * Returns the failed dag.
-   * If the dag is not found because it was never marked as failed through 
{@link DagManagementStateStore#markDagFailed(Dag)},
+   * If the dag is not found because it was never marked as failed through
+   * {@link 
DagManagementStateStore#markDagFailed(org.apache.gobblin.service.modules.orchestration.DagManager.DagId)},
    * it returns Optional.absent.
    * @param dagId dag id of the failed dag
    */
   Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws 
IOException;
 
-  /**
-   * Deletes the failed dag. No-op if dag is not found or is not marked as 
failed.
-   * @param dag
-   * @throws IOException
-   */
-  default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
-    deleteFailedDag(DagManagerUtils.generateDagId(dag));
-  }
-
   void deleteFailedDag(DagManager.DagId dagId) throws IOException;
 
   /**
    * Adds state of a {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
    * Note that a DagNode is a part of a Dag and must already be present in the 
store through
-   * {@link DagManagementStateStore#checkpointDag}. This call is just an 
additional identifier which may be used
+   * {@link DagManagementStateStore#addDag}. This call is just an additional 
identifier which may be used
    * for DagNode level operations. In the future, it may be merged with 
checkpointDag.
    * @param dagNode dag node to be added
    * @param dagId dag id of the dag this dag node belongs to
@@ -138,7 +109,8 @@ public interface DagManagementStateStore {
    * JobStatus can be non-empty only if DagNode is non-empty.
    * @param dagNodeId of the dag node
    */
-  Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId);
+  Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId)
+      throws IOException;
 
   /**
    * Returns a list of {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}.
@@ -147,26 +119,6 @@ public interface DagManagementStateStore {
    */
   Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) 
throws IOException;
 
-  /**
-   * Deletes the dag node state that was added through {@link 
DagManagementStateStore#addDagNodeState(Dag.DagNode, DagManager.DagId)}
-   * No-op if the dag node is not found in the store.
-   * @param dagNode dag node to be deleted
-   * @param dagId dag id of the dag this dag node belongs to
-   */
-  void deleteDagNodeState(DagManager.DagId dagId, 
Dag.DagNode<JobExecutionPlan> dagNode);
-
-  /**
-   * Loads all currently running {@link Dag}s from the underlying store. 
Typically, invoked when a new {@link DagManager}
-   * takes over or on restart of service.
-   * @return a {@link List} of currently running {@link Dag}s.
-   */
-  List<Dag<JobExecutionPlan>> getDags() throws IOException;
-
-  /**
-   * Initialize the dag quotas with the provided set of dags.
-   */
-  void initQuota(Collection<Dag<JobExecutionPlan>> dags) throws IOException;
-
   /**
    * Checks if the dagNode exceeds the statically configured user quota for 
the proxy user, requester user and flowGroup.
    * It also increases the quota usage for proxy user, requester and the 
flowGroup of the given DagNode by one.
@@ -195,7 +147,7 @@ public interface DagManagementStateStore {
    * Returns true if the {@link Dag} identified by the given {@link 
org.apache.gobblin.service.modules.orchestration.DagManager.DagId}
    * has any running job, false otherwise.
    */
-  public boolean hasRunningJobs(DagManager.DagId dagId);
+  boolean hasRunningJobs(DagManager.DagId dagId) throws IOException;
 
   /**
    * Check if an action exists in dagAction store by flow group, flow name, 
flow execution id, and job name.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 064e721dd9..c2507c5ebd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -140,7 +140,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
             }
           }
         } catch (Exception e) {
-          //TODO: need to handle exceptions gracefully
+          //TODO: add metrics
           log.error("Exception getting DagAction from the queue or creating 
DagTask. dagAction - {}", dagAction == null ? "<null>" : dagAction, e);
         }
       }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index d4b5233a9f..7323543f61 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -67,20 +67,6 @@ public class DagManagerUtils {
     return getFlowId(dag.getStartNodes().get(0));
   }
 
-  public static DagActionStore.DagAction 
createDagActionFromDag(Dag<JobExecutionPlan> dag, DagActionStore.DagActionType 
dagActionType) {
-    return createDagActionFromDagNode(dag.getStartNodes().get(0), 
dagActionType);
-  }
-
-  // todo - verify if jobName will always be present or need default and 
update FlowId, DagId etc to contain jobName
-  public static DagActionStore.DagAction 
createDagActionFromDagNode(DagNode<JobExecutionPlan> dagNode, 
DagActionStore.DagActionType dagActionType) {
-    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
-    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
-    String flowName =  jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
-    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType);
-  }
-
   static FlowId getFlowId(DagNode<JobExecutionPlan> dagNode) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 00ac9eba00..449b9cf994 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -131,7 +131,6 @@ public class DagProcessingEngine extends 
AbstractIdleService {
         }
         DagProc<?> dagProc = dagTask.host(dagProcFactory);
         try {
-          // todo - add retries
           dagProc.process(dagManagementStateStore);
           dagTask.conclude();
         } catch (Exception e) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index 09824ddcb9..7fa1a2f219 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -47,28 +47,42 @@ public interface DagStateStore {
    */
   void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
 
+  default boolean cleanUp(DagManager.DagId dagId) throws IOException {
+    cleanUp(dagId.toString());
+    return true;
+  }
+
   /**
    * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
    * @param dagId The ID of the dag to clean up.
    */
+  @Deprecated
   void cleanUp(String dagId) throws IOException;
 
   /**
    * Load all currently running {@link Dag}s from the underlying store. 
Typically, invoked when a new {@link DagManager}
    * takes over or on restart of service.
+   * @deprecated {@link DagProcessingEngine}, which will replace {@link DagManager}, does not need this API.
    * @return a {@link List} of currently running {@link Dag}s.
    */
+  @Deprecated
   List<Dag<JobExecutionPlan>> getDags() throws IOException;
 
   /**
    * Return a single dag from the dag state store.
    * @param dagId The ID of the dag to load.
    */
+  default Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws 
IOException {
+    return getDag(dagId.toString());
+  }
+
+  @Deprecated
   Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
 
   /**
    * Return a list of all dag IDs contained in the dag state store.
    */
+  @Deprecated
   Set<String> getDagIds() throws IOException;
 
   default boolean existsDag(DagManager.DagId dagId) throws IOException {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
new file mode 100644
index 0000000000..03aaf41520
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface for storing and retrieving currently running {@link Dag.DagNode}s.
+ * Callers should use {@link DagStateStore#writeCheckpoint} to store dags. 
After that, to update individual
+ * {@link Dag.DagNode}s, {@link DagStateStoreWithDagNodes#updateDagNode} 
should be used.
+ * {@link DagStateStore#cleanUp(DagManager.DagId)} should be used to delete 
all the {@link Dag.DagNode}s for a {@link Dag}.
+ */
+public interface DagStateStoreWithDagNodes extends DagStateStore {
+
+  /**
+   * Inserts or updates the given {@link Dag.DagNode}, which belongs to the dag identified by the provided
+   * {@link DagManager.DagId}.
+   * Returns 1 if the dag node is inserted as a new one, 2 if it is updated, and 0 if the new dag node is the same as the existing one.
+   * See the MySQL <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">INSERT ... ON DUPLICATE KEY UPDATE</a> documentation for these affected-rows values.
+   */
+  int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> 
dagNode) throws IOException;
+
+  /**
+   * Returns all the {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
+   * {@link org.apache.gobblin.service.modules.orchestration.DagManager.DagId}
+   */
+  Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) 
throws IOException;
+
+  /**
+   * Returns the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for the given {@link DagNodeId}, or an
+   * empty {@link Optional} if it is not present.
+   */
+  Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId) 
throws IOException;
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
similarity index 65%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index d0c68a3fd9..285700b845 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -20,14 +20,11 @@ import java.io.IOException;
 import java.net.URI;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -46,6 +43,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
@@ -62,13 +60,11 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  */
 @Slf4j
 @Singleton
-public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateStore {
-  private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
ConcurrentHashMap<>();
-  // dagToJobs holds a map of dagId to running jobs of that dag
-  private final Map<DagManager.DagId, Set<Dag.DagNode<JobExecutionPlan>>> 
dagToJobs = new ConcurrentHashMap<>();
-  private DagStateStore dagStateStore;
-  private DagStateStore failedDagStateStore;
-  private JobStatusRetriever jobStatusRetriever;
+public class MySqlDagManagementStateStore implements DagManagementStateStore {
+  // todo - these two stores should merge
+  private DagStateStoreWithDagNodes dagStateStore;
+  private DagStateStoreWithDagNodes failedDagStateStore;
+  private final JobStatusRetriever jobStatusRetriever;
   private boolean dagStoresInitialized = false;
   private final UserQuotaManager quotaManager;
   Map<URI, TopologySpec> topologySpecMap;
@@ -81,7 +77,7 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   private final DagActionStore dagActionStore;
 
   @Inject
-  public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog 
flowCatalog, UserQuotaManager userQuotaManager,
+  public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, 
UserQuotaManager userQuotaManager,
       JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) {
     this.quotaManager = userQuotaManager;
     this.config = config;
@@ -116,109 +112,69 @@ public class MostlyMySqlDagManagementStateStore 
implements DagManagementStateSto
     this.flowCatalog.remove(uri, headers, triggerListener);
   }
 
-  public synchronized void setTopologySpecMap(Map<URI, TopologySpec> 
topologySpecMap) throws IOException {
+  public synchronized void setTopologySpecMap(Map<URI, TopologySpec> 
topologySpecMap) {
     this.topologySpecMap = topologySpecMap;
     start();
   }
 
-  private DagStateStore createDagStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) {
+  private DagStateStoreWithDagNodes createDagStateStore(Config config, 
Map<URI, TopologySpec> topologySpecMap) {
     try {
-      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStore.class.getName()));
-      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStoreWithDagNodes.class.getName()));
+      return (DagStateStoreWithDagNodes) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
     } catch (ReflectiveOperationException e) {
       throw new RuntimeException(e);
     }
   }
 
   @Override
-  public void checkpointDag(Dag<JobExecutionPlan> dag) throws IOException {
+  public void addDag(Dag<JobExecutionPlan> dag) throws IOException {
     this.dagStateStore.writeCheckpoint(dag);
   }
 
   @Override
-  public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
-    this.dagStateStore.cleanUp(dag);
-    // todo - updated failedDagStateStore iff cleanup returned 1
+  public void markDagFailed(DagManager.DagId dagId) throws IOException {
+    Dag<JobExecutionPlan> dag = this.dagStateStore.getDag(dagId);
     this.failedDagStateStore.writeCheckpoint(dag);
-    log.info("Marked dag failed {}", DagManagerUtils.generateDagId(dag));
-  }
-
-  @Override
-  public void deleteDag(Dag<JobExecutionPlan> dag) throws IOException {
-    this.dagStateStore.cleanUp(dag);
-  }
-
-  @Override
-  public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
-    this.failedDagStateStore.cleanUp(dag);
+    this.dagStateStore.cleanUp(dagId);
+    // todo - update failedDagStateStore only if cleanUp actually deleted the dag
+    // or merge dagStateStore and failedDagStateStore and change the flag that 
marks a dag `failed`
+    log.info("Marked dag failed {}", dagId);
   }
 
   @Override
   public void deleteDag(DagManager.DagId dagId) throws IOException {
-    this.dagStateStore.cleanUp(dagId.toString());
+    this.dagStateStore.cleanUp(dagId);
     log.info("Deleted dag {}", dagId);
   }
 
   @Override
   public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
-    this.failedDagStateStore.cleanUp(dagId.toString());
-  }
-
-  @Override
-  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
-    return this.dagStateStore.getDags();
+    this.failedDagStateStore.cleanUp(dagId);
+    log.info("Deleted failed dag {}", dagId);
   }
 
   @Override
   public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws IOException {
-    return Optional.of(this.failedDagStateStore.getDag(dagId.toString()));
+    // the dag store may return null for an unknown dag, so use ofNullable (as getDag does)
+    return Optional.ofNullable(this.failedDagStateStore.getDag(dagId));
   }
 
-  @Override
-  public Set<String> getFailedDagIds() throws IOException {
-    return this.failedDagStateStore.getDagIds();
-  }
-
-  @Override
-  // todo - updating different maps here and in addDagNodeState can result in 
inconsistency between the maps
-  public synchronized void deleteDagNodeState(DagManager.DagId dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
-    this.dagNodes.remove(dagNode.getValue().getId());
-    if (this.dagToJobs.containsKey(dagId)) {
-      this.dagToJobs.get(dagId).remove(dagNode);
-      if (this.dagToJobs.get(dagId).isEmpty()) {
-        this.dagToJobs.remove(dagId);
-      }
-    }
-  }
-
-  // todo - updating different mapps here and in deleteDagNodeState can result 
in inconsistency between the maps
   @Override
   public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> 
dagNode, DagManager.DagId dagId)
       throws IOException {
-    if (!containsDag(dagId)) {
-      throw new RuntimeException("Dag " + dagId + " not found");
-    }
-    this.dagNodes.put(dagNode.getValue().getId(), dagNode);
-    if (!this.dagToJobs.containsKey(dagId)) {
-      this.dagToJobs.put(dagId, new HashSet<>());
-    }
-    this.dagToJobs.get(dagId).add(dagNode);
+    this.dagStateStore.updateDagNode(dagId, dagNode);
   }
 
   @Override
   public Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws 
IOException {
-    return Optional.ofNullable(this.dagStateStore.getDag(dagId.toString()));
-  }
-
-  @Override
-  public boolean containsDag(DagManager.DagId dagId) throws IOException {
-    return this.dagStateStore.existsDag(dagId);
+    return Optional.ofNullable(this.dagStateStore.getDag(dagId));
   }
 
   @Override
-  public Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId) {
-    if (this.dagNodes.containsKey(dagNodeId)) {
-      return ImmutablePair.of(Optional.of(this.dagNodes.get(dagNodeId)), 
getJobStatus(dagNodeId));
+  public Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId)
+      throws IOException {
+    Optional<Dag.DagNode<JobExecutionPlan>> dagNode = 
this.dagStateStore.getDagNode(dagNodeId);
+    if (dagNode.isPresent()) {
+      return ImmutablePair.of(dagNode, getJobStatus(dagNodeId));
     } else {
       // no point of searching for status if the node itself is absent.
       return ImmutablePair.of(Optional.empty(), Optional.empty());
@@ -226,18 +182,8 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   }
 
   @Override
-  public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId 
dagId) {
-    Set<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
-    if (dagNodes != null) {
-      return dagNodes;
-    } else {
-      return new HashSet<>();
-    }
-  }
-
-  public void initQuota(Collection<Dag<JobExecutionPlan>> dags) {
-    // This implementation does not need to update quota usage when the 
service restarts or when its leadership status changes
-    // because quota usage are persisted in mysql table
+  public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId 
dagId) throws IOException {
+    return this.dagStateStore.getDagNodes(dagId);
   }
 
   @Override
@@ -264,16 +210,11 @@ public class MostlyMySqlDagManagementStateStore 
implements DagManagementStateSto
     }
   }
 
-  /* todo - this method works because when the jobs finish they are deleted 
from the DMSS -> if no more job is found, means
-   no more running jobs.
-   But DMSS still has dags and which still contains dag nodes. We need to 
revisit this method's logic when we change
-   DMSS to a fully mysql backed implementation. then we may want to consider 
this approach
-   return getDagNodes(dagId).stream()
-       .anyMatch(node -> 
!FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name()));
-  */
+
   @Override
-  public boolean hasRunningJobs(DagManager.DagId dagId) {
-    return !getDagNodes(dagId).isEmpty();
+  public boolean hasRunningJobs(DagManager.DagId dagId) throws IOException {
+    return getDagNodes(dagId).stream()
+        .anyMatch(node -> 
!FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name()));
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index 5ba32539fe..c96478c79d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -38,7 +38,6 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.DBStatementExecutor;
-import org.apache.gobblin.util.ExponentialBackoff;
 
 
 @Slf4j
@@ -49,14 +48,11 @@ public class MysqlDagActionStore implements DagActionStore {
   protected final DataSource dataSource;
   private final DBStatementExecutor dbStatementExecutor;
   private final String tableName;
-  private final long retentionPeriodSeconds;
-  private String thisTableRetentionStatement;
   private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM 
%s WHERE flow_group = ? AND flow_name = ? AND flow_execution_id = ? AND 
job_name = ? AND dag_action = ?)";
 
   protected static final String INSERT_STATEMENT = "INSERT INTO %s 
(flow_group, flow_name, flow_execution_id, job_name, dag_action) "
       + "VALUES (?, ?, ?, ?, ?)";
   private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE 
flow_group = ? AND flow_name =? AND flow_execution_id = ? AND job_name = ? AND 
dag_action = ?";
-  private static final String GET_STATEMENT = "SELECT flow_group, flow_name, 
flow_execution_id, job_name, dag_action FROM %s WHERE flow_group = ? AND 
flow_name =? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?";
   private static final String GET_ALL_STATEMENT = "SELECT flow_group, 
flow_name, flow_execution_id, job_name, dag_action FROM %s";
   private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT 
EXISTS %s (" +
   "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT 
NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT 
NULL, "
@@ -64,11 +60,10 @@ public class MysqlDagActionStore implements DagActionStore {
       + "job_name varchar(" + ServiceConfigKeys.MAX_JOB_NAME_LENGTH + ") NOT 
NULL, "
       + "dag_action varchar(50) NOT NULL, modified_time TIMESTAMP DEFAULT 
CURRENT_TIMESTAMP  on update CURRENT_TIMESTAMP NOT NULL, "
       + "PRIMARY KEY 
(flow_group,flow_name,flow_execution_id,job_name,dag_action))";
+
   // Deletes rows older than retention time period (in seconds) to prevent 
this table from growing unbounded.
   private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE 
modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %s SECOND)";
 
-  private final int getDagActionMaxRetries;
-
   @Inject
   public MysqlDagActionStore(Config config) throws IOException {
     if (config.hasPath(CONFIG_PREFIX)) {
@@ -78,9 +73,9 @@ public class MysqlDagActionStore implements DagActionStore {
     }
     this.tableName = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
         ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
-    this.getDagActionMaxRetries = ConfigUtils.getInt(config, 
ConfigurationKeys.MYSQL_GET_MAX_RETRIES, 
ConfigurationKeys.DEFAULT_MYSQL_GET_MAX_RETRIES);
-    this.retentionPeriodSeconds = ConfigUtils.getLong(config, 
ConfigurationKeys.MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY,
-        
ConfigurationKeys.DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY);
+    long retentionPeriodSeconds =
+        ConfigUtils.getLong(config, 
ConfigurationKeys.MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY,
+            
ConfigurationKeys.DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY);
     this.dataSource = MysqlDataSourceFactory.get(config,
         SharedResourcesBrokerFactory.getImplicitBroker());
     try (Connection connection = dataSource.getConnection();
@@ -91,9 +86,9 @@ public class MysqlDagActionStore implements DagActionStore {
       throw new IOException("Failure creation table " + tableName, e);
     }
     this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
-    this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, 
this.tableName, retentionPeriodSeconds);
+    String thisTableRetentionStatement = String.format(RETENTION_STATEMENT, 
this.tableName, retentionPeriodSeconds);
     // Periodically deletes all rows in the table last_modified before the 
retention period defined by config.
-    
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 6, TimeUnit.HOURS);
+    
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 6L, TimeUnit.HOURS);
   }
 
   @Override
@@ -141,32 +136,6 @@ public class MysqlDagActionStore implements DagActionStore 
{
     }}, true);
   }
 
-  // TODO: later change this to getDagActions relating to a particular flow 
execution if it makes sense
-  private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
long flowExecutionId, String jobName, DagActionType dagActionType, 
ExponentialBackoff exponentialBackoff)
-      throws IOException, SQLException {
-    return 
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, 
tableName), getStatement -> {
-      int i = 0;
-      getStatement.setString(++i, flowGroup);
-      getStatement.setString(++i, flowName);
-      getStatement.setString(++i, String.valueOf(flowExecutionId));
-      getStatement.setString(++i, dagActionType.toString());
-      try (ResultSet rs = getStatement.executeQuery()) {
-        if (rs.next()) {
-          return new DagAction(rs.getString(1), rs.getString(2), 
rs.getLong(3), rs.getString(4), DagActionType.valueOf(rs.getString(5)));
-        } else if (exponentialBackoff.awaitNextRetryIfAvailable()) {
-          return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType, exponentialBackoff);
-        } else {
-          log.warn(String.format("Can not find dag action: %s with flowGroup: 
%s, flowName: %s, flowExecutionId: %s",
-              dagActionType, flowGroup, flowName, flowExecutionId));
-          return null;
-        }
-      } catch (SQLException | InterruptedException e) {
-        throw new IOException(String.format("Failure get %s from table %s",
-            new DagAction(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType), tableName), e);
-      }
-    }, true);
-  }
-
   @Override
   public Collection<DagAction> getDagActions() throws IOException {
     return 
dbStatementExecutor.withPreparedStatement(String.format(GET_ALL_STATEMENT, 
tableName), getAllStatement -> {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 0879704468..129aff2bef 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -61,14 +61,15 @@ import static 
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.g
  * It implements interfaces of {@link DagStateStore} but delegating 
responsibilities to methods provided
  * in {@link MysqlStateStore}.
  * It also implements conversion between {@link Dag<JobExecutionPlan>} to 
{@link State}.
- *
+ * <p>
  * The schema of this will simply be:
  * | storeName | tableName | State |
  * where storeName represents FlowId, a combination of FlowGroup and FlowName, 
and tableName represents FlowExecutionId.
  * State is a pocket for serialized {@link Dag} object.
  *
- *
+ * @deprecated in favor of {@link MysqlDagStateStoreWithDagNodes}
  */
+@Deprecated
 public class MysqlDagStateStore implements DagStateStore {
 
   public static final String CONFIG_PREFIX = GOBBLIN_SERVICE_PREFIX + 
"mysqlDagStateStore";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
new file mode 100644
index 0000000000..2692e20697
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.DBStatementExecutor;
+
+import static 
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateDagId;
+
+
+/**
+ * An implementation of {@link DagStateStoreWithDagNodes} backed by MySQL.
+ * <p>
+ * Every {@link Dag.DagNode} is stored as its own row, keyed by its {@link DagNodeId} and tagged with the id of the
+ * {@link Dag} it belongs to ({@code parent_dag_id}). A single node can therefore be read or upserted on its own, and
+ * a dag is reassembled from all rows that share a {@code parent_dag_id}.
+ */
+@Slf4j
+public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes {
+
+  public static final String CONFIG_PREFIX = MysqlDagStateStore.CONFIG_PREFIX;
+  private static final String DEFAULT_TABLE_NAME = "dag_node_state_store";
+  protected final DBStatementExecutor dbStatementExecutor;
+  protected final String tableName;
+  protected final GsonSerDe<List<JobExecutionPlan>> serDe;
+  private final JobExecutionPlanDagFactory jobExecPlanDagFactory;
+
+  // todo add a column that tells if it is a running dag or a failed dag
+  // NOTE(review): the UNIQUE INDEX on dag_node_id looks redundant with the PRIMARY KEY on the same column;
+  // left unchanged so the table schema is not altered.
+  protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+      + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
+      + "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, "
+      + "dag_node JSON NOT NULL, "
+      + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+      + "PRIMARY KEY (dag_node_id), "
+      + "UNIQUE INDEX dag_node_index (dag_node_id), "
+      + "INDEX dag_index (parent_dag_id))";
+
+  // Upsert of a single dag node; the `AS new` row-alias syntax requires MySQL 8.0.19 or later.
+  protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) "
+      + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node";
+  protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?";
+  protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?";
+  protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?";
+  private final ContextAwareCounter totalDagCount;
+
+  /**
+   * Creates the store and its backing table (if it does not exist yet).
+   *
+   * @param config service config; keys under {@link #CONFIG_PREFIX}, when present, take precedence
+   * @param topologySpecMap topologies used to deserialize the stored {@link JobExecutionPlan}s
+   * @throws IOException if the backing table cannot be created
+   */
+  public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
+    if (config.hasPath(CONFIG_PREFIX)) {
+      config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+    }
+
+    this.tableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, DEFAULT_TABLE_NAME);
+    // create table if it does not exist
+    DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
+
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+      createStatement.executeUpdate();
+      connection.commit();
+    } catch (SQLException e) {
+      throw new IOException("Failure creation table " + tableName, e);
+    }
+    this.dbStatementExecutor = new DBStatementExecutor(dataSource, log);
+
+    JsonSerializer<List<JobExecutionPlan>> serializer = new JobExecutionPlanListSerializer();
+    JsonDeserializer<List<JobExecutionPlan>> deserializer = new JobExecutionPlanListDeserializer(topologySpecMap);
+    Type typeToken = new TypeToken<List<JobExecutionPlan>>() {
+    }.getType();
+    this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
+    this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
+    MetricContext metricContext =
+        Instrumented.getMetricContext(new State(ConfigUtils.configToProperties(config)), this.getClass());
+    this.totalDagCount = metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT);
+    log.info("Instantiated {}", getClass().getSimpleName());
+  }
+
+  /**
+   * Upserts every node of {@code dag}. The dag counter is incremented when at least one node was newly inserted.
+   */
+  @Override
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag)
+      throws IOException {
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+    boolean newDag = false;
+    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+      // For INSERT ... ON DUPLICATE KEY UPDATE, MySQL reports 1 affected row for an insert and 2 for an update.
+      // NOTE(review): if the connection reports "found rows" (CLIENT_FOUND_ROWS), rewriting an unchanged node
+      // also reports 1 and would be counted as a new dag; confirm the JDBC driver setting.
+      if (updateDagNode(dagId, dagNode) == 1) {
+        newDag = true;
+      }
+    }
+    if (newDag) {
+      this.totalDagCount.inc();
+    }
+  }
+
+  @Override
+  public void cleanUp(Dag<JobExecutionPlan> dag) throws IOException {
+    cleanUp(generateDagId(dag));
+  }
+
+  /**
+   * Deletes all nodes of the given dag.
+   *
+   * @return {@code true} if at least one row was deleted, {@code false} if nothing was stored for {@code dagId}
+   */
+  @Override
+  public boolean cleanUp(DagManager.DagId dagId) throws IOException {
+    boolean deleted = dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, tableName),
+        deleteStatement -> {
+      try {
+        deleteStatement.setString(1, dagId.toString());
+        return deleteStatement.executeUpdate() != 0;
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
+      }
+    }, true);
+    // only count down when a dag was actually removed, mirroring the increment in writeCheckpoint
+    if (deleted) {
+      this.totalDagCount.dec();
+    }
+    return deleted;
+  }
+
+  @Override
+  public void cleanUp(String dagId) throws IOException {
+    throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
+        + "the DagManager that is replaced by DagProcessingEngine");
+  }
+
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
+        + "the DagManager that is replaced by DagProcessingEngine");
+  }
+
+  /**
+   * @return the dag assembled from all stored nodes of {@code dagId}, or {@code null} if no node is stored for it
+   */
+  @Override
+  public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException {
+    return convertDagNodesIntoDag(getDagNodes(dagId));
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> getDag(String dagId) throws IOException {
+    throw new NotSupportedException(getClass().getSimpleName() + " does not need this API");
+  }
+
+  @Override
+  public Set<String> getDagIds() throws IOException {
+    throw new NotSupportedException(getClass().getSimpleName() + " does not need this API");
+  }
+
+  /**
+   * Builds a {@link Dag} from the {@link JobExecutionPlan}s carried by {@code dagNodes}.
+   *
+   * @return the rebuilt dag, or {@code null} when {@code dagNodes} is empty
+   */
+  private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutionPlan>> dagNodes) {
+    if (dagNodes.isEmpty()) {
+      return null;
+    }
+    return jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList()));
+  }
+
+  /**
+   * Inserts {@code dagNode} under {@code parentDagId}, or overwrites its serialized value if it already exists.
+   *
+   * @return the affected-row count reported by the upsert
+   */
+  @Override
+  public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+    String dagNodeId = dagNode.getValue().getId().toString();
+    return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
+      try {
+        insertStatement.setString(1, dagNodeId);
+        insertStatement.setString(2, parentDagId.toString());
+        insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
+        return insertStatement.executeUpdate();
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e);
+      }
+    }, true);
+  }
+
+  /**
+   * Reads all stored nodes of {@code parentDagId}; the result is empty if none are stored.
+   * NOTE(review): each node is rebuilt from its {@link JobExecutionPlan} alone, so it presumably carries no
+   * parent/child links - confirm against {@link Dag.DagNode}.
+   */
+  @Override
+  public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId parentDagId) throws IOException {
+    return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {
+      getStatement.setString(1, parentDagId.toString());
+      Set<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
+      try (ResultSet rs = getStatement.executeQuery()) {
+        while (rs.next()) {
+          dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
+        }
+        return dagNodes;
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e);
+      }
+    }, true);
+  }
+
+  /**
+   * Reads a single stored node, or {@link Optional#empty()} if no row exists for {@code dagNodeId}.
+   */
+  @Override
+  public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId) throws IOException {
+    return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODE_STATEMENT, tableName), getStatement -> {
+      getStatement.setString(1, dagNodeId.toString());
+      try (ResultSet rs = getStatement.executeQuery()) {
+        if (rs.next()) {
+          return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
+        }
+        return Optional.empty();
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure get dag node for %s", dagNodeId), e);
+      }
+    }, true);
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index c4e3b87137..af67481bc7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -324,7 +324,8 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
       String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + " 
(name VARCHAR(500) CHARACTER SET latin1 NOT NULL, "
           + "user_count INT NOT NULL DEFAULT 0, requester_count INT NOT NULL 
DEFAULT 0, flowgroup_count INT NOT NULL DEFAULT 0, "
           + "PRIMARY KEY (name), " + "UNIQUE INDEX ind (name))";
-      try (Connection connection = dataSource.getConnection(); 
PreparedStatement createStatement = 
connection.prepareStatement(createQuotaTable)) {
+      try (Connection connection = dataSource.getConnection();
+          PreparedStatement createStatement = 
connection.prepareStatement(createQuotaTable)) {
         createStatement.executeUpdate();
       } catch (SQLException e) {
         // TODO: revisit use of connection test query following verification 
of successful connection pool migration:
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 04e12c13ed..e917c53611 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -125,7 +125,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     //At this point, the TopologySpecMap is initialized by the SpecCompiler. 
Pass the TopologySpecMap to the DagManager.
     this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
     if (dagManagementStateStore.isPresent()) {
-      ((MostlyMySqlDagManagementStateStore) 
dagManagementStateStore.get()).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
+      ((MySqlDagManagementStateStore) 
dagManagementStateStore.get()).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
     }
 
     this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.specCompiler.getClass());
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index fece28d8c3..effe61c360 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -64,8 +64,8 @@ public abstract class DagProc<T> {
   }
 
   public final void process(DagManagementStateStore dagManagementStateStore) 
throws IOException {
-    T state = initialize(dagManagementStateStore);   // todo - retry
-    act(dagManagementStateStore, state);   // todo - retry
+    T state = initialize(dagManagementStateStore);
+    act(dagManagementStateStore, state);
     log.info("{} concluded processing for dagId : {}", 
getClass().getSimpleName(), this.dagId);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 808b878589..1f4c504ed9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -70,7 +70,6 @@ public class DagProcUtils {
     if (nextNodes.size() == 1) {
       Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
       DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
dagId);
-      log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), dagId);
     } else {
       for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
         JobExecutionPlan jobExecutionPlan = dagNode.getValue();
@@ -153,6 +152,7 @@ public class DagProcUtils {
   public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws 
IOException {
     Properties props = new Properties();
     DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+
     if 
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
       props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
           
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
@@ -169,8 +169,8 @@ public class DagProcUtils {
             dagNodeToCancel.getValue().getJobSpec().getUri());
       }
       
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 props).get();
-      // todo - why was it not being cleaned up in DagManager?
-      dagManagementStateStore.deleteDagNodeState(dagId, dagNodeToCancel);
+      // add back the dag node with updated states in the store
+      dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
index a7d388b9b3..e5aa4125dc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -58,7 +58,6 @@ public class EnforceFlowFinishDeadlineDagProc extends 
DeadlineEnforcementDagProc
 
       dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
       dag.setMessage("Flow killed due to exceeding SLA of " + 
flowFinishDeadline + " ms");
-      dagManagementStateStore.checkpointDag(dag);
     } else {
       log.error("EnforceFlowFinishDeadline dagAction received before due time. 
flowStartTime {}, flowFinishDeadline {} ", flowStartTime, flowFinishDeadline);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
index 15fa11c7e4..5eb2236f8f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -77,7 +77,6 @@ public class EnforceJobStartDeadlineDagProc extends 
DeadlineEnforcementDagProc {
       DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
       dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
       dag.setMessage("Flow killed because no update received for " + 
timeOutForJobStart + " ms after orchestration");
-      dagManagementStateStore.checkpointDag(dag);
     }
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index 15448d62b8..ab68d3e7b1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -62,7 +62,6 @@ public class KillDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
 
     dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
     dag.get().setMessage("Flow killed by request");
-    dagManagementStateStore.checkpointDag(dag.get());
 
     if (this.shouldKillSpecificJob) {
       Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = 
dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index c2d840ef70..88f3249f1c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -20,17 +20,14 @@ package 
org.apache.gobblin.service.modules.orchestration.proc;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@@ -42,14 +39,6 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 @Slf4j
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
-  // todo - this is not orchestration delay and should be renamed. keeping it 
the same because DagManager is also using
-  // the same name
-  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
-
-  static {
-    metricContext.register(
-        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
-  }
 
   public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {
     super(launchDagTask);
@@ -68,7 +57,7 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
       flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
getDagId().getFlowExecutionId());
       Optional<Dag<JobExecutionPlan>> dag = 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
       if (dag.isPresent()) {
-        dagManagementStateStore.checkpointDag(dag.get());
+        dagManagementStateStore.addDag(dag.get());
       }
       return dag;
     } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
| IOException e) {
@@ -84,10 +73,7 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
       // todo - add metrics
     } else {
       DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), 
getDagId());
-      // Checkpoint the dag state, it should have an updated value of dag nodes
-      dagManagementStateStore.checkpointDag(dag.get());
       
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagTask().getDagAction());
-      orchestrationDelayCounter.set(System.currentTimeMillis() - 
DagManagerUtils.getFlowExecId(dag.get()));
     }
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index a57de48189..27525c057c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -27,7 +27,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -76,11 +75,9 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
       return;
     }
 
-    Dag<JobExecutionPlan> dag = 
dagManagementStateStore.getDag(getDagId()).get();
     JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
     ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(jobStatus.getEventName());
-    // pass dag, so that dag is updated too, updated information will be 
required in onJobFinish in finding next jobs to submit
-    setStatus(dagManagementStateStore, dag, getDagNodeId(), executionStatus);
+    updateDagNodeStatus(dagManagementStateStore, dagNode, executionStatus);
 
     if 
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
       log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should 
have been created only for finished status - {}",
@@ -91,7 +88,9 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
           dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES));
     }
 
-    onJobFinish(dagManagementStateStore, dagNode, executionStatus, dag);
+    // get the dag after updating dag node status
+    Dag<JobExecutionPlan> dag = 
dagManagementStateStore.getDag(getDagId()).get();
+    onJobFinish(dagManagementStateStore, dagNode, dag);
 
     if (jobStatus.isShouldRetry()) {
       log.info("Retrying job: {}, current attempts: {}, max attempts: {}",
@@ -114,7 +113,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
         // todo - verify if work from PR#3641 is required
         dagManagementStateStore.deleteDag(getDagId());
       } else {
-        dagManagementStateStore.markDagFailed(dag);
+        dagManagementStateStore.markDagFailed(getDagId());
       }
 
       DagProcUtils.removeFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagId());
@@ -125,17 +124,10 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
    * Sets status of a dag node inside the given Dag.
    * todo - DMSS should support this functionality like an atomic get-and-set 
operation.
    */
-  private void setStatus(DagManagementStateStore dagManagementStateStore,
-      Dag<JobExecutionPlan> dag, DagNodeId dagNodeId, ExecutionStatus 
executionStatus) throws IOException {
-    for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
-      if (node.getValue().getId().equals(dagNodeId)) {
-        node.getValue().setExecutionStatus(executionStatus);
-        dagManagementStateStore.addDagNodeState(node, getDagId());
-        dagManagementStateStore.checkpointDag(dag);
-        return;
-      }
-    }
-    log.error("DagNode with id {} not found in Dag {}", dagNodeId, getDagId());
+  private void updateDagNodeStatus(DagManagementStateStore 
dagManagementStateStore,
+      Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus) 
throws IOException {
+    dagNode.getValue().setExecutionStatus(executionStatus);
+    dagManagementStateStore.addDagNodeState(dagNode, getDagId());
   }
 
   /**
@@ -143,8 +135,9 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
    * This method updates the state of the dag and performs clean up actions as 
necessary.
    */
   private void onJobFinish(DagManagementStateStore dagManagementStateStore, 
Dag.DagNode<JobExecutionPlan> dagNode,
-      ExecutionStatus executionStatus, Dag<JobExecutionPlan> dag) throws 
IOException {
+      Dag<JobExecutionPlan> dag) throws IOException {
     String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
+    ExecutionStatus executionStatus = dagNode.getValue().getExecutionStatus();
     log.info("Job {} of Dag {} has finished with status {}", jobName, 
getDagId(), executionStatus.name());
     // Only decrement counters and quota for jobs that actually ran on the 
executor, not from a GaaS side failure/skip event
     if (dagManagementStateStore.releaseQuota(dagNode)) {
@@ -167,9 +160,5 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
       default:
         log.warn("It should not reach here. Job status {} is unexpected.", 
executionStatus);
     }
-
-    // Checkpoint the dag state, it should have an updated value of dag fields
-    dagManagementStateStore.checkpointDag(dag);
-    dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 68e02494eb..3447035cc4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -87,10 +87,10 @@ public class ResumeDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
 
     // these two statements effectively move the dag from failed dag store to 
(running) dag store.
     // to prevent loss in the unlikely event of failure between the two, we 
add first.
-    dagManagementStateStore.checkpointDag(failedDag.get());
+    dagManagementStateStore.addDag(failedDag.get());
 
     // if it fails here, it will check point the failed dag in the (running) 
dag store again, which is idempotent
-    dagManagementStateStore.deleteFailedDag(failedDag.get());
+    dagManagementStateStore.deleteFailedDag(getDagId());
 
     DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(), 
getDagId());
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
index d96c67619c..02973ddb91 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -30,7 +30,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 
@@ -48,7 +47,7 @@ public class JobExecutionPlanDagFactory {
     Map<String, Dag.DagNode<JobExecutionPlan>> jobExecutionPlanMap =
         Maps.newHashMapWithExpectedSize(jobExecutionPlans.size());
     List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new 
ArrayList<>(jobExecutionPlans.size());
-    /**
+    /*
      * Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link 
JobSpec} in the flow. Add this node
      * to a HashMap.
      */
@@ -61,12 +60,10 @@ public class JobExecutionPlanDagFactory {
       }
     }
 
-    /**
+    /*
      * Iterate over each {@link JobSpec} to get the dependencies of each 
{@link JobSpec}.
      * For each {@link JobSpec}, get the corresponding {@link Dag.DagNode} and
      * set the {@link Dag.DagNode}s corresponding to its dependencies as its 
parent nodes.
-     *
-     * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
      */
     List<String> jobNames = new ArrayList<>();
     for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index d2800fbe36..9a9aac1c60 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -143,7 +143,6 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         ? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
         : ConfigFactory.empty();
     // log exceptions to expose errors we suffer under and/or guide 
intervention when resolution not readily forthcoming
-    // todo - this retryer retries all the exceptions. we should make it retry 
only really transient
     this.persistJobStatusRetryer =
         
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
 Optional.of(new RetryListener() {
           @Override
@@ -234,7 +233,6 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
           if (this.dagProcEngineEnabled && 
DagProcUtils.isJobLevelStatus(jobName)) {
             if (updatedJobStatus.getRight() == NewState.FINISHED) {
-              // todo - retried/resumed jobs *may* not be handled here, we may 
want to create their dag action elsewhere
               try {
                 this.dagManagementStateStore.addJobDagAction(flowGroup, 
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
               } catch (Exception e) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index f4d06e607a..5ce3b33789 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -57,14 +57,14 @@ public class DagManagementTaskStreamImplTest {
     this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
     ConfigBuilder configBuilder = ConfigBuilder.create();
-    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
+    
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 this.testMetastoreDatabase.getJdbcUrl())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
     Config config = configBuilder.build();
 
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    MySqlDagManagementStateStore dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
             mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 14dd338f64..06982c5f18 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -54,7 +54,7 @@ public class DagProcessingEngineTest {
   private DagManagementTaskStreamImpl dagManagementTaskStream;
   private DagTaskStream dagTaskStream;
   private DagProcFactory dagProcFactory;
-  private static MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  private static MySqlDagManagementStateStore dagManagementStateStore;
   private ITestMetastoreDatabase testMetastoreDatabase;
   static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
 
@@ -69,14 +69,14 @@ public class DagProcessingEngineTest {
 
     Config config;
     ConfigBuilder configBuilder = ConfigBuilder.create();
-    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
+    
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
     config = configBuilder.build();
 
-    dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
+    dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
     doReturn(true).when(dagActionStore).deleteDagAction(any());
     dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
similarity index 74%
rename from 
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
rename to 
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
index 597acadb1a..3185943902 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
@@ -18,7 +18,9 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -27,16 +29,15 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
@@ -44,6 +45,7 @@ import 
org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProcTest;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.CompletedFuture;
 
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -54,13 +56,12 @@ import static org.mockito.Mockito.mock;
 /**
  * Mainly testing functionalities related to DagStateStore but not 
Mysql-related components.
  */
-public class MostlyMySqlDagManagementStateStoreTest {
+public class MySqlDagManagementStateStoreTest {
 
   private ITestMetastoreDatabase testDb;
-  private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  private MySqlDagManagementStateStore dagManagementStateStore;
   private static final String TEST_USER = "testUser";
-  private static final String TEST_PASSWORD = "testPassword";
-  private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
+  public static final String TEST_PASSWORD = "testPassword";
   private static final String TEST_TABLE = "table";
   public static String TEST_SPEC_EXECUTOR_URI = "mySpecExecutor";
 
@@ -79,6 +80,18 @@ public class MostlyMySqlDagManagementStateStoreTest {
     }
   }
 
+  public static <T> boolean compareLists(List<T> list1, List<T> list2) {
+    if (list1.size() != list2.size()) {
+      return false;
+    }
+    for (T item : list1) {
+      if (Collections.frequency(list1, item) != Collections.frequency(list2, 
item)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Test
   public void testAddDag() throws Exception {
     Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
@@ -90,14 +103,13 @@ public class MostlyMySqlDagManagementStateStoreTest {
     DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
     DagNodeId dagNodeId = 
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
 
-    this.dagManagementStateStore.checkpointDag(dag);
-    this.dagManagementStateStore.checkpointDag(dag2);
+    this.dagManagementStateStore.addDag(dag);
+    this.dagManagementStateStore.addDag(dag2);
     this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
     this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
     this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
 
-    Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
-    Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getDag(dagId).get().toString());
+    Assert.assertTrue(compareLists(dag.getNodes(), 
this.dagManagementStateStore.getDag(dagId).get().getNodes()));
     Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
 
     Set<Dag.DagNode<JobExecutionPlan>> dagNodes = 
this.dagManagementStateStore.getDagNodes(dagId);
@@ -105,26 +117,26 @@ public class MostlyMySqlDagManagementStateStoreTest {
     Assert.assertTrue(dagNodes.contains(dagNode));
     Assert.assertTrue(dagNodes.contains(dagNode2));
 
-    this.dagManagementStateStore.deleteDagNodeState(dagId, dagNode);
-    
Assert.assertFalse(this.dagManagementStateStore.getDagNodes(dagId).contains(dagNode));
-    
Assert.assertTrue(this.dagManagementStateStore.getDagNodes(dagId).contains(dagNode2));
-    
Assert.assertTrue(this.dagManagementStateStore.getDagNodes(dagId2).contains(dagNode3));
-
     // test to verify that adding a new dag node with the same dag node id 
(defined by the jobSpec) replaces the existing one
-    
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 1);
-    JobExecutionPlan duplicateJobExecutionPlan = new 
JobExecutionPlan(dagNode2.getValue().getJobSpec(),
-        new MockedSpecExecutor(ConfigFactory.empty()));
-    Dag.DagNode<JobExecutionPlan> duplicateDagNode = new 
Dag.DagNode<>(duplicateJobExecutionPlan);
+    JobExecutionPlan jobExecutionPlan = new 
JobExecutionPlan(dagNode2.getValue().getJobSpec(),
+        
DagTestUtils.buildNaiveTopologySpec("mySpecExecutor").getSpecExecutor());
+    jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+    // Future of type CompletedFuture is used because in tests 
InMemorySpecProducer is used and that responds with CompletedFuture
+    CompletedFuture<Boolean> future = new CompletedFuture<>(Boolean.TRUE, 
null);
+    jobExecutionPlan.setJobFuture(Optional.of(future));
+
+    Dag.DagNode<JobExecutionPlan> duplicateDagNode = new 
Dag.DagNode<>(jobExecutionPlan);
     this.dagManagementStateStore.addDagNodeState(duplicateDagNode, dagId);
-    
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 1);
+    
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 2);
   }
 
-  public static MostlyMySqlDagManagementStateStore 
getDummyDMSS(ITestMetastoreDatabase testMetastoreDatabase) throws Exception {
+  public static MySqlDagManagementStateStore 
getDummyDMSS(ITestMetastoreDatabase testMetastoreDatabase) throws Exception {
     ConfigBuilder configBuilder = ConfigBuilder.create();
-    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
+    
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreWithDagNodes.class.getName())
         .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, 
testMetastoreDatabase.getJdbcUrl())
-        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE + 
1)
-        
.addPrimitive(MostlyMySqlDagManagementStateStore.FAILED_DAG_STATESTORE_PREFIX
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, "dag" + 1)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER)
+        .addPrimitive(MySqlDagManagementStateStore.FAILED_DAG_STATESTORE_PREFIX
             + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE + 
2);
     Config config = configBuilder.build();
     JobStatusRetriever jobStatusRetriever = mock(JobStatusRetriever.class);
@@ -139,8 +151,8 @@ public class MostlyMySqlDagManagementStateStoreTest {
     TopologySpec topologySpec = 
LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI);
     URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
     topologySpecMap.put(specExecURI, topologySpec);
-    MostlyMySqlDagManagementStateStore dagManagementStateStore =
-        new MostlyMySqlDagManagementStateStore(config, null, null, 
jobStatusRetriever,
+    MySqlDagManagementStateStore dagManagementStateStore =
+        new MySqlDagManagementStateStore(config, null, null, 
jobStatusRetriever,
             
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase));
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     return dagManagementStateStore;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 5130019536..6137f91d10 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -18,7 +18,6 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
-import java.sql.SQLException;
 import java.util.Collection;
 import java.util.HashSet;
 
@@ -103,7 +102,7 @@ public class MysqlDagActionStoreTest {
   }
 
   @Test(dependsOnMethods = "testGetActions")
-  public void testDeleteAction() throws IOException, SQLException {
+  public void testDeleteAction() throws IOException {
      this.mysqlDagActionStore.deleteDagAction(
          new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.KILL));
      Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
new file mode 100644
index 0000000000..28687771f7
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Mainly testing functionalities related to DagStateStore but not 
Mysql-related components.
+ */
+public class MysqlDagStateStoreWithDagNodesTest {
+
+  private DagStateStore dagStateStore;
+
+  private static final String TEST_USER = "testUser";
+  private static ITestMetastoreDatabase testDb;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    testDb = TestMetastoreDatabaseFactory.get();
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, 
testDb.getJdbcUrl())
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, 
MySqlDagManagementStateStoreTest.TEST_PASSWORD);
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    this.dagStateStore = new 
MysqlDagStateStoreWithDagNodes(configBuilder.build(), topologySpecMap);
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    if (testDb != null) {
+      // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+      testDb.close();
+    }
+  }
+
+  @Test
+  public void testAddGetAndDeleteDag() throws Exception{
+    Dag<JobExecutionPlan> originalDag1 = DagTestUtils.buildDag("random_1", 
123L);
+    Dag<JobExecutionPlan> originalDag2 = DagTestUtils.buildDag("random_2", 
456L);
+    DagManager.DagId dagId1 = DagManagerUtils.generateDagId(originalDag1);
+    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(originalDag2);
+    this.dagStateStore.writeCheckpoint(originalDag1);
+    this.dagStateStore.writeCheckpoint(originalDag2);
+
+    // Verify get one dag
+    Dag<JobExecutionPlan> dag1 = this.dagStateStore.getDag(dagId1);
+    Dag<JobExecutionPlan> dag2 = this.dagStateStore.getDag(dagId2);
+    
Assert.assertTrue(MySqlDagManagementStateStoreTest.compareLists(dag1.getNodes(),
 originalDag1.getNodes()));
+    
Assert.assertTrue(MySqlDagManagementStateStoreTest.compareLists(dag2.getNodes(),
 originalDag2.getNodes()));
+
+    // Verify dag contents
+    Dag<JobExecutionPlan> dagDeserialized = dag1;
+    Assert.assertEquals(dagDeserialized.getNodes().size(), 2);
+    Assert.assertEquals(dagDeserialized.getStartNodes().size(), 1);
+    Assert.assertEquals(dagDeserialized.getEndNodes().size(), 1);
+    Dag.DagNode<JobExecutionPlan> child = dagDeserialized.getEndNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> parent = 
dagDeserialized.getStartNodes().get(0);
+    Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
+    
Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
+    for (int i = 0; i < 2; i++) {
+      JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
+      Config jobConfig = plan.getJobSpec().getConfig();
+      
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), 
"group" + "random_1");
+      
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), 
"flow" + "random_1");
+      
Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), 
123L);
+      Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+      
Assert.assertTrue(Boolean.parseBoolean(plan.getJobFuture().get().get().toString()));
+      
Assert.assertTrue(Boolean.parseBoolean(plan.getJobFuture().get().get().toString()));
+    }
+
+    dagDeserialized = dag2;
+    Assert.assertEquals(dagDeserialized.getNodes().size(), 2);
+    Assert.assertEquals(dagDeserialized.getStartNodes().size(), 1);
+    Assert.assertEquals(dagDeserialized.getEndNodes().size(), 1);
+    child = dagDeserialized.getEndNodes().get(0);
+    parent = dagDeserialized.getStartNodes().get(0);
+    Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
+    
Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
+    for (int i = 0; i < 2; i++) {
+      JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
+      Config jobConfig = plan.getJobSpec().getConfig();
+      
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), 
"group" + "random_2");
+      
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), 
"flow" + "random_2");
+      
Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), 
456L);
+      Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+    }
+
+    dagStateStore.cleanUp(dagId1);
+    dagStateStore.cleanUp(dagId2);
+
+    Assert.assertNull(this.dagStateStore.getDag(dagId1));
+    Assert.assertNull(this.dagStateStore.getDag(dagId2));
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 961eddaa2b..7c5880e824 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -136,8 +136,8 @@ public class OrchestratorTest {
     this.mockDagManager = mock(DagManager.class);
     Mockito.doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
 
-    MostlyMySqlDagManagementStateStore dagManagementStateStore =
-        
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    MySqlDagManagementStateStore dagManagementStateStore =
+        
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
 
     SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 41bdf33341..ac4cd3c49f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -17,13 +17,13 @@
 
 package org.apache.gobblin.service.modules.orchestration.proc;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -34,14 +34,16 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
 import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
 import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -77,7 +79,7 @@ public class EnforceDeadlineDagProcsTest {
     String flowGroup = "fg";
     String flowName = "fn";
     long flowExecutionId = System.currentTimeMillis();
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    MySqlDagManagementStateStore dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
     DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
         DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
@@ -88,12 +90,15 @@ public class EnforceDeadlineDagProcsTest {
             .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
             .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    List<SpecProducer<Spec>> specProducers = 
ReevaluateDagProcTest.getDagSpecProducers(dag);
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
         message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    dagManagementStateStore.addDag(dag);  // simulate having a dag that has 
not yet started running
     dagManagementStateStore.addDagAction(dagAction);
 
     EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
@@ -102,9 +107,9 @@ public class EnforceDeadlineDagProcsTest {
     enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
 
     int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
-    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
-        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
-            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+    Mockito.verify(specProducers.get(0), Mockito.times(1)).cancelJob(any(), 
any());
+    specProducers.stream().skip(expectedNumOfDeleteDagNodeStates) // 
separately verified `specProducers.get(0)`
+        .forEach(sp -> Mockito.verify(sp, Mockito.never()).cancelJob(any(), 
any()));
   }
 
   /*
@@ -117,7 +122,7 @@ public class EnforceDeadlineDagProcsTest {
     String flowGroup = "fg";
     String flowName = "fn";
     long flowExecutionId = System.currentTimeMillis();
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    MySqlDagManagementStateStore dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
     DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
         DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
@@ -128,22 +133,23 @@ public class EnforceDeadlineDagProcsTest {
             .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
             .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    List<SpecProducer<Spec>> specProducers = 
ReevaluateDagProcTest.getDagSpecProducers(dag);
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
         message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    dagManagementStateStore.addDag(dag);  // simulate having a dag that has 
not yet started running
 
     EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
         new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
             "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, dagManagementStateStore));
     enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
 
-    int expectedNumOfDeleteDagNodeStates = 0; // no dag node because we 
simulated (by not adding) missing dag action
-    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
-        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
-            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+    // no job cancelled because we simulated (by not adding) missing dag action
+    specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).cancelJob(any(), any()));
   }
 
   /*
@@ -155,7 +161,7 @@ public class EnforceDeadlineDagProcsTest {
     String flowGroup = "fg";
     String flowName = "fn";
     long flowExecutionId = System.currentTimeMillis();
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    MySqlDagManagementStateStore dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
     DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
         DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
@@ -167,12 +173,15 @@ public class EnforceDeadlineDagProcsTest {
             .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
             .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    List<SpecProducer<Spec>> specProducers = 
ReevaluateDagProcTest.getDagSpecProducers(dag);
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
         message("Test 
message").eventName(ExecutionStatus.RUNNING.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
is in running state
+
+    dagManagementStateStore.addDag(dag);  // simulate having a dag that is in 
running state
     dagManagementStateStore.addDagAction(dagAction);
 
     EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new 
EnforceFlowFinishDeadlineDagProc(
@@ -180,8 +189,6 @@ public class EnforceDeadlineDagProcsTest {
             "job0", 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null, 
dagManagementStateStore));
     enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
 
-    Assert.assertEquals(numOfDagNodes,
-        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
-            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+    specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.times(1)).cancelJob(any(), any()));
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index 64efb2ed56..dbe17d3362 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -45,8 +45,8 @@ import 
org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -61,13 +61,13 @@ import static org.mockito.Mockito.spy;
 
 
 public class KillDagProcTest {
-  private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  private MySqlDagManagementStateStore dagManagementStateStore;
   private ITestMetastoreDatabase testDb;
 
   @BeforeClass
   public void setUp() throws Exception {
     this.testDb = TestMetastoreDatabaseFactory.get();
-    this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
+    this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
     LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
   }
 
@@ -86,7 +86,7 @@ public class KillDagProcTest {
         5, "user5", ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("fg"))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
@@ -127,7 +127,7 @@ public class KillDagProcTest {
         5, "user5", ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("fg"))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     JobStatus
         jobStatus = 
JobStatus.builder().flowName("job0").flowGroup("fg").jobGroup("fg").jobName("job0").flowExecutionId(flowExecutionId).
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 6b5e7bbdd2..1b8c7da4ca 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -51,8 +51,8 @@ import 
org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -70,12 +70,12 @@ import static org.mockito.Mockito.spy;
 
 public class LaunchDagProcTest {
   private ITestMetastoreDatabase testMetastoreDatabase;
-  private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  private MySqlDagManagementStateStore dagManagementStateStore;
 
   @BeforeClass
   public void setUp() throws Exception {
     this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
-    this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     mockDMSSCommonBehavior(this.dagManagementStateStore);
   }
 
@@ -95,7 +95,7 @@ public class LaunchDagProcTest {
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-            MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+            MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
     List<SpecProducer<Spec>> specProducers = 
ReevaluateDagProcTest.getDagSpecProducers(dag);
@@ -126,7 +126,7 @@ public class LaunchDagProcTest {
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY,  
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
     LaunchDagProc launchDagProc = new LaunchDagProc(
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index bbf5874db4..93f31d1410 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -47,7 +47,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
 import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.JobStatus;
@@ -74,7 +74,7 @@ public class ReevaluateDagProcTest {
 
   @BeforeMethod
   public void setUp() throws Exception {
-    this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
   }
 
@@ -95,13 +95,13 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0")
         .flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.COMPLETE.name())
         
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
-    dagManagementStateStore.checkpointDag(dag);
+    dagManagementStateStore.addDag(dag);
 
     /*
     We cannot check the spec producers if addSpec is called on them because 
spec producer object changes in writing/reading
@@ -116,33 +116,33 @@ public class ReevaluateDagProcTest {
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null, dagManagementStateStore));
     reEvaluateDagProc.process(dagManagementStateStore);
-
     // next job is sent to spec producer
     Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
     // there are two invocations, one after setting status and other after 
sending new job to specProducer
     Mockito.verify(this.dagManagementStateStore, 
Mockito.times(2)).addDagNodeState(any(), any());
 
-    // current job's state is deleted
-    
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
-        .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+    // assert that the first job is completed
+    Assert.assertEquals(ExecutionStatus.COMPLETE,
+        
this.dagManagementStateStore.getDag(dagId).get().getStartNodes().get(0).getValue().getExecutionStatus());
   }
 
   // test when there does not exist a next job in the dag when the current 
job's reevaluate dag action is processed
   @Test
   public void testNoNextJobToRun() throws Exception {
     String flowName = "fn2";
+    DagManager.DagId dagId = new DagManager.DagId(flowGroup, flowName, 
flowExecutionId);
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("2", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
         1, "user5", ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0")
         .flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.COMPLETE.name())
         
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
-    dagManagementStateStore.checkpointDag(dag);
+    dagManagementStateStore.addDag(dag);
 
     Dag<JobExecutionPlan> mockedDag = DagManagerTest.buildDag("2", 
flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
         1, "user5", ConfigFactory.empty()
@@ -150,13 +150,15 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
     // mock getDagNodeWithJobStatus() to return a dagNode with status completed
     
mockedDag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
     doReturn(new ImmutablePair<>(Optional.of(mockedDag.getNodes().get(0)), 
Optional.of(jobStatus)))
         .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
 
+    Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId));
+
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
 
     ReevaluateDagProc
@@ -167,31 +169,30 @@ public class ReevaluateDagProcTest {
     // no new job to launch for this one job flow
     specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).addSpec(any()));
 
-    // current job's state is deleted
-    
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
-        .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
-
     // dag is deleted because the only job in the dag is completed
     
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
         .filter(a -> a.getMethod().getName().equals("deleteDag")).count(), 1);
 
     
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
+
+    Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId));
   }
 
   @Test
   public void testCurrentJobToRun() throws Exception {
     String flowName = "fn3";
+    DagManager.DagId dagId = new DagManager.DagId(flowGroup, flowName, 
flowExecutionId);
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
         2, "user5", ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
-    dagManagementStateStore.checkpointDag(dag);
+    dagManagementStateStore.addDag(dag);
     doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), 
Optional.empty()))
         .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
 
@@ -208,8 +209,6 @@ public class ReevaluateDagProcTest {
     specProducers.stream().skip(numOfLaunchedJobs) // separately verified 
`specProducers.get(0)`
         .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
 
-    // no job's state is deleted because that happens when the job finishes 
triggered the reevaluate dag proc
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagNodeState(any(), any());
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(any());
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
@@ -224,12 +223,12 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
         .jobName("job3").flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.COMPLETE.name())
         
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
-    dagManagementStateStore.checkpointDag(dag);
+    dagManagementStateStore.addDag(dag);
 
     doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus)))
         .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index c84132c8ad..e342b29e77 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.concurrent.ExecutionException;
 
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
@@ -37,8 +36,9 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -48,13 +48,13 @@ import static org.mockito.Mockito.spy;
 
 
 public class ResumeDagProcTest {
-  private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  private MySqlDagManagementStateStore dagManagementStateStore;
   private ITestMetastoreDatabase testDb;
 
   @BeforeClass
   public void setUp() throws Exception {
     testDb = TestMetastoreDatabaseFactory.get();
-    this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
+    this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
     LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
   }
 
@@ -69,7 +69,7 @@ public class ResumeDagProcTest {
   This test creates a failed dag and launches a resume dag proc for it. It 
then verifies that the next jobs are set to run.
    */
   @Test
-  public void resumeDag() throws IOException, URISyntaxException, 
ExecutionException, InterruptedException {
+  public void resumeDag() throws IOException, URISyntaxException {
     long flowExecutionId = 12345L;
     String flowGroup = "fg";
     String flowName = "fn";
@@ -78,15 +78,16 @@ public class ResumeDagProcTest {
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
     // simulate a failed dag in store
     
dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
     
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED);
     
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
     
dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
-    this.dagManagementStateStore.checkpointDag(dag);
+    this.dagManagementStateStore.addDag(dag);
     // simulate it as a failed dag
-    this.dagManagementStateStore.markDagFailed(dag);
+    this.dagManagementStateStore.markDagFailed(dagId);
 
     ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.RESUME),

Reply via email to