This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ac653fb75 [GOBBLIN-2074] add dag action store inside 
DagManagementStateStore (#3954)
ac653fb75 is described below

commit ac653fb75a3c78663b09fd1e947365f4324b46c8
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed May 29 14:45:30 2024 -0700

    [GOBBLIN-2074] add dag action store inside DagManagementStateStore (#3954)
    
    * remove unused fields/APIs
    * add dag action store inside DMSS
    * address review comment
---
 .../runtime/DagActionStoreChangeMonitorTest.java   | 13 ++--
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 11 ++-
 .../orchestration/DagManagementStateStore.java     | 80 ++++++++++++++++++++--
 .../orchestration/DagManagementTaskStreamImpl.java | 16 ++---
 .../modules/orchestration/FlowLaunchHandler.java   | 12 ++--
 .../MostlyMySqlDagManagementStateStore.java        | 45 ++++++++----
 .../modules/orchestration/proc/DagProcUtils.java   | 13 ++--
 .../orchestration/proc/ReevaluateDagProc.java      |  6 +-
 .../modules/orchestration/task/DagTask.java        |  8 +--
 .../task/EnforceFlowFinishDeadlineDagTask.java     |  5 +-
 .../task/EnforceJobStartDeadlineDagTask.java       |  5 +-
 .../modules/orchestration/task/KillDagTask.java    |  5 +-
 .../modules/orchestration/task/LaunchDagTask.java  |  5 +-
 .../orchestration/task/ReevaluateDagTask.java      |  5 +-
 .../modules/orchestration/task/ResumeDagTask.java  |  5 +-
 ...lowExecutionResourceHandlerWithWarmStandby.java | 12 ++--
 .../monitoring/DagActionStoreChangeMonitor.java    | 13 ++--
 .../DagActionStoreChangeMonitorFactory.java        | 10 +--
 .../DagManagementDagActionStoreChangeMonitor.java  |  5 +-
 ...nagementDagActionStoreChangeMonitorFactory.java | 10 +--
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  6 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  | 15 ++--
 .../monitoring/KafkaJobStatusMonitorFactory.java   | 14 ++--
 .../DagManagementTaskStreamImplTest.java           |  3 +-
 .../orchestration/DagProcessingEngineTest.java     | 21 +++---
 .../MostlyMySqlDagManagementStateStoreTest.java    |  3 +-
 .../modules/orchestration/OrchestratorTest.java    |  2 +-
 .../proc/EnforceDeadlineDagProcsTest.java          |  7 +-
 .../orchestration/proc/KillDagProcTest.java        |  8 +--
 .../orchestration/proc/LaunchDagProcTest.java      |  2 +-
 .../orchestration/proc/ReevaluateDagProcTest.java  | 13 ++--
 .../orchestration/proc/ResumeDagProcTest.java      |  3 +-
 32 files changed, 231 insertions(+), 150 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 54bc6ace5..9031f0031 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -41,8 +41,8 @@ import 
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
-import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
 import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
@@ -85,13 +85,13 @@ public class DagActionStoreChangeMonitorTest {
 
     public MockDagActionStoreChangeMonitor(String topic, Config config, int 
numThreads,
         boolean isMultiActiveSchedulerEnabled) {
-      this(topic, config, numThreads, isMultiActiveSchedulerEnabled, 
mock(DagActionStore.class), mock(DagManager.class), mock(FlowCatalog.class), 
mock(Orchestrator.class));
+      this(topic, config, numThreads, isMultiActiveSchedulerEnabled, 
mock(DagManagementStateStore.class), mock(DagManager.class), 
mock(FlowCatalog.class), mock(Orchestrator.class));
     }
 
     public MockDagActionStoreChangeMonitor(String topic, Config config, int 
numThreads, boolean isMultiActiveSchedulerEnabled,
-        DagActionStore dagActionStore, DagManager dagManager, FlowCatalog 
flowCatalog, Orchestrator orchestrator) {
+        DagManagementStateStore dagManagementStateStore, DagManager 
dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) {
       super(topic, config, dagManager, numThreads, flowCatalog, orchestrator,
-          dagActionStore, isMultiActiveSchedulerEnabled);
+          dagManagementStateStore, isMultiActiveSchedulerEnabled);
     }
 
     protected void processMessageForTest(DecodeableKafkaRecord record) {
@@ -238,8 +238,7 @@ public class DagActionStoreChangeMonitorTest {
     String jobName = "testJobName";
     String flowExecutionId = "12345677";
 
-    MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
-    mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH);
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
 
     Config monitorConfig = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
         
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
@@ -251,7 +250,7 @@ public class DagActionStoreChangeMonitorTest {
     // Throw an uncaught exception during startup sequence
     when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new 
RuntimeException("Uncaught exception"));
     mockDagActionStoreChangeMonitor =  new 
MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5,
-        true, mysqlDagActionStore, mockDagManager, mockFlowCatalog, 
mockOrchestrator);
+        true, dagManagementStateStore, mockDagManager, mockFlowCatalog, 
mockOrchestrator);
     try {
       mockDagActionStoreChangeMonitor.setActive();
     } catch (Exception e) {
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 1d1520d5d..337e6e33e 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -51,7 +51,6 @@ import com.typesafe.config.ConfigValueFactory;
 
 import kafka.consumer.ConsumerIterator;
 import kafka.message.MessageAndMetadata;
-
 import lombok.Getter;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -75,8 +74,7 @@ import 
org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
@@ -103,7 +101,7 @@ public class KafkaAvroJobStatusMonitorTest {
   private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
   private MetricContext context;
   private KafkaAvroEventKeyValueReporter.Builder<?> builder;
-  private MysqlDagActionStore mysqlDagActionStore;
+  private DagManagementStateStore dagManagementStateStore;
   private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
 
   @BeforeClass
@@ -123,9 +121,8 @@ public class KafkaAvroJobStatusMonitorTest {
     builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
     builder = 
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
         TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
-    this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
+    this.dagManagementStateStore = mock(DagManagementStateStore.class);
     this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class));
-    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
   }
 
   @Test
@@ -787,7 +784,7 @@ public class KafkaAvroJobStatusMonitorTest {
     public MockKafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads,
         AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, 
GaaSJobObservabilityEventProducer producer)
         throws IOException, ReflectiveOperationException {
-      super(topic, config, numThreads, mock(JobIssueEventHandler.class), 
producer, mysqlDagActionStore);
+      super(topic, config, numThreads, mock(JobIssueEventHandler.class), 
producer, dagManagementStateStore);
       shouldThrowFakeExceptionInParseJobStatus = 
shouldThrowFakeExceptionInParseJobStatusToggle;
     }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index b56459da7..6a6140d80 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 import java.net.URI;
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
@@ -146,12 +147,6 @@ public interface DagManagementStateStore {
    */
   List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) 
throws IOException;
 
-  /**
-   * Returns the {@link Dag} the provided {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to
-   * or Optional.absent if it is not found.
-   */
-  Optional<Dag<JobExecutionPlan>> getParentDag(Dag.DagNode<JobExecutionPlan> 
dagNode);
-
   /**
    * Deletes the dag node state that was added through {@link 
DagManagementStateStore#addDagNodeState(Dag.DagNode, DagManager.DagId)}
    * No-op if the dag node is not found in the store.
@@ -201,4 +196,77 @@ public interface DagManagementStateStore {
    * has any running job, false otherwise.
    */
   public boolean hasRunningJobs(DagManager.DagId dagId);
+
+  /**
+   * Check if an action exists in dagAction store by flow group, flow name, 
flow execution id, and job name.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @param jobName job name for the dag action
+   * @param dagActionType the value of the dag action
+   * @throws IOException
+   */
+  boolean existsJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+      DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException;
+
+  /**
+   * Check if an action exists in dagAction store by flow group, flow name, 
and flow execution id, it assumes jobName is
+   * empty ("").
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @param dagActionType the value of the dag action
+   * @throws IOException
+   */
+  boolean existsFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId,
+      DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException;
+
+  /** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore} 
for durability */
+  default void addDagAction(DagActionStore.DagAction dagAction) throws 
IOException {
+    addJobDagAction(
+        dagAction.getFlowGroup(),
+        dagAction.getFlowName(),
+        dagAction.getFlowExecutionId(),
+        dagAction.getJobName(),
+        dagAction.getDagActionType());
+  }
+
+  /**
+   * Persist the dag action in {@link DagActionStore} for durability
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @param jobName job name for the dag action
+   * @param dagActionType the value of the dag action
+   * @throws IOException
+   */
+  void addJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+      DagActionStore.DagActionType dagActionType) throws IOException;
+
+  /**
+   * Persist the dag action in {@link DagActionStore} for durability. This 
method assumes an empty jobName.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @param dagActionType the value of the dag action
+   * @throws IOException
+   */
+  default void addFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId,
+      DagActionStore.DagActionType dagActionType) throws IOException {
+    addDagAction(DagActionStore.DagAction.forFlow(flowGroup, flowName, 
flowExecutionId, dagActionType));
+  }
+
+  /**
+   * delete the dag action from {@link DagActionStore}
+   * @param dagAction containing all information needed to identify dag and 
specific action value
+   * @throws IOException
+   * @return true if we successfully delete one record, return false if the 
record does not exist
+   */
+  boolean deleteDagAction(DagActionStore.DagAction dagAction) throws 
IOException;
+
+  /***
+   * Get all {@link DagActionStore.DagAction}s from the {@link DagActionStore}.
+   * @throws IOException Exception in retrieving {@link 
DagActionStore.DagAction}s.
+   */
+  Collection<DagActionStore.DagAction> getDagActions() throws IOException;
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 1d32cde39..0d4a229d0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -76,9 +76,6 @@ import org.apache.gobblin.util.ConfigUtils;
 public class DagManagementTaskStreamImpl implements DagManagement, 
