This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b72193ddf [GOBBLIN-2100] Add javadocs to DagAction related classes (#3987)
b72193ddf is described below

commit b72193ddf53917d09e5d7c35b4b1ca009b9f8d42
Author: umustafi <[email protected]>
AuthorDate: Thu Jun 27 15:20:02 2024 -0700

    [GOBBLIN-2100] Add javadocs to DagAction related classes (#3987)
    
    * Add javadocs to DagAction related classes
    * Revise javadocs and naming
    * Rename leaseObject to leaseParam in tests
---
 .../orchestration/DagActionReminderScheduler.java  |  35 ++++---
 .../modules/orchestration/DagActionStore.java      |  37 ++++++-
 .../modules/orchestration/DagManagement.java       |   2 +-
 .../orchestration/DagManagementTaskStreamImpl.java |  48 ++++-----
 .../modules/orchestration/FlowLaunchHandler.java   |   9 +-
 .../orchestration/InstrumentedLeaseArbiter.java    |  12 +--
 .../modules/orchestration/LeaseAttemptStatus.java  |  16 +--
 .../orchestration/MultiActiveLeaseArbiter.java     |   4 +-
 .../MysqlMultiActiveLeaseArbiter.java              | 115 +++++++++++----------
 .../modules/orchestration/Orchestrator.java        |   4 +-
 .../DagActionReminderSchedulerTest.java            |   2 +-
 .../DagManagementTaskStreamImplTest.java           |   6 +-
 .../orchestration/FlowLaunchHandlerTest.java       |   4 +-
 .../MysqlMultiActiveLeaseArbiterTest.java          | 107 +++++++++----------
 14 files changed, 215 insertions(+), 186 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 21a9bdb92..8c1d3a8af 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -72,18 +72,18 @@ public class DagActionReminderScheduler {
   /**
    *  Uses a dagAction & reminder duration in milliseconds to create a 
reminder job that will fire
    *  `reminderDurationMillis` after the current time
-   * @param dagActionLeaseObject
+   * @param leaseParams
    * @param reminderDurationMillis
    * @throws SchedulerException
    */
-  public void scheduleReminder(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject, long reminderDurationMillis,
+  public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long 
reminderDurationMillis,
       boolean isDeadlineReminder)
       throws SchedulerException {
-    JobDetail jobDetail = createReminderJobDetail(dagActionLeaseObject, 
isDeadlineReminder);
-    Trigger trigger = 
createReminderJobTrigger(dagActionLeaseObject.getDagAction(), 
reminderDurationMillis,
+    JobDetail jobDetail = createReminderJobDetail(leaseParams, 
isDeadlineReminder);
+    Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(), 
reminderDurationMillis,
         System::currentTimeMillis, isDeadlineReminder);
     log.info("Reminder set for dagAction {} to fire after {} ms, 
isDeadlineTrigger: {}",
-        dagActionLeaseObject.getDagAction(), reminderDurationMillis, 
isDeadlineReminder);
+        leaseParams.getDagAction(), reminderDurationMillis, 
isDeadlineReminder);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
 
@@ -113,16 +113,16 @@ public class DagActionReminderScheduler {
       DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
       long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);
 
-      DagActionStore.DagActionLeaseObject reminderDagActionLeaseObject = new 
DagActionStore.DagActionLeaseObject(
+      DagActionStore.LeaseParams reminderLeaseParams = new 
DagActionStore.LeaseParams(
           new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType),
           true, eventTimeMillis);
-      log.info("DagProc reminder triggered for dagAction event: {}", 
reminderDagActionLeaseObject);
+      log.info("DagProc reminder triggered for dagAction event: {}", 
reminderLeaseParams);
 
       try {
         DagManagement dagManagement = 
GobblinServiceManager.getClass(DagManagement.class);
-        dagManagement.addReminderDagAction(reminderDagActionLeaseObject);
+        dagManagement.addReminderDagAction(reminderLeaseParams);
       } catch (IOException e) {
-        log.error("Failed to add DagAction event to DagManagement. dagAction 
event: {}", reminderDagActionLeaseObject);
+        log.error("Failed to add DagAction event to DagManagement. dagAction 
event: {}", reminderLeaseParams);
       }
     }
   }
@@ -152,17 +152,18 @@ public class DagActionReminderScheduler {
    *  by a key comprised of the dagAction's fields. boolean isDeadlineReminder 
is flag that tells if this createReminder
    *  requests are for deadline dag actions that are setting reminder for 
deadline duration.
    */
-  public static JobDetail 
createReminderJobDetail(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject, boolean isDeadlineReminder) {
+  public static JobDetail createReminderJobDetail(DagActionStore.LeaseParams 
leaseParams, boolean isDeadlineReminder) {
     JobDataMap dataMap = new JobDataMap();
-    dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, 
dagActionLeaseObject.getDagAction().getFlowName());
-    dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, 
dagActionLeaseObject.getDagAction().getFlowGroup());
-    dataMap.put(ConfigurationKeys.JOB_NAME_KEY, 
dagActionLeaseObject.getDagAction().getJobName());
-    dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagActionLeaseObject.getDagAction().getFlowExecutionId());
-    dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, 
dagActionLeaseObject.getDagAction().getDagActionType());
-    dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY, 
dagActionLeaseObject.getEventTimeMillis());
+    DagActionStore.DagAction dagAction = leaseParams.getDagAction();
+    dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
+    dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
+    dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
+    dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+    dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, 
dagAction.getDagActionType());
+    dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY, 
leaseParams.getEventTimeMillis());
 
     return JobBuilder.newJob(ReminderJob.class)
-        .withIdentity(createJobKey(dagActionLeaseObject.getDagAction(), 
isDeadlineReminder))
+        .withIdentity(createJobKey(leaseParams.getDagAction(), 
isDeadlineReminder))
         .usingJobData(dataMap)
         .build();
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 57b40a1d6..4bc7e639c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -28,6 +28,10 @@ import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 
 
