This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bc6f8773 [GOBBLIN-2027] move DagActionStore, MultiActiveLeaseArbiter 
and their implementation/test classes from gobblin-runtime to gobblin-service 
(#3904)
2bc6f8773 is described below

commit 2bc6f8773f09ff7eb31560b5569021bd0fd044c3
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Mar 27 19:40:24 2024 -0700

    [GOBBLIN-2027] move DagActionStore, MultiActiveLeaseArbiter and their 
implementation/test classes from gobblin-runtime to gobblin-service (#3904)
    
    * move DagActionStore from gobblin-runtime to gobblin-service
    * move LeaseAttemptStatus out of MultiActiveLeaseArbiter interface
    * add doc
    * address review comment
---
 .../runtime/DagActionStoreChangeMonitorTest.java   |   2 +-
 .../modules/core/GobblinServiceGuiceModule.java    |   8 +-
 .../orchestration/DagActionReminderScheduler.java  |   2 -
 .../modules/orchestration}/DagActionStore.java     |  12 ++-
 .../modules/orchestration/DagManagement.java       |   4 +-
 .../orchestration/DagManagementTaskStreamImpl.java |  12 +--
 .../service/modules/orchestration/DagManager.java  |   3 +-
 .../modules/orchestration/DagManagerUtils.java     |   1 -
 .../modules/orchestration/FlowLaunchHandler.java   |  52 ++++++-----
 .../orchestration/InstrumentedLeaseArbiter.java    |  14 ++-
 .../modules/orchestration/LeaseAttemptStatus.java  |  99 ++++++++++++++++++++
 .../orchestration}/MultiActiveLeaseArbiter.java    |  77 +--------------
 .../orchestration}/MysqlDagActionStore.java        |   9 +-
 .../MysqlMultiActiveLeaseArbiter.java              |  25 ++---
 .../modules/orchestration/Orchestrator.java        |   1 -
 .../ReminderSettingDagProcLeaseArbiter.java        |  14 ++-
 .../modules/orchestration/proc/LaunchDagProc.java  |   2 +-
 .../modules/orchestration/task/DagTask.java        |   8 +-
 .../modules/orchestration/task/LaunchDagTask.java  |   6 +-
 ...lowExecutionResourceHandlerWithWarmStandby.java |   2 +-
 .../monitoring/DagActionStoreChangeMonitor.java    |   2 +-
 .../DagActionStoreChangeMonitorFactory.java        |   2 +-
 .../DagManagementDagActionStoreChangeMonitor.java  |   6 +-
 ...nagementDagActionStoreChangeMonitorFactory.java |   2 +-
 .../DagManagementTaskStreamImplTest.java           |   8 +-
 .../modules/orchestration/DagManagerFlowTest.java  |   2 -
 .../orchestration/DagProcessingEngineTest.java     |   1 -
 .../orchestration/FlowLaunchHandlerTest.java       |   6 +-
 .../orchestration}/MysqlDagActionStoreTest.java    |   5 +-
 .../MysqlMultiActiveLeaseArbiterTest.java          | 103 +++++++++++----------
 .../orchestration/proc/LaunchDagProcTest.java      |   2 +-
 31 files changed, 257 insertions(+), 235 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 706f7a8c8..71c97f0e5 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 414055faf..c01ff3e62 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -38,11 +38,11 @@ import com.typesafe.config.Config;
 import javax.inject.Singleton;
 
 import org.apache.gobblin.restli.EmbeddedRestliServer;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
-import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
-import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
+import 
org.apache.gobblin.service.modules.orchestration.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index f1a3109ee..ddcd1fa79 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -27,8 +27,6 @@ import org.quartz.impl.StdSchedulerFactory;
 
 import javax.inject.Inject;
 