DagTaskStream {
   private final Config config;
   @Getter private final EventSubmitter eventSubmitter;
-
-  @Inject(optional=true)
-  protected Optional<DagActionStore> dagActionStore;
   protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
   protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
   private final boolean isMultiActiveExecutionEnabled;
@@ -102,7 +99,6 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       throw new RuntimeException(String.format("DagProcessingEngine requires 
%s to be instantiated.",
           DagActionReminderScheduler.class.getSimpleName()));
     }
-    this.dagActionStore = dagActionStore;
     this.dagActionProcessingLeaseArbiter = dagActionProcessingLeaseArbiter;
     this.dagActionReminderScheduler = dagActionReminderScheduler;
     this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
@@ -214,17 +210,17 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
 
     switch (dagActionType) {
       case ENFORCE_FLOW_FINISH_DEADLINE:
-        return new EnforceFlowFinishDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagActionStore.get());
+        return new EnforceFlowFinishDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagManagementStateStore);
       case ENFORCE_JOB_START_DEADLINE:
-        return new EnforceJobStartDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagActionStore.get());
+        return new EnforceJobStartDeadlineDagTask(dagAction, 
leaseObtainedStatus, dagManagementStateStore);
       case KILL:
-        return new KillDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
+        return new KillDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
       case LAUNCH:
-        return new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
+        return new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
       case REEVALUATE:
-        return new ReevaluateDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
+        return new ReevaluateDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
       case RESUME:
