This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e2655cf32 [GOBBLIN-2019] Remove circular dependency & initialize 
multi-active execution related classes (#3899)
e2655cf32 is described below

commit e2655cf326eac1f5401e3b65c703efcd68621974
Author: umustafi <[email protected]>
AuthorDate: Fri Mar 29 13:15:16 2024 -0700

    [GOBBLIN-2019] Remove circular dependency & initialize multi-active 
execution related classes (#3899)
    
    * Remove circular dependency & initialize executon related classes
    * Remove default table names and fix compile bug
    * Add test for getClass method
    * Improve comments & javadocs
    * Fix MysqlMultiActiveLeaseArbiterTest
    * Create abstract factory class and update DagProc fields
    * Refactor dagProc methods and use base class for mock in tests
    * Use lease arbiter in single-active execution mode
    * Delete dagAction before concluding lease
    * Rename lease arbiter
    * Simplify gobblinServiceManagerTest
    * Update gobblinServiceManager create method
---
 .../gobblin/configuration/ConfigurationKeys.java   |  12 +-
 .../src/main/avro/DagActionStoreChangeEvent.avsc   |   5 +
 .../runtime/DagActionStoreChangeMonitorTest.java   |   3 +-
 .../modules/core/GobblinServiceGuiceModule.java    |  37 ++--
 .../modules/core/GobblinServiceManager.java        |  34 ++--
 ...nProcessingMultiActiveLeaseArbiterFactory.java} |  26 +--
 .../orchestration/DagActionReminderScheduler.java  | 103 +++++++++--
 .../modules/orchestration/DagActionStore.java      |   5 +-
 .../orchestration/DagManagementTaskStreamImpl.java | 106 ++++++++---
 .../service/modules/orchestration/DagManager.java  |   6 +-
 .../modules/orchestration/FlowLaunchHandler.java   |   6 +-
 ... FlowLaunchMultiActiveLeaseArbiterFactory.java} |  26 +--
 .../MostlyMySqlDagManagementStateStore.java        |   2 +
 .../MultiActiveLeaseArbiterFactory.java            |  57 ++++++
 .../modules/orchestration/MysqlDagActionStore.java |   7 +-
 .../MysqlMultiActiveLeaseArbiter.java              |  19 +-
 .../modules/orchestration/Orchestrator.java        |   2 +-
 .../ReminderSettingDagProcLeaseArbiter.java        | 197 ---------------------
 .../modules/orchestration/proc/DagProc.java        |  14 +-
 .../modules/orchestration/proc/LaunchDagProc.java  |  18 +-
 .../modules/orchestration/task/DagTask.java        |  12 +-
 .../modules/orchestration/task/LaunchDagTask.java  |   5 +-
 ...lowExecutionResourceHandlerWithWarmStandby.java |   2 +-
 .../monitoring/DagActionStoreChangeMonitor.java    |   7 +-
 .../gobblin/service/GobblinServiceManagerTest.java |  14 ++
 .../DagActionReminderSchedulerTest.java            |  73 ++++++++
 .../DagManagementTaskStreamImplTest.java           |  12 +-
 .../modules/orchestration/DagManagerFlowTest.java  |  14 +-
 .../orchestration/DagProcessingEngineTest.java     |  14 +-
 .../orchestration/MysqlDagActionStoreTest.java     |   8 +-
 .../MysqlMultiActiveLeaseArbiterTest.java          |   8 +-
 .../orchestration/proc/LaunchDagProcTest.java      |   6 +-
 32 files changed, 497 insertions(+), 363 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d7204df8f..6f4b100b1 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -100,13 +100,13 @@ public class ConfigurationKeys {
   public static final String 
MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY = 
MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
   public static final long 
DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 
60; // (3 days in seconds)
   // Scheduler lease determination store configuration
+  public static final String SCHEDULER_LEASE_ARBITER_NAME = 
"SchedulerFlowLaunchLeaseArbiter";
+  public static final String PROCESSING_LEASE_ARBITER_NAME = 
"DagActionProcessingLeaseArbiter";
   public static final String MYSQL_LEASE_ARBITER_PREFIX = 
"MysqlMultiActiveLeaseArbiter";
-  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
-  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "gobblin_multi_active_scheduler_constants_store";
-  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
-  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"gobblin_scheduler_lease_determination_store";
-  public static final String 
SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis";
-  public static final long 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 3 * 24 * 
60 * 60 * 1000; // (3 days in ms)
+  public static final String MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
+  public static final String LEASE_DETERMINATION_STORE_DB_TABLE_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + "." + STATE_STORE_DB_TABLE_KEY;
+  public static final String 
LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis";
+  public static final long 
DEFAULT_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 3 * 24 * 60 * 60 * 
1000; // (3 days in ms)
   // Refers to the event we originally tried to acquire a lease which achieved 
`consensus` among participants through
   // the database
   public static final String 
SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY = 
"preservedConsensusEventTimeMillis";
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
index b628f1714..60eeb154a 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -23,6 +23,11 @@
     "type" : "string",
     "doc" : "flow execution id for the dag action",
     "compliance" : "NONE"
+  }, {
+    "name" : "jobName",
+    "type" : "string",
+    "doc" : "job name for the dag action (empty string if not applicable)",
+    "compliance" : "NONE"
   }, {
     "name" : "dagAction",
     "type": {
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 71c97f0e5..78083c065 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -206,7 +206,8 @@ public class DagActionStoreChangeMonitorTest {
     GenericStoreChangeEvent genericStoreChangeEvent =
         new GenericStoreChangeEvent(key, String.valueOf(txidCounter), 
System.currentTimeMillis(), operationType);
     txidCounter++;
-    return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, 
flowName, flowExecutionId, dagAction);
+    return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, 
flowName, flowExecutionId,
+        DagActionStore.NO_JOB_NAME_DEFAULT, dagAction);
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index c01ff3e62..ed179fa58 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -37,11 +37,13 @@ import com.typesafe.config.Config;
 
 import javax.inject.Singleton;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionProcessingMultiActiveLeaseArbiterFactory;
+import 
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
-import 
org.apache.gobblin.service.modules.orchestration.MysqlMultiActiveLeaseArbiter;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -77,7 +79,6 @@ import 
org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
 import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
 import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
-import 
org.apache.gobblin.service.modules.orchestration.ReminderSettingDagProcLeaseArbiter;
 import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
@@ -108,8 +109,6 @@ import org.apache.gobblin.util.ConfigUtils;
 
 
 public class GobblinServiceGuiceModule implements Module {
-  public static final String SCHEDULER_LEASE_ARBITER_NAME = 
"SchedulerLeaseArbiter";
-  public static final String EXECUTOR_LEASE_ARBITER_NAME = 
"ExecutorLeaseArbiter";
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinServiceGuiceModule.class);
   private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = 
"jobStatusRetriever.class";
 
@@ -190,7 +189,9 @@ public class GobblinServiceGuiceModule implements Module {
     OptionalBinder.newOptionalBinder(binder, MultiActiveLeaseArbiter.class);
     OptionalBinder.newOptionalBinder(binder, FlowLaunchHandler.class);
     if (serviceConfig.isMultiActiveSchedulerEnabled()) {
-      
binder.bind(MultiActiveLeaseArbiter.class).annotatedWith(Names.named(SCHEDULER_LEASE_ARBITER_NAME)).to(MysqlMultiActiveLeaseArbiter.class);
+      binder.bind(MultiActiveLeaseArbiter.class).annotatedWith(Names.named(
+          ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME)).toProvider(
+          FlowLaunchMultiActiveLeaseArbiterFactory.class);
       binder.bind(FlowLaunchHandler.class);
     }
 
@@ -202,21 +203,25 @@ public class GobblinServiceGuiceModule implements Module {
     OptionalBinder.newOptionalBinder(binder, DagManagementStateStore.class);
     OptionalBinder.newOptionalBinder(binder, DagProcFactory.class);
     OptionalBinder.newOptionalBinder(binder, DagProcessingEngine.class);
+    OptionalBinder.newOptionalBinder(binder, DagActionReminderScheduler.class);
     if (serviceConfig.isDagProcessingEngineEnabled()) {
-      binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
-      binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
-      
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class).in(Singleton.class);
-      binder.bind(DagProcFactory.class);
-      binder.bind(DagProcessingEngine.class);
-
+      /* Note that two instances of the same class can only be differentiated 
with an `annotatedWith` marker provided at
+      binding time (optionally bound classes cannot have names associated with 
them). Unlike, the scheduler lease arbiter,
+      the execution lease arbiter is used in single-active or multi-active 
execution. */
+      binder.bind(MultiActiveLeaseArbiter.class).
+              
annotatedWith(Names.named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME))
+          .toProvider(
+              DagActionProcessingMultiActiveLeaseArbiterFactory.class);
       // Multi-active execution is only compatible with dagProcessingEngine 
configuration
-      OptionalBinder.newOptionalBinder(binder, 
ReminderSettingDagProcLeaseArbiter.class);
-      OptionalBinder.newOptionalBinder(binder, 
DagActionReminderScheduler.class);
       if (serviceConfig.isMultiActiveExecutionEnabled()) {
-        
binder.bind(MultiActiveLeaseArbiter.class).annotatedWith(Names.named(EXECUTOR_LEASE_ARBITER_NAME)).to(MysqlMultiActiveLeaseArbiter.class);
         binder.bind(DagActionReminderScheduler.class);
-        binder.bind(ReminderSettingDagProcLeaseArbiter.class);
       }
+
+      binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
+      binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
+      
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class);
+      binder.bind(DagProcFactory.class);
+      binder.bind(DagProcessingEngine.class);
     }
 
     binder.bind(FlowConfigsResource.class);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index e65d65f38..a5e8f102e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -116,6 +116,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   public static final String SERVICE_EVENT_BUS_NAME = 