-import org.apache.gobblin.runtime.api.DagActionStore;
-
 
 /**
  * This class is used to keep track of reminders of pending flow action events 
to execute. A host calls the
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
similarity index 92%
rename from 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 562b3cb9e..3076486f7 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -15,14 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.api;
+package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Collection;
 
 import lombok.Data;
+
 import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 
 
 public interface DagActionStore {
@@ -54,6 +56,14 @@ public interface DagActionStore {
       return new DagAction(this.getFlowGroup(), this.getFlowName(),
           String.valueOf(eventTimeMillis), this.getJobName(), 
this.getDagActionType());
     }
+
+    /**
+     * Creates and returns a {@link DagNodeId} for this DagAction.
+     */
+    public DagNodeId getDagNodeId() {
+      return new DagNodeId(this.flowGroup, this.flowName,
+          Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
+    }
   }
 
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
index 1e4b17c90..75ee9b60e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -19,12 +19,10 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 
-import org.apache.gobblin.runtime.api.DagActionStore;
-
 
 /**
  * An interface to provide abstractions for managing operations on Dag.
- * It accepts a {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} which can be processed 
later.
+ * It accepts a {@link DagActionStore.DagAction} which can be processed later.
  * Consumption of the Dags happen through {@link DagTaskStream}.
  */
 public interface DagManagement {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 192243075..a6778c82f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -33,8 +33,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.util.ConfigUtils;
@@ -42,7 +40,7 @@ import org.apache.gobblin.util.ConfigUtils;
 
 /**
  * DagManagementTaskStreamImpl implements {@link DagManagement} and {@link 
DagTaskStream}. It accepts
- * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}s and 
iteratively provides {@link DagTask}.
+ * {@link DagActionStore.DagAction}s and iteratively provides {@link DagTask}.
  */
 @Slf4j
 @Singleton
@@ -93,14 +91,14 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
     }
     try {
-      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = null;
+      LeaseAttemptStatus leaseAttemptStatus = null;
       DagActionStore.DagAction dagAction = null;
-      while (!(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus)) {
+      while (!(leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus)) {
         dagAction = this.dagActionQueue.take();  //`take` blocks till element 
is not available
         // TODO: need to handle reminder events and flag them
         leaseAttemptStatus = 
this.reminderSettingDagProcLeaseArbiter.get().tryAcquireLease(dagAction, 
System.currentTimeMillis(), false, false);
       }
-      return createDagTask(dagAction, 
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus);
+      return createDagTask(dagAction, (LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttemptStatus);
     } catch (Throwable t) {
       //TODO: need to handle exceptions gracefully
       log.error("Error getting DagAction from the queue / creating DagTask", 
t);
@@ -108,7 +106,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     return null;
   }
 
-  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
     DagActionStore.DagActionType dagActionType = dagAction.getDagActionType();
 
     switch (dagActionType) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 71b8a318a..6ce85cfd3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -65,7 +65,6 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -184,7 +183,7 @@ public class DagManager extends AbstractIdleService {
     }
 
     DagActionStore.DagAction toDagAction(DagActionStore.DagActionType 
actionType) {
-      // defaults to empty jobName 
+      // defaults to empty jobName
       return new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, "", actionType);
     }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index decf579aa..c4b62b5ec 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -40,7 +40,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 1cd5af044..b407c445c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -17,9 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -29,28 +26,33 @@ import java.util.Date;
 import java.util.Locale;
 import java.util.Properties;
 import java.util.Random;
+
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.impl.JobDetailImpl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
 import javax.inject.Inject;
 import javax.inject.Named;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.util.ConfigUtils;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobKey;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.impl.JobDetailImpl;
 
 
 /**
@@ -59,7 +61,7 @@ import org.quartz.impl.JobDetailImpl;
  * lease owner at a given time for the launch dag action event. After 
acquiring the lease, it persists the dag action
  * event to the {@link DagActionStore} to be eventually acted upon by 
execution module of host(s) to execute the launch.
  * Once it has completed persisting the action to the store, it will mark the 
lease as completed by calling the
- * {@link 
MultiActiveLeaseArbiter#recordLeaseSuccess(MultiActiveLeaseArbiter.LeaseObtainedStatus)}
 method. Hosts
+ * {@link 
MultiActiveLeaseArbiter#recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus)}
 method. Hosts
  * that do not gain the lease for the event, instead schedule a reminder using 
the {@link SchedulerService} to check
  * back in on the previous lease owner's completion status after the lease 
should expire to ensure the event is handled
  * in failure cases.
@@ -107,12 +109,12 @@ public class FlowLaunchHandler {
   public void handleFlowLaunchTriggerEvent(Properties jobProps, 
DagActionStore.DagAction dagAction,
       long eventTimeMillis, boolean isReminderEvent, boolean 
skipFlowExecutionIdReplacement) throws IOException {
       if (this.multiActiveLeaseArbiter.isPresent()) {
-        MultiActiveLeaseArbiter.LeaseAttemptStatus
+        LeaseAttemptStatus
             leaseAttemptStatus = this.multiActiveLeaseArbiter.get()
             .tryAcquireLease(dagAction, eventTimeMillis, isReminderEvent, 
skipFlowExecutionIdReplacement);
-        if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
-          MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
-              (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+        if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
+          LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
+              (LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus;
           if (persistDagAction(leaseObtainedStatus)) {
             log.info("Successfully persisted lease: [{}, eventTimestamp: {}] 
", leaseObtainedStatus.getDagAction(),
                 leaseObtainedStatus.getEventTimeMillis());
@@ -121,9 +123,9 @@ public class FlowLaunchHandler {
           // If persisting the dag action failed, then we set another trigger 
for this event to occur immediately to
           // re-attempt handling the event
           scheduleReminderForEvent(jobProps,
-              new 
MultiActiveLeaseArbiter.LeasedToAnotherStatus(leaseObtainedStatus.getDagAction(),
 0L), eventTimeMillis);
-        } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
-          scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
+              new 
LeaseAttemptStatus.LeasedToAnotherStatus(leaseObtainedStatus.getDagAction(), 
0L), eventTimeMillis);
+        } else if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus) {
+          scheduleReminderForEvent(jobProps, 
(LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttemptStatus,
               eventTimeMillis);
         }
         // Otherwise leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus & no need to do anything
@@ -134,7 +136,7 @@ public class FlowLaunchHandler {
   }
 
   // Called after obtaining a lease to persist the dag action to {@link 
DagActionStore} and mark the lease as done
-  private boolean persistDagAction(MultiActiveLeaseArbiter.LeaseObtainedStatus 
leaseStatus) {
+  private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus 
leaseStatus) {
     if (this.dagActionStore.isPresent() && 
this.multiActiveLeaseArbiter.isPresent()) {
       try {
         DagActionStore.DagAction dagAction = leaseStatus.getDagAction();
@@ -159,7 +161,7 @@ public class FlowLaunchHandler {
    * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
    * @param triggerEventTimeMillis the event timestamp we were originally 
handling
    */
-  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, 
LeaseAttemptStatus.LeasedToAnotherStatus status,
       long triggerEventTimeMillis) {
     DagActionStore.DagAction dagAction = status.getDagAction();
     JobKey origJobKey = new 
JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
@@ -190,7 +192,7 @@ public class FlowLaunchHandler {
    * @return Trigger for reminder
    * @throws SchedulerException
    */
-  protected Trigger createAndScheduleReminder(JobKey origJobKey, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
+  protected Trigger createAndScheduleReminder(JobKey origJobKey, 
LeaseAttemptStatus.LeasedToAnotherStatus status,
       long triggerEventTimeMillis) throws SchedulerException {
     // Generate a suffix to differentiate the reminder Job and Trigger from 
the original JobKey and Trigger, so we can
     // allow us to keep track of additional properties needed for reminder 
events (all triggers associated with one job
@@ -214,7 +216,7 @@ public class FlowLaunchHandler {
    * @return
    */
   @VisibleForTesting
-  public static String 
createSuffixForJobTrigger(MultiActiveLeaseArbiter.LeasedToAnotherStatus 
leasedToAnotherStatus) {
+  public static String 
createSuffixForJobTrigger(LeaseAttemptStatus.LeasedToAnotherStatus 
leasedToAnotherStatus) {
     return "reminder_for_" + leasedToAnotherStatus.getEventTimeMillis();
   }
 
@@ -229,7 +231,7 @@ public class FlowLaunchHandler {
    * @throws SchedulerException
    */
   protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, 
JobKey reminderKey,
-      MultiActiveLeaseArbiter.LeasedToAnotherStatus status)
+      LeaseAttemptStatus.LeasedToAnotherStatus status)
       throws SchedulerException {
     JobDetailImpl jobDetail = (JobDetailImpl) 
this.schedulerService.getScheduler().getJobDetail(originalKey);
     jobDetail.setKey(reminderKey);
@@ -253,7 +255,7 @@ public class FlowLaunchHandler {
    */
   @VisibleForTesting
   public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
-      MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus, int 
schedulerMaxBackoffMillis) {
+      LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus, int 
schedulerMaxBackoffMillis) {
     Properties prevJobProps = (Properties) 
jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
     // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
     long delayPeriodMillis = 
leasedToAnotherStatus.getMinimumLingerDurationMillis()
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
index f06def5b1..51f40c04a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
@@ -29,8 +29,6 @@ import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -74,24 +72,24 @@ public class InstrumentedLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   @Override
-  public MultiActiveLeaseArbiter.LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagAction dagAction, long eventTimeMillis,
+  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
dagAction, long eventTimeMillis,
       boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) throws 
IOException {
 
-    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
+    LeaseAttemptStatus leaseAttemptStatus =
         decoratedMultiActiveLeaseArbiter.tryAcquireLease(dagAction, 
eventTimeMillis, isReminderEvent,
             skipFlowExecutionIdReplacement);
     log.info("Multi-active scheduler lease attempt for dagAction: {} received 
type of leaseAttemptStatus: [{}, "
             + "eventTimestamp: {}] ", dagAction, 
leaseAttemptStatus.getClass().getName(), eventTimeMillis);
-    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+    if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
       if (isReminderEvent) {
         this.leasesObtainedDueToReminderCount.mark();
       }
       this.leaseObtainedCount.inc();
       return leaseAttemptStatus;
-    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+    } else if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus) {
       this.leasedToAnotherStatusCount.inc();
       return leaseAttemptStatus;
-    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+    } else if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus) {
       this.noLongerLeasingStatusCount.inc();
       return leaseAttemptStatus;
     }
@@ -100,7 +98,7 @@ public class InstrumentedLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   @Override
-  public boolean recordLeaseSuccess(LeaseObtainedStatus status)
+  public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus 
status)
       throws IOException {
     if (this.decoratedMultiActiveLeaseArbiter.recordLeaseSuccess(status)) {
       this.recordedLeaseSuccessCount.mark();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
new file mode 100644
index 000000000..65526cb21
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.Getter;
+
+
+/**
+ Class used to encapsulate status of lease acquisition attempts made by {@link 
MultiActiveLeaseArbiter} and contains
+ information specific to the status that results. The {@link 
LeaseAttemptStatus#getDagAction} and
+ {@link LeaseAttemptStatus#getMinimumLingerDurationMillis} are meant to be
+ overridden and used by relevant derived classes.
+ */
+public abstract class LeaseAttemptStatus {
+  public DagActionStore.DagAction getDagAction() {
+    return null;
+  }
+
+  public long getMinimumLingerDurationMillis() {
+    return 0;
+  }
+
+  /*
+   * This LeaseAttemptStatus tells the caller that work for which lease was 
requested is completed and thus lease is no
+   * longer required. There is also no need to set reminder events.
+   */
+  public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}
+
+  /*
+  The participant calling this method acquired the lease for the event in 
question. `Dag action`'s flow execution id
+  is the timestamp associated with the lease and the time the caller obtained 
the lease is stored within the
+  `leaseAcquisitionTimestamp` field. The `multiActiveLeaseArbiter` reference 
is used to recordLeaseSuccess for the
+  current LeaseObtainedStatus via the completeLease method from a caller 
without access to the {@link MultiActiveLeaseArbiter}.
+  */
+  @Data
+  public static class LeaseObtainedStatus extends LeaseAttemptStatus {
+    private final DagActionStore.DagAction dagAction;
+    private final long leaseAcquisitionTimestamp;
+    private final long minimumLingerDurationMillis;
+    @Getter(AccessLevel.NONE)
+    private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+
+    /**
+     * @return event time in millis since epoch for the event of this lease 
acquisition
+     */
+    public long getEventTimeMillis() {
+      return Long.parseLong(dagAction.getFlowExecutionId());
+    }
+
+    /**
+     * Completes the lease referenced by this status object if it has not 
expired.
+     * @return true if able to complete lease, false otherwise.
+     * @throws IOException
+     */
+    public boolean completeLease() throws IOException {
+      return multiActiveLeaseArbiter.recordLeaseSuccess(this);
+    }
+  }
+
+  /*
+  This dag action event already has a valid lease owned by another participant.
+  `Dag action`'s flow execution id is the timestamp the lease is associated 
with, however the dag action event it
+  corresponds to may be a different and distinct occurrence of the same event.
+  `minimumLingerDurationMillis` is the minimum amount of time to wait before 
this participant should return to check if
+  the lease has completed or expired
+   */
+  @Data
+  public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
+    private final DagActionStore.DagAction dagAction;
+    private final long minimumLingerDurationMillis;
+
+    /**
+     * Returns event time in millis since epoch for the event whose lease was 
obtained by another participant.
+     * @return
+     */
+    public long getEventTimeMillis() {
+      return Long.parseLong(dagAction.getFlowExecutionId());
+    }
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
similarity index 61%
rename from 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index 89df850be..3fb5acb95 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -15,14 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.api;
+package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 
-import lombok.Data;
-import lombok.Getter;
-import lombok.AccessLevel;
-
 
 /**
  * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
@@ -76,74 +72,5 @@ public interface MultiActiveLeaseArbiter {
    *         false if failed to update the lease properly, the caller should 
continue seeking to acquire the lease as
    *         if any actions it did successfully accomplish, do not count
    */
-  boolean recordLeaseSuccess(LeaseObtainedStatus status) throws IOException;
-
-  /*
-   Class used to encapsulate status of lease acquisition attempt and 
derivations should contain information specific to
-   the status that results. The #getDagAction and 
#getMinimumLingerDurationMillis are meant to be overriden and used by
-   relevant derived classes.
-   */
-  abstract class LeaseAttemptStatus {
-    public DagActionStore.DagAction getDagAction() {
-      return null;
-    }
-
-    public long getMinimumLingerDurationMillis() {
-      return 0;
-    }
-  }
-
-  class NoLongerLeasingStatus extends LeaseAttemptStatus {}
-
-  /*
-  The participant calling this method acquired the lease for the event in 
question. `Dag action`'s flow execution id
-  is the timestamp associated with the lease and the time the caller obtained 
the lease is stored within the
-  `leaseAcquisitionTimestamp` field. The `multiActiveLeaseArbiter` reference 
is used to recordLeaseSuccess for the
-  current LeaseObtainedStatus via the completeLease method from a caller 
without access to the {@link MultiActiveLeaseArbiter}.
-  */
-  @Data
-  class LeaseObtainedStatus extends LeaseAttemptStatus {
-    private final DagActionStore.DagAction dagAction;
-    private final long leaseAcquisitionTimestamp;
-    private final long minimumLingerDurationMillis;
-    @Getter(AccessLevel.NONE)
-    private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
-
-    /**
-     * @return event time in millis since epoch for the event of this lease 
acquisition
-     */
-    public long getEventTimeMillis() {
-      return Long.parseLong(dagAction.getFlowExecutionId());
-    }
-
-    /**
-     * Completes the lease referenced by this status object if it has not 
expired.
-     * @return true if able to complete lease, false otherwise.
-     * @throws IOException
-     */
-    public boolean completeLease() throws IOException {
-      return multiActiveLeaseArbiter.recordLeaseSuccess(this);
-    }
-  }
-
-  /*
-  This dag action event already has a valid lease owned by another participant.
-  `Dag action`'s flow execution id is the timestamp the lease is associated 
with, however the dag action event it
-  corresponds to may be a different and distinct occurrence of the same event.
-  `minimumLingerDurationMillis` is the minimum amount of time to wait before 
this participant should return to check if
-  the lease has completed or expired
-   */
-  @Data
-  class LeasedToAnotherStatus extends LeaseAttemptStatus {
-    private final DagActionStore.DagAction dagAction;
-    private final long minimumLingerDurationMillis;
-
-    /**
-     * Returns event time in millis since epoch for the event whose lease was 
obtained by another participant.
-     * @return
-     */
-    public long getEventTimeMillis() {
-      return Long.parseLong(dagAction.getFlowExecutionId());
-    }
-}
+  boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status) 
throws IOException;
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
similarity index 99%
rename from 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index d02f5333c..1481f097e 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.dag_action_store;
+package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -24,22 +24,21 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
 
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
 
-import java.util.concurrent.TimeUnit;
 import javax.sql.DataSource;
-
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlDataSourceFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExponentialBackoff;
 import org.apache.gobblin.util.DBStatementExecutor;
+import org.apache.gobblin.util.ExponentialBackoff;
 
 
 @Slf4j
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
similarity index 97%
rename from 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 88915b368..03142ebbc 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.api;
+package org.apache.gobblin.service.modules.orchestration;
 
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -30,9 +28,14 @@ import java.util.Calendar;
 import java.util.Optional;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
 import javax.sql.DataSource;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlDataSourceFactory;
@@ -271,7 +274,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - A new event trigger "
                   + "is being worked on, so this older reminder will be 
dropped.", dagAction,
               isReminderEvent ? "reminder" : "original", eventTimeMillis, 
dbEventTimestamp);
-          return new NoLongerLeasingStatus();
+          return new LeaseAttemptStatus.NoLongerLeasingStatus();
         }
         if (eventTimeMillis > dbEventTimestamp.getTime()) {
           // TODO: emit metric here to capture this unexpected behavior
@@ -303,7 +306,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 2: Same event, lease is valid",
               updatedDagAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
-          return new LeasedToAnotherStatus(updatedDagAction,
+          return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
               dbLeaseAcquisitionTimestamp.getTime() + dbLinger - 
dbCurrentTimestamp.getTime());
         }
         DagActionStore.DagAction updatedDagAction =
@@ -311,7 +314,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
3: Distinct event, lease is valid",
             updatedDagAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
         // Utilize db lease acquisition timestamp for wait time
-        return new LeasedToAnotherStatus(updatedDagAction,
+        return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
             dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - 
dbCurrentTimestamp.getTime());
       } // Lease is invalid
       else if (leaseValidityStatus == 2) {
@@ -332,7 +335,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         if (isWithinEpsilon) {
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 5: Same event, no longer leasing event"
               + " in db", dagAction, isReminderEvent ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
-          return new NoLongerLeasingStatus();
+          return new LeaseAttemptStatus.NoLongerLeasingStatus();
         }
         log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
6: Distinct event, no longer leasing "
             + "event in db", dagAction, isReminderEvent ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
@@ -499,7 +502,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     SelectInfoResult selectInfoResult = getRowInfo(dagAction);
     // Another participant won the lease in between
     if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
-      return new NoLongerLeasingStatus();
+      return new LeaseAttemptStatus.NoLongerLeasingStatus();
     }
     DagActionStore.DagAction updatedDagAction =
         adoptConsensusFlowExecutionId ? 
dagAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis) : dagAction;
@@ -510,12 +513,12 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     if (numRowsUpdated == 1) {
       log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] 
successfully!", updatedDagAction,
           isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis);
-      return new LeaseObtainedStatus(updatedDagAction, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), 
minimumLingerDurationMillis, this);
+      return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), 
minimumLingerDurationMillis, this);
     }
     log.info("Another participant acquired lease in between for [{}, is: {}, 
eventTimestamp: {}] - num rows updated: {}",
         updatedDagAction, isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis, numRowsUpdated);
     // Another participant acquired lease in between