-        return new ResumeDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
+        return new ResumeDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
       default:
         throw new UnsupportedOperationException(dagActionType + " not yet 
implemented");
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index bb4eda416..864fae944 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -69,7 +69,7 @@ import org.apache.gobblin.util.ConfigUtils;
 @Slf4j
 public class FlowLaunchHandler {
   private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
-  private DagActionStore dagActionStore;
+  private DagManagementStateStore dagManagementStateStore;
   private final MetricContext metricContext;
   private final int schedulerMaxBackoffMillis;
   private static Random random = new Random();
@@ -81,13 +81,13 @@ public class FlowLaunchHandler {
   @Inject
   public FlowLaunchHandler(Config config,
       @Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME) 
MultiActiveLeaseArbiter leaseArbiter,
-      SchedulerService schedulerService, 
com.google.common.base.Optional<DagActionStore> optDagActionStore) {
+      SchedulerService schedulerService, 
com.google.common.base.Optional<DagManagementStateStore> 
dagManagementStateStoreOpt) {
     this.multiActiveLeaseArbiter = leaseArbiter;
 
-    if (!optDagActionStore.isPresent()) {
+    if (!dagManagementStateStoreOpt.isPresent()) {
       throw new RuntimeException("DagActionStore MUST be present for flow 
launch handling!");
     }
-    this.dagActionStore = optDagActionStore.get();
+    this.dagManagementStateStore = dagManagementStateStoreOpt.get();
 
     this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
@@ -140,8 +140,8 @@ public class FlowLaunchHandler {
   private boolean 
persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
     DagActionStore.DagAction launchDagAction = 
leaseStatus.getConsensusDagAction();
     try {
-      this.dagActionStore.addDagAction(launchDagAction);
-      DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(launchDagAction);
+      this.dagManagementStateStore.addDagAction(launchDagAction);
+      
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
launchDagAction);
       this.numFlowsSubmitted.mark();
       // after successfully persisting, close the lease
       return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 7f9498764..da50c5055 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 import java.net.URI;
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -63,11 +64,9 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 @Slf4j
 @Singleton
 public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateStore {
-  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new ConcurrentHashMap<>();
   private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
ConcurrentHashMap<>();
   // dagToJobs holds a map of dagId to running jobs of that dag
   private final Map<DagManager.DagId, 
LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
ConcurrentHashMap<>();
-  private final Map<DagManager.DagId, Long> dagToDeadline = new 
ConcurrentHashMap<>();
   private DagStateStore dagStateStore;
   private DagStateStore failedDagStateStore;
   private JobStatusRetriever jobStatusRetriever;
@@ -80,19 +79,21 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   FlowCatalog flowCatalog;
   @Getter
   private final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+  private final DagActionStore dagActionStore;
 
   @Inject
   public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog 
flowCatalog, UserQuotaManager userQuotaManager,
-      JobStatusRetriever jobStatusRetriever) {
+      JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) {
     this.quotaManager = userQuotaManager;
     this.config = config;
     this.flowCatalog = flowCatalog;
     this.jobStatusRetriever = jobStatusRetriever;
     this.dagManagerMetrics.activate();
+    this.dagActionStore = dagActionStore;
    }
 
   // It should be called after topology spec map is set
-  private synchronized void start() throws IOException {
+  private synchronized void start() {
     if (!dagStoresInitialized) {
       this.dagStateStore = createDagStateStore(config, topologySpecMap);
       this.failedDagStateStore = 
createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
@@ -180,9 +181,7 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   @Override
   // todo - updating different maps here and in addDagNodeState can result in 
inconsistency between the maps
   public synchronized void deleteDagNodeState(DagManager.DagId dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
-    this.jobToDag.remove(dagNode);
     this.dagNodes.remove(dagNode.getValue().getId());
-    this.dagToDeadline.remove(dagId);
     if (this.dagToJobs.containsKey(dagId)) {
       this.dagToJobs.get(dagId).remove(dagNode);
       if (this.dagToJobs.get(dagId).isEmpty()) {
@@ -199,7 +198,6 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
     if (!dag.isPresent()) {
       throw new RuntimeException("Dag " + dagId + " not found");
     }
-    this.jobToDag.put(dagNode, dag.get());
     this.dagNodes.put(dagNode.getValue().getId(), dagNode);
     if (!this.dagToJobs.containsKey(dagId)) {
       this.dagToJobs.put(dagId, Lists.newLinkedList());
@@ -227,11 +225,6 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
     }
   }
 
-  @Override
-  public Optional<Dag<JobExecutionPlan>> 
getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) {
-    return Optional.of(this.jobToDag.get(dagNode));
-  }
-
   @Override
   public List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId 
dagId) {
     List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
@@ -275,4 +268,32 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   public boolean hasRunningJobs(DagManager.DagId dagId) {
     return !getDagNodes(dagId).isEmpty();
   }
+
+  @Override
+  public boolean existsJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+      DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException {
+    return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType);
+  }
+
+  @Override
+  public boolean existsFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId,
+      DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException {
+    return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, 
dagActionType);
+  }
+
+  @Override
+  public void addJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+      DagActionStore.DagActionType dagActionType) throws IOException {
+    this.dagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType);
+  }
+
+  @Override
+  public boolean deleteDagAction(DagActionStore.DagAction dagAction) throws 
IOException {
+    return this.dagActionStore.deleteDagAction(dagAction);
+  }
+
+  @Override
+  public Collection<DagActionStore.DagAction> getDagActions() throws 
IOException {
+    return this.dagActionStore.getDagActions();
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 5396b0502..41615c79b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -37,7 +37,6 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
@@ -55,8 +54,6 @@ import static 
org.apache.gobblin.service.ExecutionStatus.CANCELLED;
  */
 @Slf4j
 public class DagProcUtils {
-  private static final DagActionStore dagActionStore = 
GobblinServiceManager.getClass(DagActionStore.class);
-
   /**
    * - submits a {@link JobSpec} to a {@link SpecExecutor}
    * - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent}
@@ -94,7 +91,7 @@ public class DagProcUtils {
       // blocks (by calling Future#get()) until the submission is completed.
       dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
 
-      sendEnforceJobStartDeadlineDagAction(dagNode);
+      sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
 
       Future<?> addSpecFuture = producer.addSpec(jobSpec);
       // todo - we should add future.get() instead of the complete future into 
the JobExecutionPlan
@@ -167,16 +164,16 @@ public class DagProcUtils {
     jobExecutionPlan.setExecutionStatus(CANCELLED);
   }
 
-  private static void 
sendEnforceJobStartDeadlineDagAction(Dag.DagNode<JobExecutionPlan> dagNode)
+  private static void 
sendEnforceJobStartDeadlineDagAction(DagManagementStateStore 
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode)
       throws IOException {
-    dagActionStore.addJobDagAction(dagNode.getValue().getFlowGroup(), 
dagNode.getValue().getFlowName(),
+    dagManagementStateStore.addJobDagAction(dagNode.getValue().getFlowGroup(), 
dagNode.getValue().getFlowName(),
         String.valueOf(dagNode.getValue().getFlowExecutionId()), 
dagNode.getValue().getJobName(),
         DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
   }
 
-  public static void 
sendEnforceFlowFinishDeadlineDagAction(DagActionStore.DagAction launchDagAction)
+  public static void 
sendEnforceFlowFinishDeadlineDagAction(DagManagementStateStore 
dagManagementStateStore, DagActionStore.DagAction launchDagAction)
       throws IOException {
-    dagActionStore.addFlowDagAction(launchDagAction.getFlowGroup(), 
launchDagAction.getFlowName(),
+    dagManagementStateStore.addFlowDagAction(launchDagAction.getFlowGroup(), 
launchDagAction.getFlowName(),
         launchDagAction.getFlowExecutionId(), 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 10529b25e..bc786b91f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -119,7 +119,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
         dagManagementStateStore.markDagFailed(dag);
       }
 
-      removeFlowFinishDeadlineTriggerAndDagAction();
+      removeFlowFinishDeadlineTriggerAndDagAction(dagManagementStateStore);
     }
   }
 
@@ -197,7 +197,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
     throw new UnsupportedOperationException("More than one start job is not 
allowed");
   }
 
-  private void removeFlowFinishDeadlineTriggerAndDagAction() {
+  private void 
removeFlowFinishDeadlineTriggerAndDagAction(DagManagementStateStore 
dagManagementStateStore) {
     DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = 
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
         getDagNodeId().getFlowName(), 
String.valueOf(getDagNodeId().getFlowExecutionId()),
         DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
@@ -206,7 +206,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
 
     try {
       
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(getDagTask().getDagAction());
-      
GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceFlowFinishDeadlineDagAction);
+      
dagManagementStateStore.deleteDagAction(enforceFlowFinishDeadlineDagAction);
     } catch (SchedulerException | IOException e) {
       log.warn("Failed to unschedule the reminder for {}", 
enforceFlowFinishDeadlineDagAction);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index f031134d1..b01860a9e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -41,13 +41,13 @@ import 
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 public abstract class DagTask {
   @Getter public final DagActionStore.DagAction dagAction;
   private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
-  private final DagActionStore dagActionStore;
+  private final DagManagementStateStore dagManagementStateStore;
 
   public DagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagActionStore dagActionStore) {
+      DagManagementStateStore dagManagementStateStore) {
     this.dagAction = dagAction;
     this.leaseObtainedStatus = leaseObtainedStatus;
-    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
   }
 
   public abstract <T> T host(DagTaskVisitor<T> visitor);
@@ -59,7 +59,7 @@ public abstract class DagTask {
    */
   public final boolean conclude() {
     try {
-      this.dagActionStore.deleteDagAction(this.dagAction);
+      this.dagManagementStateStore.deleteDagAction(this.dagAction);
       return this.leaseObtainedStatus.completeLease();
     } catch (IOException e) {
       // TODO: Decide appropriate exception to throw and add to the commit 
method's signature
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
index 5eaf4ffa7..bfd709fb9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration.task;
 
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
 import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
@@ -29,8 +30,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 public class EnforceFlowFinishDeadlineDagTask extends DagTask {
   public EnforceFlowFinishDeadlineDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagActionStore dagActionStore) {
-    super(dagAction, leaseObtainedStatus, dagActionStore);
+      DagManagementStateStore dagManagementStateStore) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
index 7ad3867a5..c1c270eb7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration.task;
 
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
 import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
@@ -29,8 +30,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 public class EnforceJobStartDeadlineDagTask extends DagTask {
   public EnforceJobStartDeadlineDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagActionStore dagActionStore) {
-    super(dagAction, leaseObtainedStatus, dagActionStore);
+      DagManagementStateStore dagManagementStateStore) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
index 9533a0e69..3d7315378 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration.task;
 
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
 import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
@@ -28,8 +29,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 public class KillDagTask extends DagTask {
   public KillDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagActionStore dagActionStore) {
-    super(dagAction, leaseObtainedStatus, dagActionStore);
+      DagManagementStateStore dagManagementStateStore) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index 88a620dda..8a726b2dc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration.task;
 
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
 import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
@@ -28,8 +29,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 public class LaunchDagTask extends DagTask {
   public LaunchDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagActionStore dagActionStore) {
-    super(dagAction, leaseObtainedStatus, dagActionStore);
+      DagManagementStateStore dagManagementStateStore) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
index 7a90c9912..ea1b36e34 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration.task;
 
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
 import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
@@ -28,8 +29,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 public class ReevaluateDagTask extends DagTask {
   public ReevaluateDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagActionStore dagActionStore) {
-    super(dagAction, leaseObtainedStatus, dagActionStore);
+      DagManagementStateStore dagManagementStateStore) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
index b9ff4fb8a..4e5ed2657 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration.task;
 
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
 import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
@@ -27,8 +28,8 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
  */
 public class ResumeDagTask extends DagTask {
   public ResumeDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
-      DagActionStore dagActionStore) {
-    super(dagAction, leaseObtainedStatus, dagActionStore);
+      DagManagementStateStore dagManagementStateStore) {
+    super(dagAction, leaseObtainedStatus, dagManagementStateStore);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index caee99e59..1b5e21083 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -40,16 +40,18 @@ import 
org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
 import org.apache.gobblin.service.FlowStatusId;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+
 
 @Slf4j
 public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends 
GobblinServiceFlowExecutionResourceHandler{
-  private DagActionStore dagActionStore;
+  private DagManagementStateStore dagManagementStateStore;
   @Inject
   public 
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby(FlowExecutionResourceLocalHandler
 handler,
       @Named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME) EventBus eventBus,
-      Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) 
boolean forceLeader, DagActionStore dagActionStore) {
+      Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) 
boolean forceLeader, DagManagementStateStore dagManagementStateStore) {
     super(handler, eventBus, manager, forceLeader);
-    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
   }
 
   @Override
@@ -69,12 +71,12 @@ public class 
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
   protected void addDagAction(String flowGroup, String flowName, Long 
flowExecutionId, DagActionStore.DagActionType actionType) {
     try {
       // If an existing resume request is still pending then do not accept 
this request
-      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), actionType)) {
+      if (this.dagManagementStateStore.existsFlowDagAction(flowGroup, 
flowName, flowExecutionId.toString(), actionType)) {
         this.throwErrorResponse("There is already a pending " + actionType + " 
action for this flow. Please wait to resubmit and wait "
             + "for action to be completed.", HttpStatus.S_409_CONFLICT);
         return;
       }
-      this.dagActionStore.addFlowDagAction(flowGroup, flowName, 
flowExecutionId.toString(), actionType);
+      this.dagManagementStateStore.addFlowDagAction(flowGroup, flowName, 
flowExecutionId.toString(), actionType);
     } catch (IOException | SQLException e) {
       log.warn(
           String.format("Failed to add %s action for flow %s %s %s to dag 
action store due to:", actionType, flowGroup,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 814236eeb..7f566a56b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -48,6 +48,7 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 
@@ -98,14 +99,14 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   @Getter
   @VisibleForTesting
   protected FlowCatalog flowCatalog;
-  protected DagActionStore dagActionStore;
+  protected DagManagementStateStore dagManagementStateStore;
   @Getter
   private volatile boolean isActive;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
   public DagActionStoreChangeMonitor(String topic, Config config, DagManager 
dagManager, int numThreads,
-      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore 
dagActionStore,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, 
DagManagementStateStore dagManagementStateStore,
       boolean isMultiActiveSchedulerEnabled) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
@@ -114,7 +115,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     this.dagManager = dagManager;
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
-    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
 
     /*
@@ -135,9 +136,9 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
    * Load all actions from the DagActionStore to process any missed actions 
during service startup
    */
   protected void initializeMonitor() {
-    Collection<DagActionStore.DagAction> dagActions = null;
+    Collection<DagActionStore.DagAction> dagActions;
     try {
-      dagActions = dagActionStore.getDagActions();
+      dagActions = dagManagementStateStore.getDagActions();
     } catch (IOException e) {
       throw new RuntimeException(String.format("Unable to retrieve dagActions 
from the dagActionStore while "
           + "initializing the %s", 
DagActionStoreChangeMonitor.class.getCanonicalName()), e);
@@ -326,7 +327,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     } finally {
       // Delete the dag action regardless of whether it was processed 
successfully to avoid accumulating failure cases
       try {
-        this.dagActionStore.deleteDagAction(dagAction);
+        this.dagManagementStateStore.deleteDagAction(dagAction);
       } catch (IOException e) {
         log.warn("Failed to delete dag action from dagActionStore. dagAction: 
{}", dagAction);
       }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 767f2b76d..5bac15089 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -28,7 +28,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.util.InjectionNames;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.util.ConfigUtils;
@@ -45,18 +45,18 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
   private DagManager dagManager;
   private FlowCatalog flowCatalog;
   private Orchestrator orchestrator;
-  private DagActionStore dagActionStore;
+  private DagManagementStateStore dagManagementStateStore;
   private boolean isMultiActiveSchedulerEnabled;
 
   @Inject
   public DagActionStoreChangeMonitorFactory(Config config, DagManager 
dagManager, FlowCatalog flowCatalog,
-      Orchestrator orchestrator, DagActionStore dagActionStore,
+      Orchestrator orchestrator, DagManagementStateStore 
dagManagementStateStore,
       @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled) {
     this.config = Objects.requireNonNull(config);
     this.dagManager = dagManager;
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
-    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
   }
 
@@ -68,7 +68,7 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
     int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
     return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, 
this.dagManager, numThreads, flowCatalog,
-        orchestrator, dagActionStore, isMultiActiveSchedulerEnabled);
+        orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index eacecab94..ab863974c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 
 
@@ -40,11 +41,11 @@ public class DagManagementDagActionStoreChangeMonitor 
extends DagActionStoreChan
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
   public DagManagementDagActionStoreChangeMonitor(Config config, int 
numThreads,
-      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore 
dagActionStore,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, 
DagManagementStateStore dagManagementStateStore,
       boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
     // DagManager is only needed in the `handleDagAction` method of its parent 
class and not needed in this class,
     // so we are passing a null value for DagManager to its parent class.
-    super("", config, null, numThreads, flowCatalog, orchestrator, 
dagActionStore, isMultiActiveSchedulerEnabled);
+    super("", config, null, numThreads, flowCatalog, orchestrator, 
dagManagementStateStore, isMultiActiveSchedulerEnabled);
     this.dagManagement = dagManagement;
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index 53b1f6836..c5828575d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -28,8 +28,8 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.util.InjectionNames;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.util.ConfigUtils;
@@ -45,18 +45,18 @@ public class 
DagManagementDagActionStoreChangeMonitorFactory implements Provider
   private final Config config;
   private final FlowCatalog flowCatalog;
   private final Orchestrator orchestrator;
-  private final DagActionStore dagActionStore;
+  private final DagManagementStateStore dagManagementStateStore;
   private final boolean isMultiActiveSchedulerEnabled;
   private final DagManagement dagManagement;
 
   @Inject
   public DagManagementDagActionStoreChangeMonitorFactory(Config config, 
DagManager dagManager, FlowCatalog flowCatalog,
-      Orchestrator orchestrator, DagActionStore dagActionStore, DagManagement 
dagManagement,
+      Orchestrator orchestrator, DagManagementStateStore 
dagManagementStateStore, DagManagement dagManagement,
       @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled) {
     this.config = Objects.requireNonNull(config);
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
-    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
     this.dagManagement = dagManagement;
   }
@@ -68,7 +68,7 @@ public class DagManagementDagActionStoreChangeMonitorFactory 
implements Provider
     int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
     return new 
DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig,
-        numThreads, flowCatalog, orchestrator, dagActionStore, 
isMultiActiveSchedulerEnabled, this.dagManagement);
+        numThreads, flowCatalog, orchestrator, dagManagementStateStore, 
isMultiActiveSchedulerEnabled, this.dagManagement);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index eb405ba60..74a864da3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -46,7 +46,7 @@ import 
org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -66,9 +66,9 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
   private Meter messageParseFailures;
 
   public KafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads, JobIssueEventHandler jobIssueEventHandler,
-      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagActionStore dagActionStore)
+      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagManagementStateStore dagManagementStateStore)
       throws IOException, ReflectiveOperationException {
-    super(topic, config, numThreads,  jobIssueEventHandler, 
observabilityEventProducer, dagActionStore);
+    super(topic, config, numThreads,  jobIssueEventHandler, 
observabilityEventProducer, dagManagementStateStore);
 
     if (ConfigUtils.getBoolean(config, 
ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
       KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new 
KafkaAvroSchemaRegistryFactory().
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9d707ffdc..d6ff2b651 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -67,6 +67,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.retry.RetryerFactory;
@@ -119,11 +120,11 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   private final JobIssueEventHandler jobIssueEventHandler;
   private final Retryer<Void> persistJobStatusRetryer;
   private final GaaSJobObservabilityEventProducer eventProducer;
-  private final DagActionStore dagActionStore;
+  private final DagManagementStateStore dagManagementStateStore;
   private final boolean dagProcEngineEnabled;
 
   public KafkaJobStatusMonitor(String topic, Config config, int numThreads, 
JobIssueEventHandler jobIssueEventHandler,
-      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagActionStore dagActionStore)
+      GaaSJobObservabilityEventProducer observabilityEventProducer, 
DagManagementStateStore dagManagementStateStore)
       throws ReflectiveOperationException {
     super(topic, config.withFallback(DEFAULTS), numThreads);
     String stateStoreFactoryClass = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, 
FileContextBasedFsStateStoreFactory.class.getName());
@@ -133,7 +134,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
 
     this.jobIssueEventHandler = jobIssueEventHandler;
-    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
     this.dagProcEngineEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
 
     Config retryerOverridesConfig = 
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
@@ -227,10 +228,10 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
             this.eventProducer.emitObservabilityEvent(jobStatus);
             if (this.dagProcEngineEnabled) {
               // todo - retried/resumed jobs *may* not be handled here, we may 
want to create their dag action elsewhere
-              this.dagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+              this.dagManagementStateStore.addJobDagAction(flowGroup, 
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
             }
           } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
-            removeStartDeadlineTriggerAndDagAction(flowGroup, flowName, 
flowExecutionId, jobName);
+            removeStartDeadlineTriggerAndDagAction(dagManagementStateStore, 
flowGroup, flowName, flowExecutionId, jobName);
           }
 
           // update the state store after adding a dag action to guaranty 
at-least-once adding of dag action
@@ -256,7 +257,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     }
   }
 
-  private void removeStartDeadlineTriggerAndDagAction(String flowGroup, String 
flowName, String flowExecutionId, String jobName) {
+  private void removeStartDeadlineTriggerAndDagAction(DagManagementStateStore 
dagManagementStateStore, String flowGroup, String flowName, String 
flowExecutionId, String jobName) {
     DagActionStore.DagAction enforceStartDeadlineDagAction = new 
DagActionStore.DagAction(flowGroup, flowName,
         String.valueOf(flowExecutionId), jobName, 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
     log.info("Deleting reminder trigger and dag action {}", 
enforceStartDeadlineDagAction);
@@ -264,7 +265,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
     try {
       
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceStartDeadlineDagAction);
-      
GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceStartDeadlineDagAction);
+      dagManagementStateStore.deleteDagAction(enforceStartDeadlineDagAction);
     } catch (SchedulerException | IOException e) {
       log.error("Failed to unschedule the reminder for {}", 
enforceStartDeadlineDagAction);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index a1698e57a..756e04f73 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -35,7 +35,7 @@ import 
org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.runtime.util.InjectionNames;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -52,22 +52,22 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
   private final JobIssueEventHandler jobIssueEventHandler;
   private final MultiContextIssueRepository issueRepository;
   private final boolean instrumentationEnabled;
-  private final DagActionStore dagActionStore;
+  private final DagManagementStateStore dagManagementStateStore;
   private final boolean dagProcEngineEnabled;
 
   @Inject
   public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler 
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
-      GobblinInstanceEnvironment env, DagActionStore dagActionStore, 
@Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean dagProcEngineEnabled) {
-    this(config, jobIssueEventHandler, issueRepository, 
env.isInstrumentationEnabled(), dagActionStore, dagProcEngineEnabled);
+      GobblinInstanceEnvironment env, DagManagementStateStore 
dagManagementStateStore, @Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean 
dagProcEngineEnabled) {
+    this(config, jobIssueEventHandler, issueRepository, 
env.isInstrumentationEnabled(), dagManagementStateStore, dagProcEngineEnabled);
   }
 
   public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler 
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
-      boolean instrumentationEnabled, DagActionStore dagActionStore, boolean 
dagProcEngineEnabled) {
+      boolean instrumentationEnabled, DagManagementStateStore 
dagManagementStateStore, boolean dagProcEngineEnabled) {
     this.config = Objects.requireNonNull(config);
     this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
     this.issueRepository = issueRepository;
     this.instrumentationEnabled = instrumentationEnabled;
-    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
     this.dagProcEngineEnabled = dagProcEngineEnabled;
   }
 
@@ -103,7 +103,7 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
         observabilityEventProducerClassName, 
ConfigUtils.configToState(config), this.issueRepository, 
this.instrumentationEnabled);
 
     return (KafkaJobStatusMonitor) GobblinConstructorUtils
-        .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer, 
dagActionStore);
+        .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer, 
dagManagementStateStore);
   }
 
   @Override
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 070d6a85b..8d4ff795a 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -74,7 +74,8 @@ public class DagManagementTaskStreamImplTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null, null);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config,
+        null, null, null, mock(DagActionStore.class));
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index d1f1e736d..92345122b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -58,16 +58,15 @@ public class DagProcessingEngineTest {
   private DagManagementTaskStreamImpl dagManagementTaskStream;
   private DagTaskStream dagTaskStream;
   private DagProcFactory dagProcFactory;
-  private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  private static MostlyMySqlDagManagementStateStore dagManagementStateStore;
   private ITestMetastoreDatabase testMetastoreDatabase;
-  static DagActionStore dagActionStore;
   static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
 
   @BeforeClass
   public void setUp() throws Exception {
     // Setting up mock DB
     testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
-    dagActionStore = mock(DagActionStore.class);
+    DagActionStore dagActionStore = mock(DagActionStore.class);
     doReturn(true).when(dagActionStore).deleteDagAction(any());
     leaseObtainedStatus = mock(LeaseAttemptStatus.LeaseObtainedStatus.class);
     doReturn(true).when(leaseObtainedStatus).completeLease();
@@ -87,16 +86,18 @@ public class DagProcessingEngineTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    this.dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null, null);
-    this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
-    this.dagManagementTaskStream =
+    dagManagementStateStore = new MostlyMySqlDagManagementStateStore(config, 
null,
+        null, null, dagActionStore);
+    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    doReturn(true).when(dagActionStore).deleteDagAction(any());
+    dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
             mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)), false,
-            this.dagManagementStateStore);
+            dagManagementStateStore);
     this.dagProcFactory = new DagProcFactory(null);
 
     DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