"GobblinServiceManagerEventBus";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinServiceManager.class);
+  private static volatile GobblinServiceGuiceModule 
GOBBLIN_SERVICE_GUICE_MODULE;
 
   protected final ServiceBasedAppLauncher serviceLauncher;
   private volatile boolean stopInProgress = false;
@@ -250,11 +251,28 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     return create(new GobblinServiceConfiguration(serviceName, serviceId, 
config, serviceWorkDir));
   }
 
+  /**
+   * Uses the provided serviceConfiguration to create a new Guice module and 
obtain a new class associated with it.
+   * This method should only be called once per application.
+   */
   public static GobblinServiceManager create(GobblinServiceConfiguration 
serviceConfiguration) {
-    GobblinServiceGuiceModule guiceModule = new 
GobblinServiceGuiceModule(serviceConfiguration);
+    GOBBLIN_SERVICE_GUICE_MODULE = new 
GobblinServiceGuiceModule(serviceConfiguration);
+    return getClass(GobblinServiceManager.class);
+  }
 
-    Injector injector = Guice.createInjector(Stage.PRODUCTION, guiceModule);
-    return injector.getInstance(GobblinServiceManager.class);
+  /**
+   *
+   * @param classToGet
+   * @return a new object if the class type is not marked with @Singleton, 
otherwise the same instance of the class
+   * @param <T>
+   */
+  public static <T> T getClass(Class<T> classToGet) {
+    if (GOBBLIN_SERVICE_GUICE_MODULE == null) {
+      throw new RuntimeException(String.format("getClass called to obtain %s 
without calling create method to "
+          + "initialize GobblinServiceGuiceModule.", classToGet));
+    }
+    Injector injector = Guice.createInjector(Stage.PRODUCTION, 
GOBBLIN_SERVICE_GUICE_MODULE);
+    return injector.getInstance(classToGet);
   }
 
   public URI getRestLiServerListeningURI() {
@@ -633,14 +651,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
       Config config = ConfigFactory.load();
 
-      GobblinServiceConfiguration serviceConfiguration =
-          new 
GobblinServiceConfiguration(cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), 
getServiceId(cmd), config,
-              null);
-
-      GobblinServiceGuiceModule guiceModule = new 
GobblinServiceGuiceModule(serviceConfiguration);
-      Injector injector = Guice.createInjector(guiceModule);
-
-      try (GobblinServiceManager gobblinServiceManager = 
injector.getInstance(GobblinServiceManager.class)) {
+      try (GobblinServiceManager gobblinServiceManager =
+          create(cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), 
getServiceId(cmd), config, null)) {
         gobblinServiceManager.start();
 
         if (isTestMode) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionProcessingMultiActiveLeaseArbiterFactory.java
similarity index 55%
copy from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
copy to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionProcessingMultiActiveLeaseArbiterFactory.java
index 98ede576b..2ca52b137 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionProcessingMultiActiveLeaseArbiterFactory.java
@@ -15,23 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.orchestration.task;
+package org.apache.gobblin.service.modules.orchestration;
 
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
-import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
 
 
 /**
- * A {@link DagTask} responsible to handle launch tasks.
+ * A factory implementation that returns a {@link MultiActiveLeaseArbiter} 
instance used by the
+ * {@link DagManagementTaskStreamImpl} in multi-active execution mode
  */
+@Slf4j
+public class DagActionProcessingMultiActiveLeaseArbiterFactory extends 
MultiActiveLeaseArbiterFactory {
 
-public class LaunchDagTask extends DagTask {
-  public LaunchDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
-    super(dagAction, leaseObtainedStatus);
-  }
-
-  public <T> T host(DagTaskVisitor<T> visitor) {
-    return visitor.meet(this);
+  @Inject
+  public DagActionProcessingMultiActiveLeaseArbiterFactory(Config config) {
+    super(config, ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index ddcd1fa79..9334f796b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -17,15 +17,26 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import java.util.Optional;
+import java.io.IOException;
+import java.util.Date;
+import java.util.function.Supplier;
 
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
 import org.quartz.impl.StdSchedulerFactory;
 
 import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 
 
 /**
@@ -36,14 +47,12 @@ import javax.inject.Inject;
 public class DagActionReminderScheduler {
   public static final String DAG_ACTION_REMINDER_SCHEDULER_KEY = 
"DagActionReminderScheduler";
   private final Scheduler quartzScheduler;
-  private final Optional<DagManagement> dagManagement;
 
   @Inject
-  public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory, 
Optional<DagManagement> dagManagement)
+  public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory)
       throws SchedulerException {
     // Create a new Scheduler to be used solely for the DagProc reminders
     this.quartzScheduler = 
schedulerFactory.getScheduler(DAG_ACTION_REMINDER_SCHEDULER_KEY);
-    this.dagManagement = dagManagement;
   }
 
   /**
@@ -55,20 +64,88 @@ public class DagActionReminderScheduler {
    */
   public void scheduleReminder(DagActionStore.DagAction dagAction, long 
reminderDurationMillis)
       throws SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
-    Trigger trigger = 
ReminderSettingDagProcLeaseArbiter.createReminderJobTrigger(dagAction, 
reminderDurationMillis);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
+    Trigger trigger = createReminderJobTrigger(dagAction, 
reminderDurationMillis, System::currentTimeMillis);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
 
   public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws 
SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
     quartzScheduler.deleteJob(jobDetail.getKey());
   }
 
+  /**
+   * Static class used to store information regarding a pending dagAction that 
needs to be revisited at a later time
+   * by {@link DagManagement} interface to re-attempt a lease on if it has not 
been completed by the previous owner.
+   * These jobs are scheduled and used by the {@link 
DagActionReminderScheduler}.
+   */
+  @Slf4j
+  public static class ReminderJob implements Job {
+    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+
+    @Override
+    public void execute(JobExecutionContext context) {
+      // Get properties from the trigger to create a dagAction
+      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      DagActionStore.DagActionType dagActionType = 
DagActionStore.DagActionType.valueOf(
+          jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+
+      log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
+          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
+          dagActionType);
+
+      try {
+        DagManagement dagManagement = 
GobblinServiceManager.getClass(DagManagement.class);
+        dagManagement.addDagAction(dagAction);
+      } catch (IOException e) {
+        log.error("Failed to add DagAction to DagManagement. Action: {}", 
dagAction);
+      }
+    }
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating all dagAction fields
+   */
+  public static String createDagActionReminderKey(DagActionStore.DagAction 
dagAction) {
+    return String.format("%s.%s.%s.%s.%s", dagAction.getFlowGroup(), 
dagAction.getFlowName(),
+        dagAction.getFlowExecutionId(), dagAction.getJobName(), 
dagAction.getDagActionType());
+  }
+
+  /**
+   * Creates a jobDetail containing flow and job identifying information in 
the jobDataMap, uniquely identified
+   *  by a key comprised of the dagAction's fields.
+   */
+  public static JobDetail createReminderJobDetail(DagActionStore.DagAction 
dagAction) {
+    JobDataMap dataMap = new JobDataMap();
+    dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
+    dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
+    dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
+    dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+    dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, 
dagAction.getDagActionType());
+
+    return JobBuilder.newJob(ReminderJob.class)
+        .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowGroup())
+        .usingJobData(dataMap)
+        .build();
+  }
+
+  /**
+   * Creates a Trigger object with the same key as the ReminderJob (since only 
one trigger is expected to be associated
+   * with a job at any given time) that should fire after 
`reminderDurationMillis` millis. It uses
+   * `getCurrentTimeMillis` to determine the current time.
+   */
+  public static Trigger createReminderJobTrigger(DagActionStore.DagAction 
dagAction, long reminderDurationMillis,
+      Supplier<Long> getCurrentTimeMillis) {
+    Trigger trigger = TriggerBuilder.newTrigger()
+        .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowGroup())
+        .startAt(new Date(getCurrentTimeMillis.get() + reminderDurationMillis))
+        .build();
+    return trigger;
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 3076486f7..94beebd16 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -28,6 +28,7 @@ import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 
 
 public interface DagActionStore {
+  public static final String NO_JOB_NAME_DEFAULT = "";
   enum DagActionType {
     KILL, // Kill invoked through API call
     RESUME, // Resume flow invoked through API call
@@ -98,7 +99,7 @@ public interface DagActionStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  void addDagAction(String flowGroup, String flowName, String flowExecutionId, 
String jobName, DagActionType dagActionType) throws IOException;
+  void addJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName, DagActionType dagActionType) throws 
IOException;
 
   /**
    * Persist the dag action in {@link DagActionStore} for durability. This 
method assumes an empty jobName.
@@ -108,7 +109,7 @@ public interface DagActionStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  void addDagAction(String flowGroup, String flowName, String flowExecutionId, 
DagActionType dagActionType) throws IOException;
+  void addFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType) throws IOException;
 
   /**
    * delete the dag action from {@link DagActionStore}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index a6778c82f..16d396394 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -17,22 +17,28 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.quartz.SchedulerException;
+
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.inject.Named;
 import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.util.ConfigUtils;
@@ -40,7 +46,20 @@ import org.apache.gobblin.util.ConfigUtils;
 
 /**
  * DagManagementTaskStreamImpl implements {@link DagManagement} and {@link 
DagTaskStream}. It accepts
- * {@link DagActionStore.DagAction}s and iteratively provides {@link DagTask}.
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagAction}s and 
iteratively provides
+ * {@link DagTask}.
+ *
+ * It uses {@link MultiActiveLeaseArbiter} to coordinate multiple hosts with 
execution components enabled in
+ * multi-active execution mode to respond to flow action events by attempting 
ownership over a flow action event at a
+ * given event time. Only events that the current instance acquires a lease 
for are selected by
+ * {@link DagManagementTaskStreamImpl#next()}. If the status of the lease 
ownership attempt is anything other than an
+ * indication the lease has been completed
+ * ({@link LeaseAttemptStatus}) then the {@link 
MultiActiveLeaseArbiter#tryAcquireLease} method will set a reminder for
+ * the flow action using {@link DagActionReminderScheduler} to reattempt the 
lease after the current leaseholder's grant
+ * would have expired.
+ * Note that if multi-active execution is NOT enabled, then all flow action 
events are selected by
+ * {@link DagManagementTaskStreamImpl#next()} by virtue of having no other 
contenders for the lease at the time
+ * {@link MultiActiveLeaseArbiter#tryAcquireLease} is called.
  */
 @Slf4j
 @Singleton
@@ -51,20 +70,34 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
 
   @Inject(optional=true)
   protected Optional<DagActionStore> dagActionStore;
-  protected Optional<ReminderSettingDagProcLeaseArbiter> 
reminderSettingDagProcLeaseArbiter;
+  protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
+  protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
+  private final boolean isMultiActiveExecutionEnabled;
   @Inject
   private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
   private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
 
-  // TODO: need to pass reference to DagProcLeaseArbiter without creating a 
circular reference in Guice
   @Inject
   public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> 
dagActionStore,
-      Optional<ReminderSettingDagProcLeaseArbiter> 
reminderSettingDagProcLeaseArbiter) {
+      @Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME) 
MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,
+      Optional<DagActionReminderScheduler> dagActionReminderScheduler,
+      @Named(InjectionNames.MULTI_ACTIVE_EXECUTION_ENABLED) boolean 
isMultiActiveExecutionEnabled) {
     this.config = config;
+    if (!dagActionStore.isPresent()) {
+      /* DagActionStore is optional because there are other configurations 
that do not require it and it's initialized
+      in {@link GobblinServiceGuiceModule} which handles all possible 
configurations */
+      throw new RuntimeException("DagProcessingEngine should not be enabled 
without dagActionStore enabled.");
+    }
     this.dagActionStore = dagActionStore;
-    this.reminderSettingDagProcLeaseArbiter = 
reminderSettingDagProcLeaseArbiter;
+    this.dagActionProcessingLeaseArbiter = dagActionProcessingLeaseArbiter;
+    this.dagActionReminderScheduler = dagActionReminderScheduler;
+    this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
     MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
     this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
+    if (this.isMultiActiveExecutionEnabled && 
!this.dagActionReminderScheduler.isPresent()) {
+      throw new RuntimeException(String.format("Multi-active execution enabled 
but required "
+          + "instance %s is absent.", 
DagActionReminderScheduler.class.getSimpleName()));
+    }
   }
 
   @Override
@@ -84,26 +117,42 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
 
   @Override
   public DagTask next() {
-    /*TODO: this requires use of lease arbiter, in single-active execution 
lease arbiter will not be present and we can
-     provide a dummy LeaseObtainedStatus or create alternate route
-    */
-    if (!this.reminderSettingDagProcLeaseArbiter.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
-    }
-    try {
-      LeaseAttemptStatus leaseAttemptStatus = null;
-      DagActionStore.DagAction dagAction = null;
-      while (!(leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus)) {
-        dagAction = this.dagActionQueue.take();  //`take` blocks till element 
is not available
-        // TODO: need to handle reminder events and flag them
-        leaseAttemptStatus = 
this.reminderSettingDagProcLeaseArbiter.get().tryAcquireLease(dagAction, 
System.currentTimeMillis(), false, false);
+      while (true) {
+        try {
+          DagActionStore.DagAction dagAction = this.dagActionQueue.take();
+          LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(dagAction);
+          if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
+            return createDagTask(dagAction, 
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+          }
+        } catch (Exception e) {
+          //TODO: need to handle exceptions gracefully
+          log.error("Exception getting DagAction from the queue / creating 
DagTask", e);
+        }
       }
-      return createDagTask(dagAction, (LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttemptStatus);
-    } catch (Throwable t) {
-      //TODO: need to handle exceptions gracefully
-      log.error("Error getting DagAction from the queue / creating DagTask", 
t);
+  }
+
+  /**
+   * Returns a {@link LeaseAttemptStatus} associated with the
+   * `dagAction` by calling
+   * {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagAction, 
long, boolean, boolean)}.
+   * @param dagAction
+   * @return
+   * @throws IOException
+   * @throws SchedulerException
+   */
+  private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction 
dagAction)
+      throws IOException, SchedulerException {
+    LeaseAttemptStatus leaseAttemptStatus;
+    // TODO: need to handle reminder events and flag them
+    leaseAttemptStatus = this.dagActionProcessingLeaseArbiter
+        .tryAcquireLease(dagAction, System.currentTimeMillis(), false, false);
+        /* Schedule a reminder for the event unless the lease has been 
completed to safeguard against the case where even
+        we, when we might become the lease owner still fail to complete 
processing
+        */
+    if (!(leaseAttemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus)) {
+      scheduleReminderForEvent(leaseAttemptStatus);
     }
-    return null;
+    return leaseAttemptStatus;
   }
 
   private DagTask createDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
@@ -111,9 +160,18 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
 
     switch (dagActionType) {
       case LAUNCH:
-        return new LaunchDagTask(dagAction, leaseObtainedStatus);
+        return new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
       default:
         throw new UnsupportedOperationException("Not yet implemented");
     }
   }
+
+  /* Schedules a reminder for the flow action using {@link 
DagActionReminderScheduler} to reattempt the lease after the
+  current leaseholder's grant would have expired.
+  */
+  protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
+      throws SchedulerException {
+    
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getDagAction(),
+        leaseStatus.getMinimumLingerDurationMillis());
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 6ce85cfd3..7fd90a244 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -86,6 +86,7 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 import static org.apache.gobblin.service.ExecutionStatus.*;
+import static 
org.apache.gobblin.service.modules.orchestration.DagActionStore.NO_JOB_NAME_DEFAULT;
 
 
 /**
@@ -183,8 +184,7 @@ public class DagManager extends AbstractIdleService {
     }
 
     DagActionStore.DagAction toDagAction(DagActionStore.DagActionType 
actionType) {
-      // defaults to empty jobName
-      return new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, "", actionType);
+      return new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, NO_JOB_NAME_DEFAULT, actionType);
     }
   }
 
@@ -591,7 +591,7 @@ public class DagManager extends AbstractIdleService {
     private void removeDagActionFromStore(DagId dagId, 
DagActionStore.DagActionType dagActionType) throws IOException {
       if (this.dagActionStore.isPresent()) {
         this.dagActionStore.get().deleteDagAction(
-            new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, 
dagId.flowExecutionId, "", dagActionType));
+            new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, 
dagId.flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType));
       }
     }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index b407c445c..6f6d920d4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -50,7 +50,6 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
-import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -79,7 +78,8 @@ public class FlowLaunchHandler {
   private ContextAwareCounter failedToSetEventReminderCount;
 
   @Inject
-  public FlowLaunchHandler(Config config, 
@Named(GobblinServiceGuiceModule.SCHEDULER_LEASE_ARBITER_NAME) 
Optional<MultiActiveLeaseArbiter> leaseArbiter,
+  public FlowLaunchHandler(Config config,
+      @Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME) 
Optional<MultiActiveLeaseArbiter> leaseArbiter,
       SchedulerService schedulerService, Optional<DagActionStore> 
dagActionStore) {
     this.multiActiveLeaseArbiter = leaseArbiter;
     this.dagActionStore = dagActionStore;
@@ -140,7 +140,7 @@ public class FlowLaunchHandler {
     if (this.dagActionStore.isPresent() && 
this.multiActiveLeaseArbiter.isPresent()) {
       try {
         DagActionStore.DagAction dagAction = leaseStatus.getDagAction();
-        this.dagActionStore.get().addDagAction(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId(), 
dagAction.getDagActionType());
+        this.dagActionStore.get().addFlowDagAction(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId(), 
dagAction.getDagActionType());
         // If the dag action has been persisted to the {@link DagActionStore} 
we can close the lease
         this.numFlowsSubmitted.mark();
         return 
this.multiActiveLeaseArbiter.get().recordLeaseSuccess(leaseStatus);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchMultiActiveLeaseArbiterFactory.java
similarity index 56%
copy from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
copy to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchMultiActiveLeaseArbiterFactory.java
index 98ede576b..86b576467 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchMultiActiveLeaseArbiterFactory.java
@@ -15,23 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.orchestration.task;
+package org.apache.gobblin.service.modules.orchestration;
 
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
-import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
 
 
 /**
- * A {@link DagTask} responsible to handle launch tasks.
+ * A factory implementation that returns a {@link MultiActiveLeaseArbiter} 
instance used by the
+ * {@link FlowLaunchHandler} in multi-active scheduler mode
  */
+@Slf4j
+public class FlowLaunchMultiActiveLeaseArbiterFactory extends 
MultiActiveLeaseArbiterFactory {
 
-public class LaunchDagTask extends DagTask {
-  public LaunchDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
-    super(dagAction, leaseObtainedStatus);
-  }
-
-  public <T> T host(DagTaskVisitor<T> visitor) {
-    return visitor.meet(this);
+  @Inject
+  public FlowLaunchMultiActiveLeaseArbiterFactory(Config config) {
+    super(config, ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 7ddfdf616..029dd35b8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
 
+import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -60,6 +61,7 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  * They are used here to provide complete access to dag related information at 
one place.
  */
 @Slf4j
+@Singleton
 public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateStore {
   private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new ConcurrentHashMap<>();
   private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new 
ConcurrentHashMap<>();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiterFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiterFactory.java
new file mode 100644
index 000000000..64c353371
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiterFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Provider;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An abstract base class for {@link MultiActiveLeaseArbiter} factories that 
use a specific configuration key.
+ * Subclasses must provide a key to use in the constructor.
+ */
+@Slf4j
+public abstract class MultiActiveLeaseArbiterFactory implements 
Provider<MultiActiveLeaseArbiter> {
+    private final Config leaseArbiterConfig;
+    private final String configPrefix;
+
+    public MultiActiveLeaseArbiterFactory(Config config, String configPrefix) {
+      Objects.requireNonNull(config);
+      this.configPrefix = Objects.requireNonNull(configPrefix);
+      if (!config.hasPath(configPrefix)) {
+        throw new RuntimeException(String.format("Unable to initialize 
multiActiveLeaseArbiter due to missing "
+            + "configurations that should be prefixed by %s.", configPrefix));
+      }
+      this.leaseArbiterConfig = config.getConfig(configPrefix);
+      log.info("Lease arbiter will be initialized with config {}", 
leaseArbiterConfig);
+    }
+
+    @Override
+    public MultiActiveLeaseArbiter get() {
+      try {
+        return new InstrumentedLeaseArbiter(this.leaseArbiterConfig, new 
MysqlMultiActiveLeaseArbiter(leaseArbiterConfig), configPrefix);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to initialize " + configPrefix + " 
lease arbiter due to ", e);
+      }
+   }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index 1481f097e..f0b93e059 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -45,7 +45,6 @@ import org.apache.gobblin.util.ExponentialBackoff;
 public class MysqlDagActionStore implements DagActionStore {
 
   public static final String CONFIG_PREFIX = "MysqlDagActionStore";
-  public static final String NO_JOB_NAME_DEFAULT = "";
 
   protected final DataSource dataSource;
   private final DBStatementExecutor dbStatementExecutor;
@@ -128,7 +127,7 @@ public class MysqlDagActionStore implements DagActionStore {
   }
 
   @Override
-  public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName, DagActionType dagActionType)
+  public void addJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName, DagActionType dagActionType)
       throws IOException {
     dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
     try {
@@ -146,9 +145,9 @@ public class MysqlDagActionStore implements DagActionStore {
   }
 
   @Override
-  public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType)
+  public void addFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType)
       throws IOException {
-    addDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, 
dagActionType);
+    addJobDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, 
dagActionType);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 03142ebbc..9dfea5acc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -29,7 +29,6 @@ import java.util.Optional;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
-import com.google.inject.Inject;
 import com.typesafe.config.Config;
 
 import javax.sql.DataSource;
@@ -175,7 +174,6 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   private static final ThreadLocal<Calendar> UTC_CAL =
       ThreadLocal.withInitial(() -> 
Calendar.getInstance(TimeZone.getTimeZone("UTC")));
 
-  @Inject
   public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
     if (config.hasPath(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX)) {
       config = 
config.getConfig(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX).withFallback(config);
@@ -184,17 +182,20 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           + "before all properties", 
ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX));
     }
 
-    // TODO: create two tables or take in table name as parameter to have 
scheduler and executor table
-    this.leaseArbiterTableName = ConfigUtils.getString(config, 
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
-        
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
-    this.constantsTableName = ConfigUtils.getString(config, 
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
-        ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
+    if 
(!config.hasPath(ConfigurationKeys.LEASE_DETERMINATION_STORE_DB_TABLE_KEY)
+        || 
!config.hasPath(ConfigurationKeys.MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY)) {
+      throw new RuntimeException(String.format("Table names %s and %s are 
required to be configured so multiple instances do not "
+          + "utilize the same table name", 
ConfigurationKeys.LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+          ConfigurationKeys.MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY));
+    }
+    this.leaseArbiterTableName = 
config.getString(ConfigurationKeys.LEASE_DETERMINATION_STORE_DB_TABLE_KEY);
+    this.constantsTableName = 
config.getString(ConfigurationKeys.MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY);
     this.epsilonMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
     this.lingerMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
-    this.retentionPeriodMillis = ConfigUtils.getLong(config, 
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
-        
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
+    this.retentionPeriodMillis = ConfigUtils.getLong(config, 
ConfigurationKeys.LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
+        
ConfigurationKeys.DEFAULT_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
     this.thisTableRetentionStatement = 
String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, 
this.leaseArbiterTableName,
         retentionPeriodMillis);
     this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, 
this.leaseArbiterTableName,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f4977caa2..7540f31f1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -234,7 +234,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
 
       DagActionStore.DagAction launchDagAction =
-          new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
"", DagActionStore.DagActionType.LAUNCH);
+          new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH);
 
       // If multi-active scheduler is enabled do not pass onto DagManager, 
otherwise scheduler forwards it directly
       // Skip flow compilation as well, since we recompile after receiving 
event from DagActionStoreChangeMonitor later
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
deleted file mode 100644
index 91e7f20b0..000000000
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.orchestration;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.Optional;
-
-import org.quartz.Job;
-import org.quartz.JobBuilder;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-
-import com.typesafe.config.Config;
-
-import javax.inject.Inject;
-import javax.inject.Named;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
-
-
-/**
- * Decorator used to coordinate multiple hosts with execution components 
enabled to respond to flow action events with
- * added capabilities to properly handle the result of attempted ownership 
over these flow action events. It uses the
- * {@link MultiActiveLeaseArbiter} to determine a single lease owner at a 
given event time for a flow action event.
- * If the status of the lease ownership attempt is anything other than an 
indication the lease has been completed
- * ({@link LeaseAttemptStatus.NoLongerLeasingStatus}) then the
- * {@link MultiActiveLeaseArbiter#tryAcquireLease} method will set a reminder 
for the flow action using
- * {@link DagActionReminderScheduler} to reattempt the lease after the current 
lease holder's grant would have expired.
- */
-@Slf4j
-public class ReminderSettingDagProcLeaseArbiter implements 
MultiActiveLeaseArbiter {
-  private final Optional<MultiActiveLeaseArbiter> decoratedLeaseArbiter;
-  private final Optional<DagActionReminderScheduler> 
dagActionReminderScheduler;
-  private final Config config;
-  private final String MISSING_OPTIONAL_ERROR_MESSAGE = 
String.format("Multi-active execution is not enabled so dag "
-      + "action should not passed to %s", 
ReminderSettingDagProcLeaseArbiter.class.getSimpleName());
-
-  @Inject
-  public ReminderSettingDagProcLeaseArbiter(Config config,
-      @Named(GobblinServiceGuiceModule.EXECUTOR_LEASE_ARBITER_NAME) 
Optional<MultiActiveLeaseArbiter> leaseArbiter,
-      Optional<DagActionReminderScheduler> dagActionReminderScheduler) {
-    this.decoratedLeaseArbiter = leaseArbiter;
-    this.dagActionReminderScheduler = dagActionReminderScheduler;
-    this.config = config;
-  }
-
-  /**
-   * Attempts a lease for a particular job event and sets a reminder to 
revisit if the lease has not been completed.
-   */
-  @Override
-  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
dagAction, long eventTimeMillis,
-      boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) {
-    if (this.decoratedLeaseArbiter.isPresent()) {
-      try {
-        LeaseAttemptStatus leaseAttemptStatus =
-            this.decoratedLeaseArbiter.get().tryAcquireLease(dagAction, 
eventTimeMillis, isReminderEvent,
-                skipFlowExecutionIdReplacement);
-      /* Schedule a reminder for the event unless the lease has been completed 
to safeguard against the case where even
-      we, when we might become the lease owner still fail to complete 
processing
-      */
-        if (!(leaseAttemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus)) {
-          scheduleReminderForEvent(leaseAttemptStatus);
-        }
-        return leaseAttemptStatus;
-      } catch (SchedulerException | IOException e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
-    }
-  }
-
-  @Override
-  public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus 
status)
-      throws IOException {
-    if (!this.decoratedLeaseArbiter.isPresent()) {
-      throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
-    }
-    return this.decoratedLeaseArbiter.get().recordLeaseSuccess(status);
-  }
-
-  protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
-      throws SchedulerException {
-    if (!this.dagActionReminderScheduler.isPresent()) {
-      throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
-    }
-    
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getDagAction(),
-        leaseStatus.getMinimumLingerDurationMillis());
-  }
-
-  /**
-   * Static class used to store information regarding a pending dagAction that 
needs to be revisited at a later time
-   * by {@link DagManagement} interface to re-attempt a lease on if it has not 
been completed by the previous owner.
-   * These jobs are scheduled and used by the {@link 
DagActionReminderScheduler}.
-   */
-  @Slf4j
-  public static class ReminderJob implements Job {
-    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
-    public static final String DAG_MANAGEMENT_KEY = "dag.management";
-
-    @Override
-    public void execute(JobExecutionContext context) {
-      // Get properties from the trigger to create a dagAction
-      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
-      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
-      String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
-      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
-      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
-      DagActionStore.DagActionType dagActionType = 
DagActionStore.DagActionType.valueOf(
-          jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
-      DagManagement dagManagement = (DagManagement) 
jobDataMap.get(DAG_MANAGEMENT_KEY);
-
-      log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
-          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
-
-      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
-          dagActionType);
-
-      try {
-        dagManagement.addDagAction(dagAction);
-      } catch (IOException e) {
-        log.error("Failed to add DagAction to DagManagement. Action: {}", 
dagAction);
-      }
-    }
-  }
-
-  /**
-   * Creates a key for the reminder job by concatenating all dagAction fields
-   */
-  public static String createDagActionReminderKey(DagActionStore.DagAction 
dagAction) {
-    return createDagActionReminderKey(dagAction.getFlowName(), 
dagAction.getFlowGroup(), dagAction.getFlowExecutionId(),
-        dagAction.getJobName(), dagAction.getDagActionType());
-  }
-
-  /**
-   * Creates a key for the reminder job by concatenating flowName, flowGroup, 
flowExecutionId, jobName, dagActionType
-   * in that order
-   */
-  public static String createDagActionReminderKey(String flowName, String 
flowGroup, String flowId, String jobName,
-      DagActionStore.DagActionType dagActionType) {
-    return String.format("%s.%s.%s.%s.%s", flowGroup, flowName, flowId, 
jobName, dagActionType);
-  }
-
-  /**
-   * Creates a jobDetail containing flow and job identifying information in 
the jobDataMap, uniquely identified
-   *  by a key comprised of the dagAction's fields. It also serializes a 
reference to the {@link DagManagement} object
-   *  to be referenced when the trigger fires.
-   */
-  public static JobDetail createReminderJobDetail(DagManagement dagManagement, 
DagActionStore.DagAction dagAction) {
-    JobDataMap dataMap = new JobDataMap();
-    dataMap.put(ReminderJob.DAG_MANAGEMENT_KEY, dagManagement);
-    dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
-    dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
-    dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
-    dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
-    dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, 
dagAction.getDagActionType());
-
-    return JobBuilder.newJob(ReminderJob.class)
-        .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowName())
-        .usingJobData(dataMap)
-        .build();
-  }
-
-  /**
-   * Creates a Trigger object with the same key as the ReminderJob (since only 
one trigger is expected to be associated
-   * with a job at any given time) that should fire after 
`reminderDurationMillis` millis.
-   */
-  public static Trigger createReminderJobTrigger(DagActionStore.DagAction 
dagAction, long reminderDurationMillis) {
-    Trigger trigger = TriggerBuilder.newTrigger()
-        .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowName())
-        .startAt(new Date(System.currentTimeMillis() + reminderDurationMillis))
-        .build();
-    return trigger;
-  }
-}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 1d9dc950c..6a5943445 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -19,6 +19,9 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
 
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -36,21 +39,25 @@ import 
org.apache.gobblin.service.modules.orchestration.task.DagTask;
  * actions based on the type of {@link DagTask} and finally submitting an 
event to the executor.
  */
 @Alpha
+@Data
 @Slf4j
 public abstract class DagProc<S, T> {
   protected static final MetricContext metricContext = 
Instrumented.getMetricContext(new State(), DagProc.class);
   protected static final EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(
       metricContext, "org.apache.gobblin.service").build();
+  @Getter(AccessLevel.PROTECTED)
+  private final DagTask dagTask;
 
   public final void process(DagManagementStateStore dagManagementStateStore) 
throws IOException {
     S state = initialize(dagManagementStateStore);   // todo - retry
     T result = act(dagManagementStateStore, state);   // todo - retry
-    commit(dagManagementStateStore, result);   // todo - retry
     sendNotification(result, eventSubmitter);   // todo - retry
     log.info("{} successfully concluded actions for dagId : {}", 
getClass().getSimpleName(), getDagId());
   }
 
-  protected abstract DagManager.DagId getDagId();
+  protected DagManager.DagId getDagId() {
+    return this.dagTask.getDagId();
+  }
 
   protected abstract S initialize(DagManagementStateStore 
dagManagementStateStore) throws IOException;
 
@@ -59,7 +66,4 @@ public abstract class DagProc<S, T> {
   protected abstract void sendNotification(T result, EventSubmitter 
eventSubmitter) throws IOException;
 
   // todo - commit the modified dags to the persistent store, maybe not 
required for InMem dagManagementStateStore
-  protected void commit(DagManagementStateStore dagManagementStateStore, T 
result) {
-
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index ce9f56705..6368d8887 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Maps;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -46,7 +45,6 @@ import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -58,28 +56,27 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
  * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-@RequiredArgsConstructor
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
-  private final LaunchDagTask launchDagTask;
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
   // todo - this is not orchestration delay and should be renamed. keeping it 
the same because DagManager is also using
   // the same name
   private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+
+  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {
+    super(launchDagTask);
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+  }
+
   static {
     metricContext.register(
         
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
   }
 
-  @Override
-  protected DagManager.DagId getDagId() {
-    return this.launchDagTask.getDagId();
-  }
-
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
     try {
-      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      DagActionStore.DagAction dagAction = this.getDagTask().getDagAction();
       FlowSpec flowSpec = loadFlowSpec(dagManagementStateStore, dagAction);
       flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
       return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
@@ -192,6 +189,7 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
     }
   }
 
+  @Override
   protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, 
EventSubmitter eventSubmitter) {
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index 237eafdb3..732b9a226 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration.task;
 import java.io.IOException;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
@@ -38,26 +39,31 @@ import 
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
  */
 
 @Alpha
+@Slf4j
 public abstract class DagTask {
   @Getter public final DagActionStore.DagAction dagAction;
   private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
+  private final DagActionStore dagActionStore;
   @Getter protected final DagManager.DagId dagId;
 
-  public DagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
+  public DagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+      DagActionStore dagActionStore) {
     this.dagAction = dagAction;
     this.leaseObtainedStatus = leaseObtainedStatus;
+    this.dagActionStore = dagActionStore;
     this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId());
   }
 
   public abstract <T> T host(DagTaskVisitor<T> visitor);
 
   /**
-   * Any cleanup work, e.g. releasing lease if it was acquired earlier, may be 
done in this method.
+   * Any cleanup work, including removing the dagAction from the 
dagActionStore and completing the lease acquired to
+   * work on this task, is done in this method.
    * Returns true if concluding dag task finished successfully otherwise false.
    */
-  // todo call it from the right place
   public boolean conclude() {
     try {
+      this.dagActionStore.deleteDagAction(this.dagAction);
       return this.leaseObtainedStatus.completeLease();
     } catch (IOException e) {
       // TODO: Decide appropriate exception to throw and add to the commit 
method's signature
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index 98ede576b..88a620dda 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -27,8 +27,9 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
  */
 
 public class LaunchDagTask extends DagTask {
-  public LaunchDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
-    super(dagAction, leaseObtainedStatus);
+  public LaunchDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+      DagActionStore dagActionStore) {
+    super(dagAction, leaseObtainedStatus, dagActionStore);
   }
 
   public <T> T host(DagTaskVisitor<T> visitor) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 650938971..f792ace15 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -70,7 +70,7 @@ public class 
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
             + "for action to be completed.", HttpStatus.S_409_CONFLICT);
         return;
       }
-      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), actionType);
+      this.dagActionStore.addFlowDagAction(flowGroup, flowName, 
flowExecutionId.toString(), actionType);
     } catch (IOException | SQLException e) {
       log.warn(
           String.format("Failed to add %s action for flow %s %s %s to dag 
action store due to:", actionType, flowGroup,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 00fc8b566..68fded2e0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -180,6 +180,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
+    String jobName = value.getJobName();
 
     produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
     log.debug("Processing Dag Action message for flow group: {} name: {} 
executionId: {} tid: {} operation: {} lag: {}",
@@ -200,8 +201,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     DagActionStore.DagActionType dagActionType = 
DagActionStore.DagActionType.valueOf(value.getDagAction().toString());
 
     // Used to easily log information to identify the dag action
-    // TODO: add jobName to the dagAction change event
-    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "",
+    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
         dagActionType);
 
     // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
@@ -210,12 +210,13 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
       if (operation.equals("INSERT")) {
         handleDagAction(dagAction, false);
       } else if (operation.equals("UPDATE")) {
+        // TODO: change this warning message and process updates if for launch 
or reevaluate type
         log.warn("Received an UPDATE action to the DagActionStore when values 
in this store are never supposed to be "
             + "updated. Flow group: {} name {} executionId {} were updated to 
action {}", flowGroup, flowName,
             flowExecutionId, dagActionType);
         this.unexpectedErrors.mark();
       } else if (operation.equals("DELETE")) {
-        log.debug("Deleted flow group: {} name: {} executionId {} from 
DagActionStore", flowGroup, flowName, flowExecutionId);
+        log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
       } else {
         log.warn("Received unsupported change type of operation {}. Expected 
values to be in [INSERT, UPDATE, DELETE]",
             operation);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 9fee0d3f1..3dcc106f5 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -64,6 +64,7 @@ import 
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.gobblin.testing.AssertWithBackoff;
@@ -190,6 +191,8 @@ public class GobblinServiceManagerTest {
     serviceCoreProperties.put(AbstractUserQuotaManager.PER_USER_QUOTA, 
"testUser:1");
     transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, 
"10000");
 
+    
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY,
 true);
+
     // Create a bare repository
     RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new 
File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
     fileKey.open(false).create(true);
@@ -266,6 +269,17 @@ public class GobblinServiceManagerTest {
     mysql.stop();
   }
 
+  /**
+   * Tests retrieving two classes initiated by GobblinServiceGuiceModule. One 
is explicitly bound and another optionally
+   * through a configuration enabled in the setup method above.
+   */
+  @Test
+  public void testGetClass() {
+    
Assert.assertTrue(this.gobblinServiceManager.getClass(FlowCompilationValidationHelper.class)
 instanceof FlowCompilationValidationHelper);
+    // Optionally bound config
+    Assert.assertTrue(this.gobblinServiceManager.getClass(FlowCatalog.class) 
instanceof FlowCatalog);
+  }
+
   /**
    * To test an existing flow in a spec store does not get deleted just 
because it is not compilable during service restarts
    */
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
new file mode 100644
index 000000000..cb2ac540b
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.Trigger;
+import org.quartz.TriggerUtils;
+import org.quartz.spi.OperableTrigger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
+public class DagActionReminderSchedulerTest {
+  String flowGroup = "fg";
+  String flowName = "fn";
+  String flowExecutionId = "123";
+  String jobName = "jn";
+  String expectedKey =  String.join(".", flowGroup, flowName, flowExecutionId, 
jobName,
+      String.valueOf(DagActionStore.DagActionType.LAUNCH));
+  DagActionStore.DagAction launchDagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
+      DagActionStore.DagActionType.LAUNCH);
+
+  @Test
+  public void testCreateDagActionReminderKey() {
+    Assert.assertEquals(expectedKey, 
DagActionReminderScheduler.createDagActionReminderKey(launchDagAction));
+  }
+
+  @Test
+  public void testCreateReminderJobTrigger() {
+    long reminderDuration = 666L;
+    Supplier<Long> getCurrentTimeMillis = () -> 12345600000L;
+    Trigger reminderTrigger = DagActionReminderScheduler
+        .createReminderJobTrigger(launchDagAction, reminderDuration, 
getCurrentTimeMillis);
+    Assert.assertEquals(reminderTrigger.getKey().toString(), flowGroup + "." + 
expectedKey);
+    List<Date> fireTimes = TriggerUtils.computeFireTimes((OperableTrigger) 
reminderTrigger, null, 1);
+    Assert.assertEquals(fireTimes.get(0), new Date(reminderDuration + 
getCurrentTimeMillis.get()));
+  }
+
+  @Test
+  public void testCreateReminderJobDetail() {
+    JobDetail jobDetail = 
DagActionReminderScheduler.createReminderJobDetail(launchDagAction);
+    Assert.assertEquals(jobDetail.getKey().toString(), flowGroup + "." + 
expectedKey);
+    JobDataMap dataMap = jobDetail.getJobDataMap();
+    Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_GROUP_KEY), 
flowGroup);
+    Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_NAME_KEY), 
flowName);
+    Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), 
flowExecutionId);
+    Assert.assertEquals(dataMap.get(ConfigurationKeys.JOB_NAME_KEY), jobName);
+    
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_TYPE_KEY),
+        DagActionStore.DagActionType.LAUNCH);
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 3ca35f4a3..2e677f59d 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -75,7 +76,9 @@ public class DagManagementTaskStreamImplTest {
     MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null, null);
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     this.dagManagementTaskStream =
-        new DagManagementTaskStreamImpl(config, Optional.empty(), 
Optional.of(mock(ReminderSettingDagProcLeaseArbiter.class)));
+        new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
+            mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)),
+            false);
     this.dagProcFactory = new DagProcFactory(null);
     this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
         this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore);