+/**
+ * GaaS store for pending {@link DagAction}s on a flow or job.
+ * See javadoc for {@link DagAction}
+ */
 public interface DagActionStore {
   public static final String NO_JOB_NAME_DEFAULT = "";
   enum DagActionType {
@@ -39,6 +43,18 @@ public interface DagActionStore {
     RESUME, // Resume flow invoked through API call
   }
 
+  /**
+   * A DagAction uniquely identifies a particular flow (or job level) 
execution and the action to be performed on it,
+   * denoted by the `dagActionType` field.
+   * These are created and stored either by a REST client request or generated 
within GaaS. The Flow Management layer
+   * retrieves and executes {@link DagAction}s to progress a flow's execution 
or enforce execution deadlines.
+   *
+   * Flow group, name, and executionId are sufficient to define a flow level 
action (used with
+   * {@link DagActionStore#NO_JOB_NAME_DEFAULT}). When `jobName` is provided, 
it can be used to identify the specific
+   * job on which the action is to be performed. The schema of this class 
matches exactly that of the
+   * {@link DagActionStore}.
+   *
+   */
   @Data
   @RequiredArgsConstructor
   class DagAction {
@@ -78,9 +94,15 @@ public interface DagActionStore {
     }
   }
 
+  /**
+   * This object is used locally (in-memory) by the {@link 
MultiActiveLeaseArbiter} to identify a particular
+   * {@link DagAction} along with the time it was requested, denoted by the 
`eventTimeMillis` field. It also tracks
+   * whether it has been previously passed to the {@link 
MultiActiveLeaseArbiter} to attempt ownership over the flow
+   * event, indicated by the 'isReminder' field (true when it has been 
previously attempted).
+   */
   @Data
   @RequiredArgsConstructor
-  class DagActionLeaseObject {
+  class LeaseParams {
     final DagAction dagAction;
     final boolean isReminder;
     final long eventTimeMillis;
@@ -88,10 +110,15 @@ public interface DagActionStore {
     /**
      * Creates a lease object for a dagAction and eventTimeMillis representing 
an original event (isReminder is False)
      */
-    public DagActionLeaseObject(DagAction dagAction, long eventTimeMillis) {
-      this.dagAction = dagAction;
-      this.isReminder = false;
-      this.eventTimeMillis = eventTimeMillis;
+    public LeaseParams(DagAction dagAction, long eventTimeMillis) {
+      this(dagAction, false, eventTimeMillis);
+    }
+
+    /**
+     * Replace flow execution id in dagAction with agreed upon event time to 
easily track the flow
+     */
+    public DagAction updateDagActionFlowExecutionId(long flowExecutionId) {
+      return this.dagAction.updateFlowExecutionId(flowExecutionId);
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
index eea645284..caa5e96ac 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -35,5 +35,5 @@ public interface DagManagement {
   /**
    * Used to add reminder dagActions to the queue that already contain an 
eventTimestamp from the previous lease attempt
    */
-  void addReminderDagAction(DagActionStore.DagActionLeaseObject 
reminderDagActionLeaseObject) throws IOException;
+  void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams) 
throws IOException;
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index b6901f58d..dc8967990 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -80,7 +80,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
   protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
   private final boolean isMultiActiveExecutionEnabled;
   private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
-  private final BlockingQueue<DagActionStore.DagActionLeaseObject> 
dagActionLeaseObjectQueue = new LinkedBlockingQueue<>();
+  private final BlockingQueue<DagActionStore.LeaseParams> leaseParamsQueue = 
new LinkedBlockingQueue<>();
   private final DagManagementStateStore dagManagementStateStore;
 
   @Inject
@@ -112,18 +112,18 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     // TODO: Used to track missing dag issue, remove later as needed
     log.info("Add original (non-reminder) dagAction {}", dagAction);
 
-    if (!this.dagActionLeaseObjectQueue.offer(new 
DagActionStore.DagActionLeaseObject(dagAction, false, 
System.currentTimeMillis()))) {
+    if (!this.leaseParamsQueue.offer(new DagActionStore.LeaseParams(dagAction, 
false, System.currentTimeMillis()))) {
       throw new RuntimeException(String.format("Could not add dag action to 
the queue %s", dagAction));
     }
   }
 
   @Override
-  public synchronized void 
addReminderDagAction(DagActionStore.DagActionLeaseObject 
reminderDagActionLeaseObject) {
+  public synchronized void addReminderDagAction(DagActionStore.LeaseParams 
reminderLeaseParams) {
     // TODO: Used to track missing dag issue, remove later as needed
-    log.info("Add reminder dagAction {}", reminderDagActionLeaseObject);
+    log.info("Add reminder dagAction {}", reminderLeaseParams);
 
-    if (!this.dagActionLeaseObjectQueue.offer(reminderDagActionLeaseObject)) {
-      throw new RuntimeException(String.format("Could not add reminder dag 
action to the queue %s", reminderDagActionLeaseObject));
+    if (!this.leaseParamsQueue.offer(reminderLeaseParams)) {
+      throw new RuntimeException(String.format("Could not add reminder dag 
action to the queue %s", reminderLeaseParams));
     }
   }
 
@@ -137,17 +137,17 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       while (true) {
         DagActionStore.DagAction dagAction = null;
         try {
-          DagActionStore.DagActionLeaseObject dagActionLeaseObject = 
this.dagActionLeaseObjectQueue.take();
-          dagAction = dagActionLeaseObject.getDagAction();
+          DagActionStore.LeaseParams leaseParams = 
this.leaseParamsQueue.take();
+          dagAction = leaseParams.getDagAction();
           /* Create triggers for original (non-reminder) dag actions of type 
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE.
              Reminder triggers are used to inform hosts once the job start 
deadline and flow finish deadline are passed;
              then only is lease arbitration done to enforce the deadline 
violation and fail the job or flow if needed */
-          if (!dagActionLeaseObject.isReminder() && dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
-            createJobStartDeadlineTrigger(dagActionLeaseObject);
-          } else if (!dagActionLeaseObject.isReminder() && 
dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
-            createFlowFinishDeadlineTrigger(dagActionLeaseObject);
+          if (!leaseParams.isReminder() && dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
+            createJobStartDeadlineTrigger(leaseParams);
+          } else if (!leaseParams.isReminder() && dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
+            createFlowFinishDeadlineTrigger(leaseParams);
           } else { // Handle original non-deadline dagActions as well as 
reminder events of all types
-            LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(dagActionLeaseObject);
+            LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(leaseParams);
             if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
               return createDagTask(dagAction, 
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
             }
@@ -159,22 +159,22 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       }
   }
 
-  private void 
createJobStartDeadlineTrigger(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject)
+  private void createJobStartDeadlineTrigger(DagActionStore.LeaseParams 
leaseParams)
       throws SchedulerException, IOException {
     long timeOutForJobStart = 
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
-        
dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0), 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+        leaseParams.getDagAction().getDagId()).get().getNodes().get(0), 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
     // todo - this timestamp is just an approximation, the real job submission 
has happened in past, and that is when a
     // ENFORCE_JOB_START_DEADLINE dag action was created; we are just 
processing that dag action here
     long jobSubmissionTime = System.currentTimeMillis();
     long reminderDuration = jobSubmissionTime + timeOutForJobStart - 
System.currentTimeMillis();
 
-    dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject, 
reminderDuration, true);
+    dagActionReminderScheduler.get().scheduleReminder(leaseParams, 
reminderDuration, true);
   }
 
-  private void 
createFlowFinishDeadlineTrigger(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject)
+  private void createFlowFinishDeadlineTrigger(DagActionStore.LeaseParams 
leaseParams)
       throws SchedulerException, IOException {
     long timeOutForJobFinish;
-    Dag.DagNode<JobExecutionPlan> dagNode = 
this.dagManagementStateStore.getDag(dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode = 
this.dagManagementStateStore.getDag(leaseParams.getDagAction().getDagId()).get().getNodes().get(0);
 
     try {
       timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
@@ -189,23 +189,23 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
     long reminderDuration = flowStartTime + timeOutForJobFinish - 
System.currentTimeMillis();
 
-    dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject, 
reminderDuration, true);
+    dagActionReminderScheduler.get().scheduleReminder(leaseParams, 
reminderDuration, true);
   }
 
   /**
    * Returns a {@link LeaseAttemptStatus} associated with the
    * `dagAction` by calling
-   * {@link 
MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagActionLeaseObject, 
boolean)}.
-   * @param dagActionLeaseObject
+   * {@link 
MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.LeaseParams, boolean)}.
+   * @param leaseParams
    * @return
    * @throws IOException
    * @throws SchedulerException
    */
-  private LeaseAttemptStatus 
retrieveLeaseStatus(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
+  private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.LeaseParams 
leaseParams)
       throws IOException, SchedulerException {
     // Uses reminder flag to determine whether to use current time as event 
time or previously saved event time
     LeaseAttemptStatus leaseAttemptStatus = 
this.dagActionProcessingLeaseArbiter
-        .tryAcquireLease(dagActionLeaseObject, false);
+        .tryAcquireLease(leaseParams, false);
         /* Schedule a reminder for the event unless the lease has been 
completed to safeguard against the case where
         even we, when we might become the lease owner still fail to complete 
processing
         */
@@ -242,7 +242,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
   */
   protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
       throws SchedulerException {
-    
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagActionLeaseObject(),
+    
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusLeaseParams(),
         leaseStatus.getMinimumLingerDurationMillis(), false);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index e548f16ad..8051db483 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -106,11 +106,10 @@ public class FlowLaunchHandler {
    * the status of the attempt.
    */
   public void handleFlowLaunchTriggerEvent(Properties jobProps,
-      DagActionStore.DagActionLeaseObject dagActionLeaseObject, boolean 
adoptConsensusFlowExecutionId)
+      DagActionStore.LeaseParams leaseParams, boolean 
adoptConsensusFlowExecutionId)
       throws IOException {
-    long previousEventTimeMillis = dagActionLeaseObject.getEventTimeMillis();
-    LeaseAttemptStatus leaseAttempt = 
this.multiActiveLeaseArbiter.tryAcquireLease(
-        dagActionLeaseObject, adoptConsensusFlowExecutionId);
+    long previousEventTimeMillis = leaseParams.getEventTimeMillis();
+    LeaseAttemptStatus leaseAttempt = 
this.multiActiveLeaseArbiter.tryAcquireLease(leaseParams, 
adoptConsensusFlowExecutionId);
     if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
         && persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt)) {
       log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", 
leaseAttempt.getConsensusDagAction(),
@@ -129,7 +128,7 @@ public class FlowLaunchHandler {
       return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus) 
leaseAttempt);
     } else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus) 
{ // remind w/o delay to immediately re-attempt handling
       return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(
-          ((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt).getConsensusDagActionLeaseObject(), 0L));
+          ((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt).getConsensusLeaseParams(), 0L));
     } else {
       throw new RuntimeException("unexpected `LeaseAttemptStatus` derived 
type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
index ab8a9ee14..1c7ae2993 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
@@ -72,14 +72,14 @@ public class InstrumentedLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   @Override
-  public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject, 
boolean skipFlowExecutionIdReplacement) throws IOException {
+  public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams 
leaseParams, boolean skipFlowExecutionIdReplacement) throws IOException {
     LeaseAttemptStatus leaseAttemptStatus =
-        decoratedMultiActiveLeaseArbiter.tryAcquireLease(dagActionLeaseObject, 
skipFlowExecutionIdReplacement);
-    log.info("Multi-active scheduler lease attempt for leaseObject: {} 
received type of leaseAttemptStatus: [{}, "
-            + "eventTimestamp: {}] ", dagActionLeaseObject, 
leaseAttemptStatus.getClass().getName(),
-        dagActionLeaseObject.getEventTimeMillis());
+        decoratedMultiActiveLeaseArbiter.tryAcquireLease(leaseParams, 
skipFlowExecutionIdReplacement);
+    log.info("Multi-active arbiter attempt for: {} received type of 
leaseAttemptStatus: [{}, "
+            + "eventTimestamp: {}] ", leaseParams, 
leaseAttemptStatus.getClass().getName(),
+        leaseParams.getEventTimeMillis());
     if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
-      if (dagActionLeaseObject.isReminder()) {
+      if (leaseParams.isReminder()) {
         this.leasesObtainedDueToReminderCount.mark();
       }
       this.leaseObtainedCount.inc();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index 3584f445f..4dd16457e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -34,10 +34,10 @@ import lombok.Getter;
  */
 public abstract class LeaseAttemptStatus {
   /**
-   * @return the {@link DagActionStore.DagActionLeaseObject}, containing the 
dagAction, eventTimeMillis of the event, and boolean
+   * @return the {@link DagActionStore.LeaseParams}, containing the dagAction, 
eventTimeMillis of the event, and boolean
    * indicating if it's a reminder event; {@see 
MultiActiveLeaseArbiter#tryAcquireLease}
    */
-  public DagActionStore.DagActionLeaseObject 
getConsensusDagActionLeaseObject() {
+  public DagActionStore.LeaseParams getConsensusLeaseParams() {
     return null;
   }
 
@@ -73,7 +73,7 @@ public abstract class LeaseAttemptStatus {
   // avoid - warning: Generating equals/hashCode implementation but without a 
call to superclass, even though this class does not extend java.lang.Object
   @EqualsAndHashCode(callSuper=false)
   public static class LeaseObtainedStatus extends LeaseAttemptStatus {
-    private final DagActionStore.DagActionLeaseObject 
consensusDagActionLeaseObject;
+    private final DagActionStore.LeaseParams consensusLeaseParams;
     private final long leaseAcquisitionTimestamp;
     private final long minimumLingerDurationMillis;
     @Getter(AccessLevel.NONE)
@@ -81,7 +81,7 @@ public abstract class LeaseAttemptStatus {
 
     @Override
     public DagActionStore.DagAction getConsensusDagAction() {
-      return consensusDagActionLeaseObject.getDagAction();
+      return consensusLeaseParams.getDagAction();
     }
 
     /**
@@ -94,7 +94,7 @@ public abstract class LeaseAttemptStatus {
     }
 
     public long getEventTimeMillis() {
-      return consensusDagActionLeaseObject.getEventTimeMillis();
+      return consensusLeaseParams.getEventTimeMillis();
     }
   }
 
@@ -109,16 +109,16 @@ public abstract class LeaseAttemptStatus {
   // avoid - warning: Generating equals/hashCode implementation but without a 
call to superclass, even though this class does not extend java.lang.Object
   @EqualsAndHashCode(callSuper=false)
   public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
-    private final DagActionStore.DagActionLeaseObject 
consensusDagActionLeaseObject;
+    private final DagActionStore.LeaseParams consensusLeaseParams;
     private final long minimumLingerDurationMillis;
 
     @Override
     public DagActionStore.DagAction getConsensusDagAction() {
-      return consensusDagActionLeaseObject.getDagAction();
+      return consensusLeaseParams.getDagAction();
     }
 
     public long getEventTimeMillis() {
-      return consensusDagActionLeaseObject.getEventTimeMillis();
+      return consensusLeaseParams.getEventTimeMillis();
     }
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index b49d2ae52..5965a6d07 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -49,7 +49,7 @@ public interface MultiActiveLeaseArbiter {
    * added by the previous write). Based on the transaction results, it will 
return {@link LeaseAttemptStatus} to
    * determine the next action.
    *
-   * @param dagActionLeaseObject                   uniquely identifies the 
flow, the present action upon it, the time the action
+   * @param leaseParams                   uniquely identifies the flow, the 
present action upon it, the time the action
    *                                      was triggered, and if the dag action 
event we're checking on is a reminder event
    * @param adoptConsensusFlowExecutionId if true then replaces the dagAction 
flowExecutionId returned in
    *                                      LeaseAttemptStatuses with the 
consensus eventTime, accessed via
@@ -58,7 +58,7 @@ public interface MultiActiveLeaseArbiter {
    * {@link DagActionStore.DagAction} with a possibly updated ("laundered") 
flow execution id that MUST be used thereafter
    * @throws IOException
    */
-  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject, boolean adoptConsensusFlowExecutionId)
+  LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, 
boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 33274e425..7aa3c42b9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -248,21 +248,22 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   @Override
-  public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject, 
boolean adoptConsensusFlowExecutionId) throws IOException {
-    log.info("Multi-active arbiter about to handle trigger event: {}", 
dagActionLeaseObject);
+  public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams 
leaseParams,
+      boolean adoptConsensusFlowExecutionId) throws IOException {
+    log.info("Multi-active arbiter about to handle trigger event: {}", 
leaseParams);
     // Query lease arbiter table about this dag action
-    Optional<GetEventInfoResult> getResult = 
getExistingEventInfo(dagActionLeaseObject);
+    Optional<GetEventInfoResult> getResult = getExistingEventInfo(leaseParams);
 
     try {
       if (!getResult.isPresent()) {
         log.debug("tryAcquireLease for {} - CASE 1: no existing row for this 
dag action, then go ahead and insert",
-            dagActionLeaseObject);
-        int numRowsUpdated = 
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction(),
+            leaseParams);
+        int numRowsUpdated = attemptLeaseIfNewRow(leaseParams.getDagAction(),
             ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
                 .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) Math.random() 
* DELAY_FOR_RETRY_RANGE_MILLIS)
                 .build());
-       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, 
dagActionLeaseObject.getDagAction(),
-           Optional.empty(), dagActionLeaseObject.isReminder(), 
adoptConsensusFlowExecutionId);
+       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, 
Optional.empty(),
+           adoptConsensusFlowExecutionId);
       }
 
       // Extract values from result set
@@ -276,30 +277,30 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
 
       // For reminder event, we can stop early if the reminder eventTimeMillis 
is older than the current event in the db
       // because db laundering tells us that the currently worked on db event 
is newer and will have its own reminders
-      if (dagActionLeaseObject.isReminder()) {
-        if (dagActionLeaseObject.getEventTimeMillis() < 
dbEventTimestamp.getTime()) {
+      if (leaseParams.isReminder()) {
+        if (leaseParams.getEventTimeMillis() < dbEventTimestamp.getTime()) {
           log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - A new 
event trigger "
-                  + "is being worked on, so this older reminder will be 
dropped.", dagActionLeaseObject,
+                  + "is being worked on, so this older reminder will be 
dropped.", leaseParams,
               dbEventTimestamp);
           return new LeaseAttemptStatus.NoLongerLeasingStatus();
         }
-        if (dagActionLeaseObject.getEventTimeMillis() > 
dbEventTimestamp.getTime()) {
+        if (leaseParams.getEventTimeMillis() > dbEventTimestamp.getTime()) {
           // TODO: emit metric here to capture this unexpected behavior
           log.warn("tryAcquireLease for {} - dbEventTimeMillis: {} - Severe 
constraint "
                   + "violation encountered: a reminder event newer than db 
event was found when db laundering should "
-                  + "ensure monotonically increasing laundered event times.", 
dagActionLeaseObject,
+                  + "ensure monotonically increasing laundered event times.", 
leaseParams,
               dbEventTimestamp.getTime());
         }
-        if (dagActionLeaseObject.getEventTimeMillis() == 
dbEventTimestamp.getTime()) {
+        if (leaseParams.getEventTimeMillis() == dbEventTimestamp.getTime()) {
           log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - Reminder 
event time "
-                  + "is the same as db event.", dagActionLeaseObject, 
dbEventTimestamp);
+                  + "is the same as db event.", leaseParams, dbEventTimestamp);
         }
       }
 
       // TODO: check whether reminder event before replacing flowExecutionId
       if (adoptConsensusFlowExecutionId) {
         log.info("Multi-active arbiter replacing local trigger event timestamp 
{} with database eventTimestamp {} (in "
-                + "epoch-millis)", dagActionLeaseObject, 
dbCurrentTimestamp.getTime());
+                + "epoch-millis)", leaseParams, dbCurrentTimestamp.getTime());
       }
       /* Note that we use `adoptConsensusFlowExecutionId` parameter's value to 
determine whether we should use the db
       laundered event timestamp as the flowExecutionId or maintain the 
original one
@@ -309,54 +310,54 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
       if (leaseValidityStatus == 1) {
         if (isWithinEpsilon) {
          DagActionStore.DagAction updatedDagAction =
-              adoptConsensusFlowExecutionId ? 
dagActionLeaseObject.getDagAction().updateFlowExecutionId(dbEventTimestamp.getTime())
 : dagActionLeaseObject.getDagAction();
-          log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 2: Same event, lease is valid",
-              updatedDagAction, dagActionLeaseObject.isReminder ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
+              adoptConsensusFlowExecutionId ? 
leaseParams.updateDagActionFlowExecutionId(dbEventTimestamp.getTime()) : 
leaseParams.getDagAction();
+         DagActionStore.LeaseParams updatedLeaseParams = new 
DagActionStore.LeaseParams(updatedDagAction,
+             dbEventTimestamp.getTime());
+          log.debug("tryAcquireLease for [{}] - CASE 2: Same event, lease is 
valid", updatedLeaseParams);
           // Utilize db timestamp for reminder
-          return new LeaseAttemptStatus.LeasedToAnotherStatus(
-              new DagActionStore.DagActionLeaseObject(updatedDagAction, 
dbEventTimestamp.getTime()),
+          return new 
LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams,
               dbLeaseAcquisitionTimestamp.getTime() + dbLinger - 
dbCurrentTimestamp.getTime());
         }
         DagActionStore.DagAction updatedDagAction =
-            adoptConsensusFlowExecutionId ? 
dagActionLeaseObject.getDagAction().updateFlowExecutionId(dbCurrentTimestamp.getTime())
 : dagActionLeaseObject.getDagAction();
-        log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
3: Distinct event, lease is valid",
-            updatedDagAction, dagActionLeaseObject.isReminder ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
+            adoptConsensusFlowExecutionId ? 
leaseParams.getDagAction().updateFlowExecutionId(dbCurrentTimestamp.getTime()) 
: leaseParams.getDagAction();
+        DagActionStore.LeaseParams updatedLeaseParams = new 
DagActionStore.LeaseParams(updatedDagAction,
+            dbCurrentTimestamp.getTime());
+        log.debug("tryAcquireLease for [{}] - CASE 3: Distinct event, lease is 
valid", updatedLeaseParams);
         // Utilize db lease acquisition timestamp for wait time and 
currentTimestamp as the new eventTimestamp
-        return new LeaseAttemptStatus.LeasedToAnotherStatus(
-            new DagActionStore.DagActionLeaseObject(updatedDagAction, 
dbCurrentTimestamp.getTime()),
+        return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams,
             dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - 
dbCurrentTimestamp.getTime());
       } // Lease is invalid
       else if (leaseValidityStatus == 2) {
         log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
4: Lease is out of date (regardless of "
-            + "whether same or distinct event)", 
dagActionLeaseObject.getDagAction(),
-            dagActionLeaseObject.isReminder ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
-        if (isWithinEpsilon && !dagActionLeaseObject.isReminder) {
+            + "whether same or distinct event)", leaseParams.getDagAction(),
+            leaseParams.isReminder ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
+        if (isWithinEpsilon && !leaseParams.isReminder) {
           log.warn("Lease should not be out of date for the same trigger event 
since epsilon << linger for "
                   + "leaseObject.getDagAction() {}, db eventTimestamp {}, db 
leaseAcquisitionTimestamp {}, linger {}",
-              dagActionLeaseObject.getDagAction(), dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, dbLinger);
+              leaseParams.getDagAction(), dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, dbLinger);
         }
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
-            dagActionLeaseObject.getDagAction(), true,true, dbEventTimestamp,
+            leaseParams.getDagAction(), true,true, dbEventTimestamp,
             dbLeaseAcquisitionTimestamp);
-        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, 
dagActionLeaseObject.getDagAction(),
-            Optional.of(dbCurrentTimestamp), dagActionLeaseObject.isReminder, 
adoptConsensusFlowExecutionId);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, 
Optional.of(dbCurrentTimestamp),
+            adoptConsensusFlowExecutionId);
       } // No longer leasing this event
         if (isWithinEpsilon) {
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 5: Same event, no longer leasing event"
-              + " in db", dagActionLeaseObject.getDagAction(),
-              dagActionLeaseObject.isReminder ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
+              + " in db", leaseParams.getDagAction(),
+              leaseParams.isReminder ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
           return new LeaseAttemptStatus.NoLongerLeasingStatus();
         }
         log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
6: Distinct event, no longer leasing "
-            + "event in db", dagActionLeaseObject.getDagAction(),
-            dagActionLeaseObject.isReminder ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
+            + "event in db", leaseParams.getDagAction(),
+            leaseParams.isReminder ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement,
-            dagActionLeaseObject.getDagAction(), true, false, dbEventTimestamp,
+            leaseParams.getDagAction(), true, false, dbEventTimestamp,
             null);
-        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, 
dagActionLeaseObject.getDagAction(),
-            Optional.of(dbCurrentTimestamp), dagActionLeaseObject.isReminder, 
adoptConsensusFlowExecutionId);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, 
Optional.of(dbCurrentTimestamp),
+            adoptConsensusFlowExecutionId);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -365,19 +366,20 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
   /**
    * Checks leaseArbiterTable for an existing entry for this dag action and 
event time
    */
-  protected Optional<GetEventInfoResult> 
getExistingEventInfo(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
+  protected Optional<GetEventInfoResult> 
getExistingEventInfo(DagActionStore.LeaseParams leaseParams)
       throws IOException {
+    DagActionStore.DagAction dagAction = leaseParams.getDagAction();
     return dbStatementExecutor.withPreparedStatement(
-        dagActionLeaseObject.isReminder ? thisTableGetInfoStatementForReminder 
: thisTableGetInfoStatement,
+        leaseParams.isReminder ? thisTableGetInfoStatementForReminder : 
thisTableGetInfoStatement,
         getInfoStatement -> {
           int i = 0;
-          if (dagActionLeaseObject.isReminder) {
-            getInfoStatement.setTimestamp(++i, new 
Timestamp(dagActionLeaseObject.getEventTimeMillis()), UTC_CAL.get());
+          if (leaseParams.isReminder) {
+            getInfoStatement.setTimestamp(++i, new 
Timestamp(leaseParams.getEventTimeMillis()), UTC_CAL.get());
           }
-          getInfoStatement.setString(++i, 
dagActionLeaseObject.getDagAction().getFlowGroup());
-          getInfoStatement.setString(++i, 
dagActionLeaseObject.getDagAction().getFlowName());
-          getInfoStatement.setString(++i, 
dagActionLeaseObject.getDagAction().getJobName());
-          getInfoStatement.setString(++i, 
dagActionLeaseObject.getDagAction().getDagActionType().toString());
+          getInfoStatement.setString(++i, dagAction.getFlowGroup());
+          getInfoStatement.setString(++i, dagAction.getFlowName());
+          getInfoStatement.setString(++i, dagAction.getJobName());
+          getInfoStatement.setString(++i, 
dagAction.getDagActionType().toString());
           ResultSet resultSet = getInfoStatement.executeQuery();
           try {
             if (!resultSet.next()) {
@@ -520,33 +522,32 @@ public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
    * @throws IOException
    */
   protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int 
numRowsUpdated,
-     DagActionStore.DagAction dagAction, Optional<Timestamp> 
dbCurrentTimestamp, boolean isReminderEvent,
-      boolean adoptConsensusFlowExecutionId)
+      DagActionStore.LeaseParams leaseParams, Optional<Timestamp> 
dbCurrentTimestamp, boolean adoptConsensusFlowExecutionId)
       throws SQLException, IOException {
     // Fetch values in row after attempted insert
-    SelectInfoResult selectInfoResult = getRowInfo(dagAction);
+    SelectInfoResult selectInfoResult = getRowInfo(leaseParams.dagAction);
     // Another participant won the lease in between
     if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
       return new LeaseAttemptStatus.NoLongerLeasingStatus();
     }
    DagActionStore.DagAction updatedDagAction =
-        adoptConsensusFlowExecutionId ? 
dagAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis) : dagAction;
-    DagActionStore.DagActionLeaseObject consensusDagActionLeaseObject =
-        new DagActionStore.DagActionLeaseObject(updatedDagAction, 
selectInfoResult.getEventTimeMillis());
+        adoptConsensusFlowExecutionId ? 
leaseParams.updateDagActionFlowExecutionId(selectInfoResult.eventTimeMillis) : 
leaseParams.dagAction;
+    DagActionStore.LeaseParams consensusLeaseParams =
+        new DagActionStore.LeaseParams(updatedDagAction, 
selectInfoResult.getEventTimeMillis());
     // If no db current timestamp is present, then use the full db linger 
value for duration
     long minimumLingerDurationMillis = dbCurrentTimestamp.isPresent() ?
         selectInfoResult.getLeaseAcquisitionTimeMillis().get() + 
selectInfoResult.getDbLinger()
             - dbCurrentTimestamp.get().getTime() : 
selectInfoResult.getDbLinger();
     if (numRowsUpdated == 1) {
       log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] 
successfully!", updatedDagAction,
-          isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis);
-      return new 
LeaseAttemptStatus.LeaseObtainedStatus(consensusDagActionLeaseObject,
+          leaseParams.isReminder() ? "reminder" : "original", 
selectInfoResult.eventTimeMillis);
+      return new LeaseAttemptStatus.LeaseObtainedStatus(consensusLeaseParams,
           selectInfoResult.getLeaseAcquisitionTimeMillis().get(), 
minimumLingerDurationMillis, this);
     }
     log.info("Another participant acquired lease in between for [{}, is: {}, 
eventTimestamp: {}] - num rows updated: {}",
-        updatedDagAction, isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis, numRowsUpdated);
+        updatedDagAction, leaseParams.isReminder ? "reminder" : "original", 
selectInfoResult.eventTimeMillis, numRowsUpdated);
     // Another participant acquired lease in between
-    return new 
LeaseAttemptStatus.LeasedToAnotherStatus(consensusDagActionLeaseObject, 
minimumLingerDurationMillis);
+    return new LeaseAttemptStatus.LeasedToAnotherStatus(consensusLeaseParams, 
minimumLingerDurationMillis);
   }
 
   /**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 49793cfcf..8ff5e2daf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -221,8 +221,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
             flowName,
             FlowUtils.getOrCreateFlowExecutionId(flowSpec),
             DagActionStore.DagActionType.LAUNCH);
-        DagActionStore.DagActionLeaseObject
-            leaseObject = new 
DagActionStore.DagActionLeaseObject(launchDagAction, isReminderEvent,
+        DagActionStore.LeaseParams
+            leaseObject = new DagActionStore.LeaseParams(launchDagAction, 
isReminderEvent,
             triggerTimestampMillis);
         // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as 
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
         flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps, 
leaseObject, flowSpec.isScheduled());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index b2c1ff9a7..4754a5e2c 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -63,7 +63,7 @@ public class DagActionReminderSchedulerTest {
   @Test
   public void testCreateReminderJobDetail() {
     long expectedEventTimeMillis = 55L;
-    JobDetail jobDetail = 
DagActionReminderScheduler.createReminderJobDetail(new 
DagActionStore.DagActionLeaseObject(launchDagAction, false, 
expectedEventTimeMillis), false);
+    JobDetail jobDetail = 
DagActionReminderScheduler.createReminderJobDetail(new 
DagActionStore.LeaseParams(launchDagAction, false, expectedEventTimeMillis), 
false);
     Assert.assertEquals(jobDetail.getKey().toString(), 
DagActionReminderScheduler.RetryReminderKeyGroup + "." + expectedKey);
     JobDataMap dataMap = jobDetail.getJobDataMap();
     Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_GROUP_KEY), 
flowGroup);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index fcea36d7f..a5167ca85 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -97,10 +97,10 @@ public class DagManagementTaskStreamImplTest {
     dagManagementTaskStream.addDagAction(launchAction);
     dagManagementTaskStream.addDagAction(launchAction);
     when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
-        .tryAcquireLease(any(DagActionStore.DagActionLeaseObject.class), 
anyBoolean()))
+        .tryAcquireLease(any(DagActionStore.LeaseParams.class), anyBoolean()))
         .thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
-            new LeaseAttemptStatus.LeasedToAnotherStatus(new 
DagActionStore.DagActionLeaseObject(launchAction, true, 1), 15),
-            new LeaseAttemptStatus.LeaseObtainedStatus(new 
DagActionStore.DagActionLeaseObject(launchAction, true, 1), 0, 5, null));
+            new LeaseAttemptStatus.LeasedToAnotherStatus(new 
DagActionStore.LeaseParams(launchAction, true, 1), 15),
+            new LeaseAttemptStatus.LeaseObtainedStatus(new 
DagActionStore.LeaseParams(launchAction, true, 1), 0, 5, null));
     DagTask dagTask = dagManagementTaskStream.next();
     Assert.assertTrue(dagTask instanceof LaunchDagTask);
     DagProc dagProc = dagTask.host(this.dagProcFactory);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 88361e563..47b7c4226 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -37,8 +37,8 @@ public class FlowLaunchHandlerTest {
   int schedulerBackOffMillis = 10;
   DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction("flowName", "flowGroup",
       flowExecutionId, "jobName", DagActionStore.DagActionType.LAUNCH);
-  DagActionStore.DagActionLeaseObject
-      leaseObject = new DagActionStore.DagActionLeaseObject(dagAction, false, 
eventToRevisit);
+  DagActionStore.LeaseParams
+      leaseObject = new DagActionStore.LeaseParams(dagAction, false, 
eventToRevisit);
   LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
       new LeaseAttemptStatus.LeasedToAnotherStatus(leaseObject, 
minimumLingerDurationMillis);
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 0f97a290d..297148c96 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -58,16 +58,16 @@ public class MysqlMultiActiveLeaseArbiterTest {
   // Dag actions with the same flow info but different flow action types are 
considered unique
   private static DagActionStore.DagAction launchDagAction =
       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH);
-  private static DagActionStore.DagActionLeaseObject
-      launchLeaseObject = new 
DagActionStore.DagActionLeaseObject(launchDagAction, false, eventTimeMillis);
+  private static DagActionStore.LeaseParams
+      launchLeaseParams = new DagActionStore.LeaseParams(launchDagAction, 
false, eventTimeMillis);
   private static DagActionStore.DagAction resumeDagAction =
       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.RESUME);
-  private static DagActionStore.DagActionLeaseObject
-      resumeLeaseObject = new 
DagActionStore.DagActionLeaseObject(resumeDagAction, false, eventTimeMillis);
+  private static DagActionStore.LeaseParams
+      resumeLeaseParams = new DagActionStore.LeaseParams(resumeDagAction, 
false, eventTimeMillis);
   private static DagActionStore.DagAction launchDagAction2 =
       new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH);
-  private static DagActionStore.DagActionLeaseObject
-      launchLeaseObject2 = new 
DagActionStore.DagActionLeaseObject(launchDagAction2, false, eventTimeMillis);
+  private static DagActionStore.LeaseParams
+      launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction2, 
false, eventTimeMillis);
   private static final Timestamp dummyTimestamp = new Timestamp(99999);
   private ITestMetastoreDatabase testDb;
   private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
@@ -111,7 +111,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
   public void testAcquireLeaseSingleParticipant() throws Exception {
     // Tests CASE 1 of acquire lease for a flow action event not present in DB
     LeaseAttemptStatus firstLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
     Assert.assertTrue(firstLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
@@ -124,14 +124,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
         new DagActionStore.DagAction(flowGroup, flowName, 
consensusEventTimeMillis, jobName,
             DagActionStore.DagActionType.LAUNCH)));
     Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), 
consensusEventTimeMillis);
-    
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder,
 false);
+    
Assert.assertEquals(firstObtainedStatus.getConsensusLeaseParams().isReminder, 
false);
 
     // Verify that different DagAction types for the same flow can have leases 
at the same time
     DagActionStore.DagAction killDagAction = new
         DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.KILL);
     LeaseAttemptStatus killStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(
-            new DagActionStore.DagActionLeaseObject(killDagAction, false, 
eventTimeMillis), true);
+            new DagActionStore.LeaseParams(killDagAction, false, 
eventTimeMillis), true);
     Assert.assertTrue(killStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus killObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) killStatus;
@@ -142,7 +142,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Very little time should have passed if this test directly follows the 
one above so this call will be considered
     // the same as the previous event
     LeaseAttemptStatus secondLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
     Assert.assertTrue(secondLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
     LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
         (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
@@ -154,7 +154,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Allow enough time to pass for this trigger to be considered distinct, 
but not enough time so the lease expires
     Thread.sleep(MORE_THAN_EPSILON);
     LeaseAttemptStatus thirdLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
     Assert.assertTrue(thirdLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
     LeaseAttemptStatus.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
         (LeaseAttemptStatus.LeasedToAnotherStatus) thirdLaunchStatus;
@@ -164,7 +164,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Tests CASE 4 of lease out of date
     Thread.sleep(LINGER);
     LeaseAttemptStatus fourthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
     Assert.assertTrue(fourthLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus fourthObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) fourthLaunchStatus;
@@ -177,14 +177,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
     
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
     Assert.assertTrue(System.currentTimeMillis() - 
fourthObtainedStatus.getEventTimeMillis() < EPSILON);
     LeaseAttemptStatus fifthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
     Assert.assertTrue(fifthLaunchStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus);
 
     // Tests CASE 6 of no longer leasing a distinct event in DB
     // Wait so this event is considered distinct and a new lease will be 
acquired
     Thread.sleep(MORE_THAN_EPSILON);
     LeaseAttemptStatus sixthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
     Assert.assertTrue(sixthLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus sixthObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) sixthLaunchStatus;
@@ -251,10 +251,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
   @Test (dependsOnMethods = 
"testConditionallyAcquireLeaseIfMatchingAllColsStatement")
   public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
       throws IOException, SQLException {
-    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = 
completeLeaseHelper(resumeLeaseObject);
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = 
completeLeaseHelper(resumeLeaseParams);
     // Ensure no NPE results from calling this after a lease has been 
completed and acquisition timestamp val is NULL
-    mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, 
resumeDagAction,
-        Optional.empty(), false, true);
+    mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, 
resumeLeaseParams,
+        Optional.empty(), true);
 
     // The following insert will fail since eventTimestamp does not match the 
expected
     int numRowsUpdated = 
mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
@@ -274,16 +274,16 @@ public class MysqlMultiActiveLeaseArbiterTest {
    */
   @Test
   public void testOlderReminderEventAcquireLease() throws IOException {
-    DagActionStore.DagActionLeaseObject newLaunchLeaseObject = 
getUniqueLaunchLeaseObject();
-    mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
+    DagActionStore.LeaseParams newLaunchLeaseParams = 
getUniqueLaunchLeaseParams();
+    mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams, false);
     // Read database to obtain existing db eventTimeMillis and use it to 
construct an older event
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-        
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseObject.getDagAction());
+        
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseParams.getDagAction());
     long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
-    DagActionStore.DagActionLeaseObject updatedLeaseObject =
-        new 
DagActionStore.DagActionLeaseObject(newLaunchLeaseObject.getDagAction(), true, 
olderEventTimestamp);
+    DagActionStore.LeaseParams updatedLeaseParams =
+        new DagActionStore.LeaseParams(newLaunchLeaseParams.getDagAction(), 
true, olderEventTimestamp);
     LeaseAttemptStatus attemptStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseParams, true);
     Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus);
   }
 
@@ -294,15 +294,15 @@ public class MysqlMultiActiveLeaseArbiterTest {
    */
   @Test
   public void testReminderEventAcquireLeaseOnValidLease() throws IOException {
-    DagActionStore.DagActionLeaseObject newLaunchLeaseObject = 
getUniqueLaunchLeaseObject();
+    DagActionStore.LeaseParams newLaunchLeaseParams = 
getUniqueLaunchLeaseParams();
     LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
-        (LeaseAttemptStatus.LeaseObtainedStatus) 
mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
-    // Use the consensusLeaseObject containing the new eventTimeMillis for the 
reminder event time
-    DagActionStore.DagActionLeaseObject updatedLeaseObject =
-        new 
DagActionStore.DagActionLeaseObject(newLaunchLeaseObject.getDagAction(), true,
+        (LeaseAttemptStatus.LeaseObtainedStatus) 
mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams, false);
+    // Use the consensusLeaseParams containing the new eventTimeMillis for the 
reminder event time
+    DagActionStore.LeaseParams updatedLeaseParams =
+        new DagActionStore.LeaseParams(newLaunchLeaseParams.getDagAction(), 
true,
             leaseObtainedStatus.getEventTimeMillis());
     LeaseAttemptStatus attemptStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseParams, true);
     Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
     LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus = 
(LeaseAttemptStatus.LeasedToAnotherStatus) attemptStatus;
     Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(), 
leaseObtainedStatus.getEventTimeMillis());
@@ -314,13 +314,13 @@ public class MysqlMultiActiveLeaseArbiterTest {
    */
   @Test
   public void testReminderEventAcquireLeaseOnInvalidLease() throws 
IOException, InterruptedException {
-    DagActionStore.DagActionLeaseObject newLaunchLeaseObject = 
getUniqueLaunchLeaseObject();
-    mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
-    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = 
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseObject.getDagAction());
+    DagActionStore.LeaseParams newLaunchLeaseParams = 
getUniqueLaunchLeaseParams();
+    mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams, false);
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = 
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseParams.getDagAction());
     // Wait enough time for the lease to expire
     Thread.sleep(MORE_THAN_LINGER);
     LeaseAttemptStatus attemptStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, 
true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams, 
true);
     Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus obtainedStatus = 
(LeaseAttemptStatus.LeaseObtainedStatus) attemptStatus;
     Assert.assertTrue(obtainedStatus.getEventTimeMillis() > 
selectInfoResult.getEventTimeMillis());
@@ -336,19 +336,19 @@ public class MysqlMultiActiveLeaseArbiterTest {
    @Test
    public void testReminderEventAcquireLeaseOnCompletedLease() throws 
IOException, InterruptedException {
      // Create a new dag action and complete the lease
-     DagActionStore.DagActionLeaseObject newLaunchLeaseObject = 
getUniqueLaunchLeaseObject();
-     mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
-     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = 
completeLeaseHelper(newLaunchLeaseObject);
+     DagActionStore.LeaseParams newLaunchLeaseParams = 
getUniqueLaunchLeaseParams();
+     mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams, false);
+     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = 
completeLeaseHelper(newLaunchLeaseParams);
 
      // Sleep enough time for the event to have been considered distinct
      Thread.sleep(MORE_THAN_EPSILON);
      // Now have a reminder event check-in on the completed lease
-     DagActionStore.DagAction updatedDagAction = 
newLaunchLeaseObject.getDagAction().updateFlowExecutionId(
+     DagActionStore.DagAction updatedDagAction = 
newLaunchLeaseParams.getDagAction().updateFlowExecutionId(
          selectInfoResult.getEventTimeMillis());
-     DagActionStore.DagActionLeaseObject updatedLeaseObject =
-         new DagActionStore.DagActionLeaseObject(updatedDagAction, true, 
newLaunchLeaseObject.getEventTimeMillis());
+     DagActionStore.LeaseParams updatedLeaseParams =
+         new DagActionStore.LeaseParams(updatedDagAction, true, 
newLaunchLeaseParams.getEventTimeMillis());
      LeaseAttemptStatus attemptStatus =
-         mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, 
true);
+         mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseParams, 
true);
      Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus);
    }
 
@@ -362,7 +362,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
   public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
     // Obtain a lease for a new action and verify its flowExecutionId is not 
updated
     LeaseAttemptStatus firstLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject2, 
false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams2, 
false);
     Assert.assertTrue(firstLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
@@ -370,19 +370,19 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() != 
Long.valueOf(firstObtainedStatus.getConsensusDagAction().getFlowExecutionId()));
     Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
-    
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder(),
 false);
+    
Assert.assertEquals(firstObtainedStatus.getConsensusLeaseParams().isReminder(), 
false);
 
     // A second attempt to obtain a lease on the same action should return a 
LeasedToAnotherStatus which also contains
     // the original flowExecutionId and the same event time from the previous 
LeaseAttemptStatus
     LeaseAttemptStatus secondLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject2, 
false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams2, 
false);
     Assert.assertTrue(secondLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
     LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
         (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
     Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), 
secondLeasedToAnotherStatus.getEventTimeMillis());
     
Assert.assertTrue(secondLeasedToAnotherStatus.getConsensusDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
-    
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder(),
 false);
+    
Assert.assertEquals(firstObtainedStatus.getConsensusLeaseParams().isReminder(), 
false);
 
     
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(firstObtainedStatus));
   }
@@ -402,10 +402,11 @@ public class MysqlMultiActiveLeaseArbiterTest {
   }
 
   /**
-   * Returns a unique launch type leaseObject using 
#getUniqueLaunchDagAction() to create a unique flowName
+   * Returns a unique launch type {@link 
org.apache.gobblin.service.modules.orchestration.DagActionStore.LeaseParams}
+   * using #getUniqueLaunchDagAction() to create a unique flowName
    */
-  public DagActionStore.DagActionLeaseObject getUniqueLaunchLeaseObject() {
-    return new DagActionStore.DagActionLeaseObject(getUniqueLaunchDagAction(), 
false, eventTimeMillis);
+  public DagActionStore.LeaseParams getUniqueLaunchLeaseParams() {
+    return new DagActionStore.LeaseParams(getUniqueLaunchDagAction(), false, 
eventTimeMillis);
   }
 
   /**
@@ -413,15 +414,15 @@ public class MysqlMultiActiveLeaseArbiterTest {
    * @return SelectInfoResult object containing the event information used to 
complete the lease
    */
   public MysqlMultiActiveLeaseArbiter.SelectInfoResult completeLeaseHelper(
-      DagActionStore.DagActionLeaseObject previouslyLeasedLeaseObj) throws 
IOException {
+      DagActionStore.LeaseParams previouslyLeasedLeaseParams) throws 
IOException {
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-        
mysqlMultiActiveLeaseArbiter.getRowInfo(previouslyLeasedLeaseObj.getDagAction());
-    DagActionStore.DagAction updatedDagAction = 
previouslyLeasedLeaseObj.getDagAction().updateFlowExecutionId(
+        
mysqlMultiActiveLeaseArbiter.getRowInfo(previouslyLeasedLeaseParams.getDagAction());
+    DagActionStore.DagAction updatedDagAction = 
previouslyLeasedLeaseParams.getDagAction().updateFlowExecutionId(
         selectInfoResult.getEventTimeMillis());
-    DagActionStore.DagActionLeaseObject
-        updatedLeaseObject = new 
DagActionStore.DagActionLeaseObject(updatedDagAction, false, 
selectInfoResult.getEventTimeMillis());
+    DagActionStore.LeaseParams
+        updatedLeaseParams = new DagActionStore.LeaseParams(updatedDagAction, 
false, selectInfoResult.getEventTimeMillis());
     boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new 
LeaseAttemptStatus.LeaseObtainedStatus(
-        updatedLeaseObject, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
+        updatedLeaseParams, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
     Assert.assertTrue(markedSuccess);
     return selectInfoResult;
   }

Reply via email to