-        new 
DagProcessingEngine.DagProcEngineThread(this.dagManagementTaskStream, 
this.dagProcFactory,
+        new DagProcessingEngine.DagProcEngineThread(dagManagementTaskStream, 
this.dagProcFactory,
             dagManagementStateStore, 0);
     this.dagTaskStream = spy(new MockedDagTaskStream());
     DagProcessingEngine dagProcessingEngine =
@@ -139,7 +140,7 @@ public class DagProcessingEngineTest {
     private final boolean isBad;
 
     public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
-      super(dagAction, leaseObtainedStatus, dagActionStore);
+      super(dagAction, leaseObtainedStatus, dagManagementStateStore);
       this.isBad = isBad;
     }
 
@@ -191,6 +192,6 @@ public class DagProcessingEngineTest {
             + "Actual number of invocations " + 
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
         log, 1, 1000L);
 
-    
Assert.assertEquals(this.dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
  expectedExceptions);
+    
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
  expectedExceptions);
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 6d6f8b431..970309cef 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -99,7 +99,6 @@ public class MostlyMySqlDagManagementStateStoreTest {
     Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
     Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getDag(dagId).get().toString());
     Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
-    Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getParentDag(dagNode).get().toString());
 
     List<Dag.DagNode<JobExecutionPlan>> dagNodes = 