@@ -84,10 +87,11 @@ public class DagManagementTaskStreamImplTest {
   /* This tests adding and removal of dag actions from dag task stream with a 
launch task. It verifies that the
   {@link DagManagementTaskStreamImpl#next()} call blocks until a {@link 
LeaseAttemptStatus.LeaseObtainedStatus} is
   returned for a particular action.
-  TODO: when we have different dag procs in future, update this test to add 
other types of actions (and tasks)
+  TODO: when we have different dag procs in the future, update this test to 
add other types of actions (and tasks)
   */
   @Test
-  public void addRemoveDagActions() {
+  public void addRemoveDagActions()
+      throws IOException {
     /* Three duplicate actions are added to the task stream, since the first 
two calls to lease arbitration will return
     statuses that should cause the next() method to continue polling for tasks 
before finally providing the
      LeaseObtainedStatus to the taskStream to break its loop and return a 
newly created dagTask
@@ -96,7 +100,7 @@ public class DagManagementTaskStreamImplTest {
     dagManagementTaskStream.addDagAction(launchAction);
     dagManagementTaskStream.addDagAction(launchAction);
     dagManagementTaskStream.addDagAction(launchAction);
-    when(dagManagementTaskStream.getReminderSettingDagProcLeaseArbiter().get()
+    when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
         .tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(), 
anyBoolean(), anyBoolean()))
         .thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
             new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 15),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index bf84f890c..0afdbe9e8 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -58,6 +58,10 @@ import org.apache.gobblin.util.ConfigUtils;
 import static org.mockito.Mockito.*;
 
 
+/**
+ * Tests the state updates (including updating in-memory state and 
MysqlDagActionStore) after performing add or cancel
+ * operations by calling addDag, stopDag, kill, and resume. It also tests 
flows with and without sla configs.
+ */
 public class DagManagerFlowTest {
   MockedDagManager dagManager;
   int dagNumThreads;
@@ -85,8 +89,8 @@ public class DagManagerFlowTest {
         .build();
 
     dagActionStore = new MysqlDagActionStore(config);
-    dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.DagActionType.KILL);
-    dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, 
DagActionStore.DagActionType.RESUME);
+    dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.DagActionType.KILL);
+    dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId_2, 
DagActionStore.DagActionType.RESUME);
     dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props));
     dagManager.dagActionStore = Optional.of(dagActionStore);
     dagManager.setActive(true);