-    return new LeasedToAnotherStatus(updatedDagAction, 
minimumLingerDurationMillis);
+    return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, 
minimumLingerDurationMillis);
   }
 
   /**
@@ -579,7 +582,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   @Override
-  public boolean recordLeaseSuccess(LeaseObtainedStatus status)
+  public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus 
status)
       throws IOException {
     DagActionStore.DagAction dagAction = status.getDagAction();
     return 
dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT,
 leaseArbiterTableName),
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 14805ee0d..f4977caa2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -53,7 +53,6 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
index bc24cd0e7..91e7f20b0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
@@ -37,8 +37,6 @@ import javax.inject.Named;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
 import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
 
 
@@ -47,7 +45,7 @@ import 
org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
  * added capabilities to properly handle the result of attempted ownership 
over these flow action events. It uses the
  * {@link MultiActiveLeaseArbiter} to determine a single lease owner at a 
given event time for a flow action event.
  * If the status of the lease ownership attempt is anything other than an 
indication the lease has been completed
- * ({@link 
org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.NoLongerLeasingStatus}) 
then the
+ * ({@link LeaseAttemptStatus.NoLongerLeasingStatus}) then the
  * {@link MultiActiveLeaseArbiter#tryAcquireLease} method will set a reminder 
for the flow action using
  * {@link DagActionReminderScheduler} to reattempt the lease after the current 
lease holder's grant would have expired.
  */