this.dagManagementStateStore.getDagNodes(dagId);
     Assert.assertEquals(2, dagNodes.size());
@@ -138,7 +137,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
     MostlyMySqlDagManagementStateStore dagManagementStateStore =
-        new MostlyMySqlDagManagementStateStore(config, null, null, 
jobStatusRetriever);
+        new MostlyMySqlDagManagementStateStore(config, null, null, 
jobStatusRetriever, mock(DagActionStore.class));
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     return dagManagementStateStore;
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 87ce00368..86d6ff104 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -125,7 +125,7 @@ public class OrchestratorTest {
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE).build();
 
     MostlyMySqlDagManagementStateStore dagManagementStateStore =
-        new MostlyMySqlDagManagementStateStore(config, null, null, null);
+        new MostlyMySqlDagManagementStateStore(config, null, null, null, 
mock(DagActionStore.class));
 
     SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 4f8f7b365..52d80469f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -50,7 +50,6 @@ import org.apache.gobblin.service.monitoring.JobStatus;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 
@@ -92,12 +91,11 @@ public class EnforceDeadlineDagProcsTest {
         message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
     dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
 
     EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
         new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
-            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, mock(DagActionStore.class)));
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, dagManagementStateStore));
     enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
 
     int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
@@ -129,12 +127,11 @@ public class EnforceDeadlineDagProcsTest {
         message("Test 
message").eventName(ExecutionStatus.RUNNING.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
     dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
is in running state
 
     EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new 
EnforceFlowFinishDeadlineDagProc(
         new EnforceFlowFinishDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
-            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, mock(DagActionStore.class)));
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, dagManagementStateStore));
     enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
 
     Assert.assertEquals(numOfDagNodes,
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index c283b49d5..b7c86eebe 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -94,7 +94,7 @@ public class KillDagProcTest {
 
     LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "flow1",
         String.valueOf(flowExecutionId), 
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH),
-        null, mock(DagActionStore.class)), flowCompilationValidationHelper);
+        null, this.dagManagementStateStore), flowCompilationValidationHelper);
     launchDagProc.process(this.dagManagementStateStore);
 
     List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -107,7 +107,7 @@ public class KillDagProcTest {
 
     KillDagProc killDagProc = new KillDagProc(new KillDagTask(new 
DagActionStore.DagAction("fg", "flow1",
        String.valueOf(flowExecutionId), 
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.KILL),
-        null, mock(DagActionStore.class)));
+        null, this.dagManagementStateStore));
     killDagProc.process(this.dagManagementStateStore);
 
     long cancelJobCount = specProducers.stream()