@@ -127,7 +131,7 @@ public class DagManagerFlowTest {
 
     // mock add spec
     // for very first dag to be added, add dag action to store and check its 
deleted by the addDag call
-    dagManager.getDagActionStore().get().addDagAction("group0", "flow0", 
Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH);
+    dagManager.getDagActionStore().get().addFlowDagAction("group0", "flow0", 
Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH);
     dagManager.addDag(dag1, true, true);
     Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0", 
"flow0", Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH));
     dagManager.addDag(dag2, true, true);
@@ -336,7 +340,7 @@ public class DagManagerFlowTest {
     String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
     // Add kill action to action store and call kill
-    dagActionStore.addDagAction(flowGroup, flowName, 
String.valueOf(flowExecutionId), DagActionStore.DagActionType.KILL);
+    dagActionStore.addFlowDagAction(flowGroup, flowName, 
String.valueOf(flowExecutionId), DagActionStore.DagActionType.KILL);
     dagManager.handleKillFlowRequest(flowGroup, flowName, flowExecutionId);
 
     // Check that the kill dag action is removed
@@ -353,7 +357,7 @@ public class DagManagerFlowTest {
 
 
     // Add resume action to action store and call resume
-    dagActionStore.addDagAction(flowGroup, flowName, 
String.valueOf(flowExecutionId), DagActionStore.DagActionType.RESUME);
+    dagActionStore.addFlowDagAction(flowGroup, flowName, 
String.valueOf(flowExecutionId), DagActionStore.DagActionType.RESUME);
     dagManager.handleResumeFlowRequest(flowGroup, flowName, flowExecutionId);
 
     // Check that the resume dag action is removed
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 99f5bd577..138ddb3a2 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -44,6 +45,7 @@ import 
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 @Slf4j
@@ -79,7 +81,8 @@ public class DagProcessingEngineTest {
     this.dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null, null);
     this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     this.dagManagementTaskStream =
-        new DagManagementTaskStreamImpl(config, Optional.empty(), null);
+        new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
+            mock(MultiActiveLeaseArbiter.class), Optional.empty(), false);
     this.dagProcFactory = new DagProcFactory(null);
     DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
         new 
DagProcessingEngine.DagProcEngineThread(this.dagManagementTaskStream, 
this.dagProcFactory,
@@ -118,7 +121,7 @@ public class DagProcessingEngineTest {
     private final boolean isBad;
 
     public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
-      super(dagAction, null);
+      super(dagAction, null, null);
       this.isBad = isBad;
     }
 
@@ -136,6 +139,7 @@ public class DagProcessingEngineTest {
   static class MockedDagProc extends DagProc<Void, Void> {
     private final boolean isBad;
     public MockedDagProc(boolean isBad) {
+      super(null);
       this.isBad = isBad;
     }
 
@@ -158,11 +162,7 @@ public class DagProcessingEngineTest {
     }
 
     @Override
-    protected void sendNotification(Void result, EventSubmitter 
eventSubmitter) {
-    }
-
-    @Override
-    protected void commit(DagManagementStateStore dagManagementStateStore, 
Void result) {
+    protected void sendNotification(Void result, EventSubmitter 
eventSubmitter) throws IOException {
     }
   }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 2b0edfdec..da0bb737d 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -62,13 +62,13 @@ public class MysqlDagActionStoreTest {
 
   @Test
   public void testAddAction() throws Exception {
-    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
+    this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
     //Should not be able to add KILL again when previous one exist
     Assert.expectThrows(IOException.class,
-        () -> this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
+        () -> this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
     //Should be able to add a RESUME action for same execution as well as KILL 
for another execution of the flow
-    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.RESUME);
-    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL);
+    this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.RESUME);
+    this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL);
   }
 
   @Test(dependsOnMethods = "testAddAction")
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 5b0e4c424..93003a99a 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -38,13 +38,14 @@ import 
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 
 @Slf4j
 public class MysqlMultiActiveLeaseArbiterTest {
-  private static final long EPSILON = 10000;
+  private static final long EPSILON = 10000L;
   private static final long MORE_THAN_EPSILON = (long) (EPSILON * 1.1);
-  private static final long LINGER = 50000;
+  private static final long LINGER = 50000L;
   private static final long MORE_THAN_LINGER = (long) (LINGER * 1.1);
   private static final String USER = "testUser";
   private static final String PASSWORD = "testPassword";
   private static final String TABLE = "mysql_multi_active_lease_arbiter_store";
+  private static final String CONSTANTS_TABLE = "constants_store";
   private static final String flowGroup = "testFlowGroup";
   private static final String flowGroup2 = "testFlowGroup2";
   private static final String flowName = "testFlowName";
@@ -77,7 +78,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
         .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
         .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
         .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
-        
.addPrimitive(ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
 TABLE)
+        
.addPrimitive(ConfigurationKeys.LEASE_DETERMINATION_STORE_DB_TABLE_KEY, TABLE)
+        .addPrimitive(ConfigurationKeys.MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY, 
CONSTANTS_TABLE)
         .build();
 
     this.mysqlMultiActiveLeaseArbiter = new 
MysqlMultiActiveLeaseArbiter(config);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index e75b2952e..6b71977a1 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -77,8 +77,10 @@ public class LaunchDagProcTest {
         5, "user5", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("fg")));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
-    LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "fn",
-        "12345", "jn", DagActionStore.DagActionType.LAUNCH), null), 
flowCompilationValidationHelper);
+    LaunchDagProc launchDagProc = new LaunchDagProc(
+        new LaunchDagTask(new DagActionStore.DagAction("fg", "fn", "12345",
+            "jn", DagActionStore.DagActionType.LAUNCH), null, 
mock(DagActionStore.class)),
+        flowCompilationValidationHelper);
 
     launchDagProc.process(this.dagManagementStateStore);
     int expectedNumOfSavingDagNodeStates = 1; // = number of start nodes

Reply via email to