@@ -72,17 +70,17 @@ public class ReminderSettingDagProcLeaseArbiter implements 
MultiActiveLeaseArbit
    * Attempts a lease for a particular job event and sets a reminder to 
revisit if the lease has not been completed.
    */
   @Override
-  public MultiActiveLeaseArbiter.LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagAction dagAction, long eventTimeMillis,
+  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
dagAction, long eventTimeMillis,
       boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) {
     if (this.decoratedLeaseArbiter.isPresent()) {
       try {
-        MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
+        LeaseAttemptStatus leaseAttemptStatus =
             this.decoratedLeaseArbiter.get().tryAcquireLease(dagAction, 
eventTimeMillis, isReminderEvent,
                 skipFlowExecutionIdReplacement);
       /* Schedule a reminder for the event unless the lease has been completed 
to safeguard against the case where even
       we, when we might become the lease owner still fail to complete 
processing
       */
-        if (!(leaseAttemptStatus instanceof NoLongerLeasingStatus)) {
+        if (!(leaseAttemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus)) {
           scheduleReminderForEvent(leaseAttemptStatus);
         }
         return leaseAttemptStatus;
@@ -95,7 +93,7 @@ public class ReminderSettingDagProcLeaseArbiter implements 
MultiActiveLeaseArbit
   }
 
   @Override
-  public boolean recordLeaseSuccess(LeaseObtainedStatus status)
+  public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus 
status)
       throws IOException {
     if (!this.decoratedLeaseArbiter.isPresent()) {
       throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
@@ -103,7 +101,7 @@ public class ReminderSettingDagProcLeaseArbiter implements 
MultiActiveLeaseArbit
     return this.decoratedLeaseArbiter.get().recordLeaseSuccess(status);
   }
 
-  protected void 
scheduleReminderForEvent(MultiActiveLeaseArbiter.LeaseAttemptStatus leaseStatus)
+  protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
       throws SchedulerException {
     if (!this.dagActionReminderScheduler.isPresent()) {
       throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 3e10b8319..ce9f56705 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -36,7 +36,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index a77f3cfcf..237eafdb3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -22,12 +22,12 @@ import java.io.IOException;
 import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 
 
@@ -40,10 +40,10 @@ import 
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 @Alpha
 public abstract class DagTask {
   @Getter public final DagActionStore.DagAction dagAction;
-  private final MultiActiveLeaseArbiter.LeaseObtainedStatus 
leaseObtainedStatus;
+  private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
   @Getter protected final DagManager.DagId dagId;
 
-  public DagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+  public DagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
     this.dagAction = dagAction;
     this.leaseObtainedStatus = leaseObtainedStatus;
     this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId());
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index c83edacea..98ede576b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -17,9 +17,9 @@
 
 package org.apache.gobblin.service.modules.orchestration.task;
 
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 
 /**
@@ -27,7 +27,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
  */
 
 public class LaunchDagTask extends DagTask {
-  public LaunchDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+  public LaunchDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
     super(dagAction, leaseObtainedStatus);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 0b989c3b1..650938971 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -30,7 +30,7 @@ import java.sql.SQLException;
 import javax.inject.Named;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
 import org.apache.gobblin.service.FlowStatusId;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 30c55f4e0..00fc8b566 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 9d7106373..07e4229f8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -26,7 +26,7 @@ import javax.inject.Named;
 import javax.inject.Provider;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index ab3ea9c37..928266602 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -24,7 +24,7 @@ import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
@@ -52,8 +52,8 @@ public class DagManagementDagActionStoreChangeMonitor extends 
DagActionStoreChan
   }
 
   /**
-   * This implementation passes on the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the
-   * {@link DagManagement} instead of finding a {@link 
org.apache.gobblin.runtime.api.FlowSpec} and passing the spec to {@link 
Orchestrator}.
+   * This implementation passes on the {@link DagActionStore.DagAction} to 
{@link DagManagement} instead of finding a
+   * {@link org.apache.gobblin.runtime.api.FlowSpec} and passing the spec to 
{@link Orchestrator}.
    */
   @Override
   protected void handleDagAction(DagActionStore.DagAction dagAction, boolean 
isStartup) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index 4eeb2bbee..6853ed79f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -26,7 +26,7 @@ import javax.inject.Named;
 import javax.inject.Provider;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 68034658c..5a42e05d7 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -32,8 +32,6 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
@@ -100,9 +98,9 @@ public class DagManagementTaskStreamImplTest {
     dagManagementTaskStream.addDagAction(launchAction);
     when(dagManagementTaskStream.getReminderSettingDagProcLeaseArbiter().get()
         .tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(), 
anyBoolean(), anyBoolean()))
-        .thenReturn(new MultiActiveLeaseArbiter.NoLongerLeasingStatus(),
-            new MultiActiveLeaseArbiter.LeasedToAnotherStatus(launchAction, 
15),
-            new MultiActiveLeaseArbiter.LeaseObtainedStatus(launchAction, 0, 
5, null));
+        .thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
+            new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 15),
+            new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 0, 5, 
null));
     DagTask dagTask = dagManagementTaskStream.next();
     Assert.assertTrue(dagTask instanceof LaunchDagTask);
     DagProc dagProc = dagTask.host(this.dagProcFactory);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index ef7296393..bf84f890c 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -45,10 +45,8 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 9d9f584a8..b2a72d05b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -38,7 +38,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 5a430191d..655532ab0 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.util.Properties;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.junit.Assert;
 import org.quartz.JobDataMap;
@@ -35,8 +33,8 @@ public class FlowLaunchHandlerTest {
   int schedulerBackOffMillis = 10;
   DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction("flowName", "flowGroup",
       String.valueOf(eventToRevisit), "jobName", 
DagActionStore.DagActionType.LAUNCH);
-  MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus =
-      new MultiActiveLeaseArbiter.LeasedToAnotherStatus(dagAction, 
minimumLingerDurationMillis);
+  LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
+      new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, 
minimumLingerDurationMillis);
 
   /**
    * Remove first two fields from cron expression representing seconds and 
minutes to return truncated cron expression
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
similarity index 98%
rename from 
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
rename to 
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 944cfa027..2b0edfdec 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.dag_action_store;
+package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
-
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.HashSet;
-import org.apache.gobblin.runtime.api.DagActionStore;
+
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
similarity index 84%
rename from 
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
rename to 
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 9247cdbd0..5b0e4c424 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -15,23 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.api;
+package org.apache.gobblin.service.modules.orchestration;
 
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.Optional;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
-import static org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter.*;
 
 @Slf4j
 public class MysqlMultiActiveLeaseArbiterTest {
@@ -58,9 +61,9 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final Timestamp dummyTimestamp = new Timestamp(99999);
   private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
   private String formattedAcquireLeaseIfMatchingAllStatement =
-      
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
TABLE);
+      
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
 TABLE);
   private String formattedAcquireLeaseIfFinishedStatement =
-      String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
TABLE);
+      
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
 TABLE);
 
   // The setup functionality verifies that the initialization of the tables is 
done correctly and verifies any SQL
   // syntax errors.
@@ -89,11 +92,11 @@ public class MysqlMultiActiveLeaseArbiterTest {
   @Test
   public void testAcquireLeaseSingleParticipant() throws Exception {
     // Tests CASE 1 of acquire lease for a flow action event not present in DB
-    MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
+    LeaseAttemptStatus firstLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
-    Assert.assertTrue(firstLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
-    MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
-        (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
+    Assert.assertTrue(firstLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
+    LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
+        (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
     Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
         firstObtainedStatus.getLeaseAcquisitionTimestamp());
     Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
@@ -103,22 +106,22 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Verify that different DagAction types for the same flow can have leases 
at the same time
     DagActionStore.DagAction killDagAction = new
         DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.KILL);
-    MultiActiveLeaseArbiter.LeaseAttemptStatus killStatus =
+    LeaseAttemptStatus killStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction, 
eventTimeMillis, false, true);
-    Assert.assertTrue(killStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
-    MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
-        (MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
+    Assert.assertTrue(killStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
+    LeaseAttemptStatus.LeaseObtainedStatus killObtainedStatus =
+        (LeaseAttemptStatus.LeaseObtainedStatus) killStatus;
     Assert.assertTrue(
         killObtainedStatus.getLeaseAcquisitionTimestamp() >= 
killObtainedStatus.getEventTimeMillis());
 
     // Tests CASE 2 of acquire lease for a flow action event that already has 
a valid lease for the same event in db
     // Very little time should have passed if this test directly follows the 
one above so this call will be considered
     // the same as the previous event
-    MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
+    LeaseAttemptStatus secondLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
-    Assert.assertTrue(secondLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
-    MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
-        (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
+    Assert.assertTrue(secondLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
+    LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
+        (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
     Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), 
secondLeasedToAnotherStatus.getEventTimeMillis());
     
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() 
> 0);
 
@@ -126,21 +129,21 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // valid
     // Allow enough time to pass for this trigger to be considered distinct, 
but not enough time so the lease expires
     Thread.sleep(MORE_THAN_EPSILON);
-    MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
+    LeaseAttemptStatus thirdLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
-    Assert.assertTrue(thirdLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
-    MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
-        (MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
+    Assert.assertTrue(thirdLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
+    LeaseAttemptStatus.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
+        (LeaseAttemptStatus.LeasedToAnotherStatus) thirdLaunchStatus;
     Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > 
firstObtainedStatus.getEventTimeMillis());
     
Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() < 
LINGER);
 
     // Tests CASE 4 of lease out of date
     Thread.sleep(LINGER);
-    MultiActiveLeaseArbiter.LeaseAttemptStatus fourthLaunchStatus =
+    LeaseAttemptStatus fourthLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
-    Assert.assertTrue(fourthLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
-    MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
-        (MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
+    Assert.assertTrue(fourthLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
+    LeaseAttemptStatus.LeaseObtainedStatus fourthObtainedStatus =
+        (LeaseAttemptStatus.LeaseObtainedStatus) fourthLaunchStatus;
     Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis() > 
eventTimeMillis + LINGER);
     Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis()
         <= fourthObtainedStatus.getLeaseAcquisitionTimestamp());
@@ -149,18 +152,18 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // done immediately after previous lease obtainment so should be marked as 
the same event
     
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
     Assert.assertTrue(System.currentTimeMillis() - 
fourthObtainedStatus.getEventTimeMillis() < EPSILON);
-    MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
+    LeaseAttemptStatus fifthLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
-    Assert.assertTrue(fifthLaunchStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
+    Assert.assertTrue(fifthLaunchStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus);
 
     // Tests CASE 6 of no longer leasing a distinct event in DB
     // Wait so this event is considered distinct and a new lease will be 
acquired
     Thread.sleep(MORE_THAN_EPSILON);
-    MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
+    LeaseAttemptStatus sixthLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
-    Assert.assertTrue(sixthLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
-    MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
-        (MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
+    Assert.assertTrue(sixthLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
+    LeaseAttemptStatus.LeaseObtainedStatus sixthObtainedStatus =
+        (LeaseAttemptStatus.LeaseObtainedStatus) sixthLaunchStatus;
     Assert.assertTrue(sixthObtainedStatus.getEventTimeMillis()
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
@@ -225,7 +228,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
     DagActionStore.DagAction updatedResumeDagAction = 
resumeDagAction.updateFlowExecutionId(
         selectInfoResult.getEventTimeMillis());
-    boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+    boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new 
LeaseAttemptStatus.LeaseObtainedStatus(
         updatedResumeDagAction, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
     Assert.assertTrue(markedSuccess);
     // Ensure no NPE results from calling this after a lease has been 
completed and acquisition timestamp val is NULL
@@ -256,7 +259,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
     LeaseAttemptStatus attemptStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
olderEventTimestamp, true, true);
-    Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
+    Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus);
   }
 
   /*
@@ -271,8 +274,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
     LeaseAttemptStatus attemptStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true, true);
-    Assert.assertTrue(attemptStatus instanceof LeasedToAnotherStatus);
-    LeasedToAnotherStatus leasedToAnotherStatus = (LeasedToAnotherStatus) 
attemptStatus;
+    Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
+    LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus = 
(LeaseAttemptStatus.LeasedToAnotherStatus) attemptStatus;
     Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(), 
selectInfoResult.getEventTimeMillis());
   }
 
@@ -288,8 +291,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Thread.sleep(MORE_THAN_LINGER);
     LeaseAttemptStatus attemptStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true, true);
-    Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
-    LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
+    Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
+    LeaseAttemptStatus.LeaseObtainedStatus obtainedStatus = 
(LeaseAttemptStatus.LeaseObtainedStatus) attemptStatus;
     Assert.assertTrue(obtainedStatus.getEventTimeMillis() > 
selectInfoResult.getEventTimeMillis());
     Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() > 
selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue());
   }
@@ -307,7 +310,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
          mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
      DagActionStore.DagAction updatedResumeDagAction = 
resumeDagAction.updateFlowExecutionId(
          selectInfoResult.getEventTimeMillis());
-     boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+     boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new 
LeaseAttemptStatus.LeaseObtainedStatus(
          updatedResumeDagAction, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
      Assert.assertTrue(markedSuccess);
 
@@ -316,7 +319,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
      // Now have a reminder event check-in on the completed lease
      LeaseAttemptStatus attemptStatus =
          mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true, true);
-     Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
+     Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus);
    }
 
    /*
@@ -327,22 +330,22 @@ public class MysqlMultiActiveLeaseArbiterTest {
   @Test
   public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
     // Obtain a lease for a new action and verify its flowExecutionId is not 
updated
-    MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
+    LeaseAttemptStatus firstLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2, 
eventTimeMillis, false, false);
-    Assert.assertTrue(firstLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
-    MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
-        (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
+    Assert.assertTrue(firstLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
+    LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
+        (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
     Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <= 
firstObtainedStatus.getLeaseAcquisitionTimestamp());
     Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
 
     // A second attempt to obtain a lease on the same action should return a 
LeasedToAnotherStatus which also contains
     // the original flowExecutionId
-    MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
+    LeaseAttemptStatus secondLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2, 
eventTimeMillis, false, false);
-    Assert.assertTrue(secondLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
-    MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
-        (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
+    Assert.assertTrue(secondLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
+    LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
+        (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
     Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), 
secondLeasedToAnotherStatus.getEventTimeMillis());
     Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index f00e35fbf..e75b2952e 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -35,7 +35,7 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;

Reply via email to