@@ -137,7 +137,7 @@ public class KillDagProcTest {
 
     LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "flow2",
         String.valueOf(flowExecutionId), 
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH),
-        null, mock(DagActionStore.class)), flowCompilationValidationHelper);
+        null, this.dagManagementStateStore), flowCompilationValidationHelper);
     launchDagProc.process(this.dagManagementStateStore);
 
     List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -150,7 +150,7 @@ public class KillDagProcTest {
 
     KillDagProc killDagProc = new KillDagProc(new KillDagTask(new 
DagActionStore.DagAction("fg", "flow2",
         String.valueOf(flowExecutionId), "job2", 
DagActionStore.DagActionType.KILL),
-        null, mock(DagActionStore.class)));
+        null, this.dagManagementStateStore));
     killDagProc.process(this.dagManagementStateStore);
 
     long cancelJobCount = specProducers.stream()
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 48c65870e..34a323914 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -89,7 +89,7 @@ public class LaunchDagProcTest {
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
     LaunchDagProc launchDagProc = new LaunchDagProc(
         new LaunchDagTask(new DagActionStore.DagAction("fg", "fn", "12345",
-            "jn", DagActionStore.DagActionType.LAUNCH), null, 
mock(DagActionStore.class)),
+            "jn", DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore),
         flowCompilationValidationHelper);
 
     launchDagProc.process(this.dagManagementStateStore);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 6a625e527..eb3429597 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -70,7 +70,6 @@ public class ReevaluateDagProcTest {
   private ITestMetastoreDatabase testMetastoreDatabase;
   private DagManagementStateStore dagManagementStateStore;
   private MockedStatic<GobblinServiceManager> mockedGobblinServiceManager;
-  private DagActionStore dagActionStore;
   private DagActionReminderScheduler dagActionReminderScheduler;
 
   @BeforeClass
@@ -83,9 +82,7 @@ public class ReevaluateDagProcTest {
   public void setUp() throws Exception {
     this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     mockDMSSCommonBehavior(dagManagementStateStore);
-    this.dagActionStore = mock(DagActionStore.class);
     this.dagActionReminderScheduler = mock(DagActionReminderScheduler.class);
-    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(this.dagActionStore);
     this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(this.dagActionReminderScheduler);
   }
 
@@ -125,11 +122,10 @@ public class ReevaluateDagProcTest {
 
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-    
doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        String.valueOf(flowExecutionId), "job0", 
DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class)));
+        String.valueOf(flowExecutionId), "job0", 
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
     reEvaluateDagProc.process(dagManagementStateStore);
 
     long addSpecCount = specProducers.stream()
@@ -151,7 +147,7 @@ public class ReevaluateDagProcTest {
         .filter(a -> 
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
 
     // when there is no more job to run in re-evaluate dag proc, it deletes 
enforce_flow_finish_dag_action also
-    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
 
@@ -170,7 +166,6 @@ public class ReevaluateDagProcTest {
 
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
-    
doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
     doReturn(true).when(dagManagementStateStore).releaseQuota(any());
 
     List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -191,7 +186,7 @@ public class ReevaluateDagProcTest {
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        String.valueOf(flowExecutionId), "job0", 
DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class)));
+        String.valueOf(flowExecutionId), "job0", 
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
     reEvaluateDagProc.process(dagManagementStateStore);
 
     // no new job to launch for this one job flow
@@ -208,7 +203,7 @@ public class ReevaluateDagProcTest {
     
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
 
-    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 993fba8c0..08fff6fe1 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -53,7 +53,6 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 
@@ -98,7 +97,7 @@ public class ResumeDagProcTest {
 
     ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         String.valueOf(flowExecutionId), 
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME),
-        null, mock(DagActionStore.class)));
+        null, this.dagManagementStateStore));
     resumeDagProc.process(this.dagManagementStateStore);
 
     SpecProducer<Spec> specProducer = 
DagManagerUtils.getSpecProducer(dag.getNodes().get(1));

Reply via email to