This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c064604e4 [GOBBLIN-2073] Use previous eventTime for lease arbitration 
of reminder dagActions (#3952)
c064604e4 is described below

commit c064604e4b804c9528a0b7365283b994acd071fc
Author: umustafi <[email protected]>
AuthorDate: Mon Jun 17 12:16:22 2024 -0700

    [GOBBLIN-2073] Use previous eventTime for lease arbitration of reminder 
dagActions (#3952)
    
    * Use previous eventTime for lease arbitration of reminder dagActions
    * Move isReminder and eventTimestamp into dagAction
    * Fix existing tests and build error
    * Decouple dependent MysqlMultiActiveLeaseArbiter (MyMALA) tests to prevent 
flakiness and improve debuggability
    * Create a DagActionLeaseObject to contain dagAction, isReminder, and 
eventTimeMillis
---
 .../orchestration/DagActionReminderScheduler.java  |  51 ++++---
 .../modules/orchestration/DagActionStore.java      |  23 ++-
 .../modules/orchestration/DagManagement.java       |   8 ++
 .../orchestration/DagManagementTaskStreamImpl.java |  71 +++++----
 .../modules/orchestration/FlowLaunchHandler.java   |  26 ++--
 .../orchestration/InstrumentedLeaseArbiter.java    |  14 +-
 .../modules/orchestration/LeaseAttemptStatus.java  |  47 ++++--
 .../orchestration/MultiActiveLeaseArbiter.java     |  10 +-
 .../MysqlMultiActiveLeaseArbiter.java              | 141 +++++++++---------
 .../modules/orchestration/Orchestrator.java        |  10 +-
 .../DagActionReminderSchedulerTest.java            |   4 +-
 .../DagManagementTaskStreamImplTest.java           |   7 +-
 .../orchestration/FlowLaunchHandlerTest.java       |   4 +-
 .../MysqlMultiActiveLeaseArbiterTest.java          | 160 ++++++++++++++-------
 14 files changed, 357 insertions(+), 219 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index add618616..b1bb84cd5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -26,6 +26,7 @@ import org.quartz.JobBuilder;
 import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
@@ -60,20 +61,20 @@ public class DagActionReminderScheduler {
   /**
    *  Uses a dagAction & reminder duration in milliseconds to create a 
reminder job that will fire
    *  `reminderDurationMillis` after the current time
-   * @param dagAction
+   * @param dagActionLeaseObject
    * @param reminderDurationMillis
    * @throws SchedulerException
    */
-  public void scheduleReminder(DagActionStore.DagAction dagAction, long 
reminderDurationMillis)
+  public void scheduleReminder(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject, long reminderDurationMillis)
       throws SchedulerException {
-    JobDetail jobDetail = createReminderJobDetail(dagAction);
-    Trigger trigger = createReminderJobTrigger(dagAction, 
reminderDurationMillis, System::currentTimeMillis);
+    JobDetail jobDetail = createReminderJobDetail(dagActionLeaseObject);
+    Trigger trigger = 
createReminderJobTrigger(dagActionLeaseObject.getDagAction(), 
reminderDurationMillis,
+        System::currentTimeMillis);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
 
   public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws 
SchedulerException {
-    JobDetail jobDetail = createReminderJobDetail(dagAction);
-    quartzScheduler.deleteJob(jobDetail.getKey());
+    quartzScheduler.deleteJob(createJobKey(dagAction));
   }
 
   /**
@@ -84,6 +85,7 @@ public class DagActionReminderScheduler {
   @Slf4j
   public static class ReminderJob implements Job {
     public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+    public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
 
     @Override
     public void execute(JobExecutionContext context) {
@@ -94,17 +96,18 @@ public class DagActionReminderScheduler {
       String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
       long flowExecutionId = 
jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
       DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
+      long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);
 
-      log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
-          + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName 
+ ", dagActionType: " + dagActionType + ")");
-
-      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType, true);
+      DagActionStore.DagActionLeaseObject reminderDagActionLeaseObject = new 
DagActionStore.DagActionLeaseObject(
+          new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType),
+          true, eventTimeMillis);
+      log.info("DagProc reminder triggered for dagAction event: {}", 
reminderDagActionLeaseObject);
 
       try {
         DagManagement dagManagement = 
GobblinServiceManager.getClass(DagManagement.class);
-        dagManagement.addDagAction(dagAction);
+        dagManagement.addReminderDagAction(reminderDagActionLeaseObject);
       } catch (IOException e) {
-        log.error("Failed to add DagAction to DagManagement. Action: {}", 
dagAction);
+        log.error("Failed to add DagAction event to DagManagement. dagAction 
event: {}", reminderDagActionLeaseObject);
       }
     }
   }
@@ -117,20 +120,30 @@ public class DagActionReminderScheduler {
         dagAction.getFlowExecutionId(), dagAction.getJobName(), 
dagAction.getDagActionType());
   }
 
+  /**
+   * Creates a JobKey object for the reminder job where the name is the 
DagActionReminderKey from above and the group is
+   * the flowGroup
+   */
+  public static JobKey createJobKey(DagActionStore.DagAction dagAction) {
+    return new JobKey(createDagActionReminderKey(dagAction), 
dagAction.getFlowGroup());
+  }
+
   /**
    * Creates a jobDetail containing flow and job identifying information in 
the jobDataMap, uniquely identified
    *  by a key comprised of the dagAction's fields.
    */
-  public static JobDetail createReminderJobDetail(DagActionStore.DagAction 
dagAction) {
+  public static JobDetail 
createReminderJobDetail(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject) {
     JobDataMap dataMap = new JobDataMap();
-    dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
-    dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
-    dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
-    dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
-    dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, 
dagAction.getDagActionType());
+    dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, 
dagActionLeaseObject.getDagAction().getFlowName());
+    dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, 
dagActionLeaseObject.getDagAction().getFlowGroup());
+    dataMap.put(ConfigurationKeys.JOB_NAME_KEY, 
dagActionLeaseObject.getDagAction().getJobName());
+    dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagActionLeaseObject.getDagAction().getFlowExecutionId());
+    dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, 
dagActionLeaseObject.getDagAction().getDagActionType());
+    dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY, 
dagActionLeaseObject.getEventTimeMillis());
 
     return JobBuilder.newJob(ReminderJob.class)
-        .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowGroup())
+        
.withIdentity(createDagActionReminderKey(dagActionLeaseObject.getDagAction()),
+            dagActionLeaseObject.getDagAction().getFlowGroup())
         .usingJobData(dataMap)
         .build();
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 2d22aa3a2..54f9a65f5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -47,11 +47,6 @@ public interface DagActionStore {
     final long flowExecutionId;
     final String jobName;
     final DagActionType dagActionType;
-    final boolean isReminder;
-
-    public DagAction(String flowGroup, String flowName, long flowExecutionId, 
String jobName, DagActionType dagActionType) {
-      this(flowGroup, flowName, flowExecutionId, jobName, dagActionType, 
false);
-    }
 
     public static DagAction forFlow(String flowGroup, String flowName, long 
flowExecutionId, DagActionType dagActionType) {
       return new DagAction(flowGroup, flowName, flowExecutionId, 
NO_JOB_NAME_DEFAULT, dagActionType);
@@ -83,6 +78,24 @@ public interface DagActionStore {
     }
   }
 
+  @Data
+  @RequiredArgsConstructor
+  class DagActionLeaseObject {
+    final DagAction dagAction;
+    final boolean isReminder;
+    final long eventTimeMillis;
+
+    /**
+     * Creates a lease object for a dagAction and eventTimeMillis representing 
an original event (isReminder is False)
+     */
+    public DagActionLeaseObject(DagAction dagAction, long eventTimeMillis) {
+      this.dagAction = dagAction;
+      this.isReminder = false;
+      this.eventTimeMillis = eventTimeMillis;
+    }
+  }
+
+
 
   /**
    * Check if an action exists in dagAction store by flow group, flow name, 
flow execution id, and job name.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
index 75ee9b60e..eea645284 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -27,5 +27,13 @@ import java.io.IOException;
  */
 public interface DagManagement {
 
+  /**
+   * Used to add a dagAction event to DagManagement
+   */
   void addDagAction(DagActionStore.DagAction dagAction) throws IOException;
+
+  /**
+   * Used to add reminder dagActions to the queue that already contain an 
eventTimestamp from the previous lease attempt
+   */
+  void addReminderDagAction(DagActionStore.DagActionLeaseObject 
reminderDagActionLeaseObject) throws IOException;
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 3f93837d3..6a8dcbdc3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -80,7 +80,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
   protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
   private final boolean isMultiActiveExecutionEnabled;
   private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
-  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private final BlockingQueue<DagActionStore.DagActionLeaseObject> 
dagActionLeaseObjectQueue = new LinkedBlockingQueue<>();
   private final DagManagementStateStore dagManagementStateStore;
 
   @Inject
@@ -110,10 +110,20 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
   @Override
   public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
     // TODO: Used to track missing dag issue, remove later as needed
-    log.info("Add dagAction {}", dagAction);
+    log.info("Add original (non-reminder) dagAction {}", dagAction);
 
-    if (!this.dagActionQueue.offer(dagAction)) {
-      throw new RuntimeException("Could not add dag action " + dagAction + " 
to the queue");
+    if (!this.dagActionLeaseObjectQueue.offer(new 
DagActionStore.DagActionLeaseObject(dagAction, false, 
System.currentTimeMillis()))) {
+      throw new RuntimeException(String.format("Could not add dag action to 
the queue %s", dagAction));
+    }
+  }
+
+  @Override
+  public synchronized void 
addReminderDagAction(DagActionStore.DagActionLeaseObject 
reminderDagActionLeaseObject) {
+    // TODO: Used to track missing dag issue, remove later as needed
+    log.info("Add reminder dagAction {}", reminderDagActionLeaseObject);
+
+    if (!this.dagActionLeaseObjectQueue.offer(reminderDagActionLeaseObject)) {
+      throw new RuntimeException(String.format("Could not add reminder dag 
action to the queue %s", reminderDagActionLeaseObject));
     }
   }
 
@@ -127,20 +137,17 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       while (true) {
         DagActionStore.DagAction dagAction = null;
         try {
-          dagAction = this.dagActionQueue.take();
+          DagActionStore.DagActionLeaseObject dagActionLeaseObject = 
this.dagActionLeaseObjectQueue.take();
+          dagAction = dagActionLeaseObject.getDagAction();
           /* Create triggers for original (non-reminder) dag actions of type 
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE.
              Reminder triggers are used to inform hosts once the job start 
deadline and flow finish deadline are passed;
              then only is lease arbitration done to enforce the deadline 
violation and fail the job or flow if needed */
-          if (!dagAction.isReminder() && dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
-            createJobStartDeadlineTrigger(dagAction);
-          } else if (!dagAction.isReminder() && dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
-            createFlowFinishDeadlineTrigger(dagAction);
-          } else if (!dagAction.isReminder
-              || dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE
-              || dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
-            // todo - fix bug of a reminder event getting a lease even when 
the first attempt succeeded.
-            // for now, avoid processing reminder events if they are not for 
deadline dag actions
-            LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(dagAction);
+          if (!dagActionLeaseObject.isReminder() && dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
+            createJobStartDeadlineTrigger(dagActionLeaseObject);
+          } else if (!dagActionLeaseObject.isReminder() && 
dagAction.dagActionType == 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
+            createFlowFinishDeadlineTrigger(dagActionLeaseObject);
+          } else { // Handle original non-deadline dagActions as well as 
reminder events of all types
+            LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(dagActionLeaseObject);
             if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
               return createDagTask(dagAction, 
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
             }
@@ -152,20 +159,22 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       }
   }
 
-  private void createJobStartDeadlineTrigger(DagActionStore.DagAction 
dagAction) throws SchedulerException, IOException {
+  private void 
createJobStartDeadlineTrigger(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject)
+      throws SchedulerException, IOException {
     long timeOutForJobStart = 
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
-        dagAction.getDagId()).get().getNodes().get(0), 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+        
dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0), 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
     // todo - this timestamp is just an approximation, the real job submission 
has happened in past, and that is when a
     // ENFORCE_JOB_START_DEADLINE dag action was created; we are just 
processing that dag action here
     long jobSubmissionTime = System.currentTimeMillis();
     long reminderDuration = jobSubmissionTime + timeOutForJobStart - 
System.currentTimeMillis();
 
-    dagActionReminderScheduler.get().scheduleReminder(dagAction, 
reminderDuration);
+    dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject, 
reminderDuration);
   }
 
-  private void createFlowFinishDeadlineTrigger(DagActionStore.DagAction 
dagAction) throws SchedulerException, IOException {
+  private void 
createFlowFinishDeadlineTrigger(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject)
+      throws SchedulerException, IOException {
     long timeOutForJobFinish;
-    Dag.DagNode<JobExecutionPlan> dagNode = 
this.dagManagementStateStore.getDag(dagAction.getDagId()).get().getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode = 
this.dagManagementStateStore.getDag(dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0);
 
     try {
       timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
@@ -180,25 +189,25 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
     long reminderDuration = flowStartTime + timeOutForJobFinish - 
System.currentTimeMillis();
 
-    dagActionReminderScheduler.get().scheduleReminder(dagAction, 
reminderDuration);
+    dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject, 
reminderDuration);
   }
 
   /**
    * Returns a {@link LeaseAttemptStatus} associated with the
    * `dagAction` by calling
-   * {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagAction, 
long, boolean, boolean)}.
-   * @param dagAction
+   * {@link 
MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagActionLeaseObject, 
boolean)}.
+   * @param dagActionLeaseObject
    * @return
    * @throws IOException
    * @throws SchedulerException
    */
-  private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction 
dagAction)
+  private LeaseAttemptStatus 
retrieveLeaseStatus(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
       throws IOException, SchedulerException {
-    // TODO: need to handle reminder events and flag them
+    // Uses reminder flag to determine whether to use current time as event 
time or previously saved event time
     LeaseAttemptStatus leaseAttemptStatus = 
this.dagActionProcessingLeaseArbiter
-        .tryAcquireLease(dagAction, System.currentTimeMillis(), 
dagAction.isReminder, false);
-        /* Schedule a reminder for the event unless the lease has been 
completed to safeguard against the case where even
-        we, when we might become the lease owner still fail to complete 
processing
+        .tryAcquireLease(dagActionLeaseObject, false);
+        /* Schedule a reminder for the event unless the lease has been 
completed to safeguard against the case where
+        even we, when we might become the lease owner still fail to complete 
processing
         */
     if (!(leaseAttemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus)) {
       scheduleReminderForEvent(leaseAttemptStatus);
@@ -228,11 +237,11 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
   }
 
   /* Schedules a reminder for the flow action using {@link 
DagActionReminderScheduler} to reattempt the lease after the
-  current leaseholder's grant would have expired.
+  current leaseholder's grant would have expired. It saves the previous 
eventTimeMillis in the dagAction to use upon
+  reattempting the lease.
   */
   protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
       throws SchedulerException {
-    
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagAction(),
-        leaseStatus.getMinimumLingerDurationMillis());
+    
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagActionLeaseObject(),
 leaseStatus.getMinimumLingerDurationMillis());
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 06551ce14..967d89a89 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -105,17 +105,19 @@ public class FlowLaunchHandler {
    * event triggered by the scheduler by attempting a lease for the launch 
event and processing the result depending on
    * the status of the attempt.
    */
-  public void handleFlowLaunchTriggerEvent(Properties jobProps, 
DagActionStore.DagAction dagAction,
-      long eventTimeMillis, boolean isReminderEvent, boolean 
adoptConsensusFlowExecutionId) throws IOException {
+  public void handleFlowLaunchTriggerEvent(Properties jobProps,
+      DagActionStore.DagActionLeaseObject dagActionLeaseObject, boolean 
adoptConsensusFlowExecutionId)
+      throws IOException {
+    long previousEventTimeMillis = dagActionLeaseObject.getEventTimeMillis();
     LeaseAttemptStatus leaseAttempt = 
this.multiActiveLeaseArbiter.tryAcquireLease(
-        dagAction, eventTimeMillis, isReminderEvent, 
adoptConsensusFlowExecutionId);
+        dagActionLeaseObject, adoptConsensusFlowExecutionId);
     if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
         && persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt)) {
       log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", 
leaseAttempt.getConsensusDagAction(),
-          ((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt).getEventTimeMillis());
+          previousEventTimeMillis);
     } else { // when NOT successfully `persistDagAction`, set a reminder to 
re-attempt handling (unless leasing finished)
       
calcLeasedToAnotherStatusForReminder(leaseAttempt).ifPresent(leasedToAnother ->
-          scheduleReminderForEvent(jobProps, leasedToAnother, 
eventTimeMillis));
+          scheduleReminderForEvent(jobProps, leasedToAnother, 
previousEventTimeMillis));
     }
   }
 
@@ -126,7 +128,8 @@ public class FlowLaunchHandler {
     } else if (leaseAttempt instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it
       return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus) 
leaseAttempt);
     } else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus) 
{ // remind w/o delay to immediately re-attempt handling
-      return Optional.of(new 
LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), 
((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt).getEventTimeMillis(), 
0L));
+      return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(
+          ((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt).getConsensusDagActionLeaseObject(), 0L));
     } else {
       throw new RuntimeException("unexpected `LeaseAttemptStatus` derived 
type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
     }
@@ -152,12 +155,13 @@ public class FlowLaunchHandler {
    * This method is used by {@link 
FlowLaunchHandler#handleFlowLaunchTriggerEvent} to schedule a self-reminder to 
check on
    * the other participant's progress to finish acting on a dag action after 
the time the lease should expire.
    * @param jobProps
-   * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
-   * @param triggerEventTimeMillis the event timestamp we were originally 
handling
+   * @param status used to extract event to be reminded for (stored in 
`consensusDagAction`) and the minimum time after
+   *               which reminder should occur
+   * @param triggerEventTimeMillis the event timestamp we were originally 
handling (only used for logging purposes)
    */
   private void scheduleReminderForEvent(Properties jobProps, 
LeaseAttemptStatus.LeasedToAnotherStatus status,
       long triggerEventTimeMillis) {
-    DagActionStore.DagAction dagAction = status.getConsensusDagAction();
+    DagActionStore.DagAction consensusDagAction = 
status.getConsensusDagAction();
     JobKey origJobKey = new 
JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
         jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job 
group>>"));
     try {
@@ -166,9 +170,9 @@ public class FlowLaunchHandler {
         this.jobDoesNotExistInSchedulerCount.inc();
         return;
       }
-      Trigger reminderTrigger = createAndScheduleReminder(origJobKey, status, 
triggerEventTimeMillis);
+      Trigger reminderTrigger = createAndScheduleReminder(origJobKey, status, 
status.getEventTimeMillis());
       log.info("Flow Launch Handler - [{}, eventTimestamp: {}] - SCHEDULED 
REMINDER for event {} in {} millis",
-          dagAction, triggerEventTimeMillis, status.getEventTimeMillis(), 
reminderTrigger.getNextFireTime());
+          consensusDagAction, triggerEventTimeMillis, 
status.getEventTimeMillis(), reminderTrigger.getNextFireTime());
     } catch (SchedulerException e) {
       log.warn("Failed to add job reminder due to SchedulerException for job 
{} trigger event {}. Exception: {}",
           origJobKey, status.getEventTimeMillis(), e);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
index 51f40c04a..ab8a9ee14 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
@@ -72,16 +72,14 @@ public class InstrumentedLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   @Override
-  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
dagAction, long eventTimeMillis,
-      boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) throws 
IOException {
-
+  public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject, 
boolean skipFlowExecutionIdReplacement) throws IOException {
     LeaseAttemptStatus leaseAttemptStatus =
-        decoratedMultiActiveLeaseArbiter.tryAcquireLease(dagAction, 
eventTimeMillis, isReminderEvent,
-            skipFlowExecutionIdReplacement);
-    log.info("Multi-active scheduler lease attempt for dagAction: {} received 
type of leaseAttemptStatus: [{}, "
-            + "eventTimestamp: {}] ", dagAction, 
leaseAttemptStatus.getClass().getName(), eventTimeMillis);
+        decoratedMultiActiveLeaseArbiter.tryAcquireLease(dagActionLeaseObject, 
skipFlowExecutionIdReplacement);
+    log.info("Multi-active scheduler lease attempt for leaseObject: {} 
received type of leaseAttemptStatus: [{}, "
+            + "eventTimestamp: {}] ", dagActionLeaseObject, 
leaseAttemptStatus.getClass().getName(),
+        dagActionLeaseObject.getEventTimeMillis());
     if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
-      if (isReminderEvent) {
+      if (dagActionLeaseObject.isReminder()) {
         this.leasesObtainedDueToReminderCount.mark();
       }
       this.leaseObtainedCount.inc();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index efa6ee881..3584f445f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -33,6 +33,14 @@ import lombok.Getter;
  * intended for `@Override`.
  */
 public abstract class LeaseAttemptStatus {
+  /**
+   * @return the {@link DagActionStore.DagActionLeaseObject}, containing the 
dagAction, eventTimeMillis of the event, and boolean
+   * indicating if it's a reminder event; {@see 
MultiActiveLeaseArbiter#tryAcquireLease}
+   */
+  public DagActionStore.DagActionLeaseObject 
getConsensusDagActionLeaseObject() {
+    return null;
+  }
+
   /**
    * @return the {@link DagActionStore.DagAction}, which may now have an 
updated flowExecutionId that MUST henceforth be
    * used; {@see MultiActiveLeaseArbiter#tryAcquireLease}
@@ -53,24 +61,29 @@ public abstract class LeaseAttemptStatus {
 
   /*
   The participant calling this method acquired the lease for the event in 
question.
-  The timestamp associated with the lease is stored in `eventTimeMillis` field 
and the time the caller obtained the
-  lease is stored within the`leaseAcquisitionTimestamp` field. Note that the 
`Dag action` returned by the lease
-   arbitration attempt will be unchanged for flows that do not adopt the 
consensus eventTimeMillis as the flow execution
-   id, so a separate field must be maintained to track the eventTimeMillis for 
lease completion. The
-   `multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for the 
current LeaseObtainedStatus via the
-   completeLease method from a caller without access to the {@link 
MultiActiveLeaseArbiter}.
+  The `consensusLeaseObject` returned by the lease arbitration attempt 
includes an updated value in the
+  `eventTimeMillis` field, which represents the consensus timestamp associated 
with the lease. For flows that do not
+  adopt the consensus `eventTimeMillis` as the flow execution ID, the 
`dagAction.flowExecutionId` will remain unchanged.
+  The consensus `eventTimeMillis` must be tracked for lease completion.
+  The time the caller obtained the lease is stored within 
the`leaseAcquisitionTimestamp` field.
+  The `multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for 
the current LeaseObtainedStatus via the
+  completeLease method from a caller without access to the {@link 
MultiActiveLeaseArbiter}.
   */
   @Data
   // avoid - warning: Generating equals/hashCode implementation but without a 
call to superclass, even though this class does not extend java.lang.Object
   @EqualsAndHashCode(callSuper=false)
   public static class LeaseObtainedStatus extends LeaseAttemptStatus {
-    private final DagActionStore.DagAction consensusDagAction;
-    private final long eventTimeMillis;
+    private final DagActionStore.DagActionLeaseObject 
consensusDagActionLeaseObject;
     private final long leaseAcquisitionTimestamp;
     private final long minimumLingerDurationMillis;
     @Getter(AccessLevel.NONE)
     private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
 
+    @Override
+    public DagActionStore.DagAction getConsensusDagAction() {
+      return consensusDagActionLeaseObject.getDagAction();
+    }
+
     /**
      * Completes the lease referenced by this status object if it has not 
expired.
      * @return true if able to complete lease, false otherwise.
@@ -79,11 +92,15 @@ public abstract class LeaseAttemptStatus {
     public boolean completeLease() throws IOException {
       return multiActiveLeaseArbiter.recordLeaseSuccess(this);
     }
+
+    public long getEventTimeMillis() {
+      return consensusDagActionLeaseObject.getEventTimeMillis();
+    }
   }
 
   /*
   This dag action event already has a valid lease owned by another participant.
-  `eventTimeMillis' corresponds to the timestamp of the existing lease 
associated with this dag action, however the dag
+  See doc for {@link LeaseObtainedStatus} for details about 
consensusDagActionLeaseObject field. Note that the dag
   action event it corresponds to may be a different and distinct occurrence of 
the same event.
   `minimumLingerDurationMillis` is the minimum amount of time to wait before 
this participant should return to check if
   the lease has completed or expired
@@ -92,8 +109,16 @@ public abstract class LeaseAttemptStatus {
   // avoid - warning: Generating equals/hashCode implementation but without a 
call to superclass, even though this class does not extend java.lang.Object
   @EqualsAndHashCode(callSuper=false)
   public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
-    private final DagActionStore.DagAction consensusDagAction;
-    private final long eventTimeMillis;
+    private final DagActionStore.DagActionLeaseObject 
consensusDagActionLeaseObject;
     private final long minimumLingerDurationMillis;
+
+    @Override
+    public DagActionStore.DagAction getConsensusDagAction() {
+      return consensusDagActionLeaseObject.getDagAction();
+    }
+
+    public long getEventTimeMillis() {
+      return consensusDagActionLeaseObject.getEventTimeMillis();
+    }
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index ba9c17c75..b49d2ae52 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -48,19 +48,17 @@ public interface MultiActiveLeaseArbiter {
    * acquisition timestamp of the entry for that dag action event (it could 
have pre-existed in the table or been newly
    * added by the previous write). Based on the transaction results, it will 
return {@link LeaseAttemptStatus} to
    * determine the next action.
-   * @param dagAction uniquely identifies the flow and the present action upon 
it
-   * @param eventTimeMillis is the time this dag action was triggered
-   * @param isReminderEvent true if the dag action event we're checking on is 
a reminder event
+   *
+   * @param dagActionLeaseObject                   uniquely identifies the 
flow, the present action upon it, the time the action
+   *                                      was triggered, and if the dag action 
event we're checking on is a reminder event
    * @param adoptConsensusFlowExecutionId if true then replaces the dagAction 
flowExecutionId returned in
    *                                      LeaseAttemptStatuses with the 
consensus eventTime, accessed via
    *                                      {@link 
LeaseAttemptStatus#getConsensusDagAction()}
-   *
    * @return {@link LeaseAttemptStatus}, containing, when 
`adoptConsensusFlowExecutionId`, a universally-agreed-upon
    * {@link DagActionStore.DagAction} with a possibly updated ("laundered") 
flow execution id that MUST be used thereafter
    * @throws IOException
    */
-  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, long 
eventTimeMillis, boolean isReminderEvent,
-      boolean adoptConsensusFlowExecutionId)
+  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagActionLeaseObject 
dagActionLeaseObject, boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 7543e20e3..4b910219b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -243,20 +243,20 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   @Override
-  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
dagAction, long eventTimeMillis,
-      boolean isReminderEvent, boolean adoptConsensusFlowExecutionId) throws 
IOException {
+  public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject, 
boolean adoptConsensusFlowExecutionId) throws IOException {
     log.info("Multi-active scheduler about to handle trigger event: [{}, is: 
{}, triggerEventTimestamp: {}]",
-        dagAction, isReminderEvent ? "reminder" : "original", eventTimeMillis);
+       dagActionLeaseObject.getDagAction(), dagActionLeaseObject.isReminder() 
? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis());
     // Query lease arbiter table about this dag action
-    Optional<GetEventInfoResult> getResult = getExistingEventInfo(dagAction, 
isReminderEvent, eventTimeMillis);
+    Optional<GetEventInfoResult> getResult = 
getExistingEventInfo(dagActionLeaseObject);
 
     try {
       if (!getResult.isPresent()) {
         log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 
1: no existing row for this dag action,"
-            + " then go ahead and insert", dagAction, isReminderEvent ? 
"reminder" : "original", eventTimeMillis);
-        int numRowsUpdated = attemptLeaseIfNewRow(dagAction);
-       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, dagAction, 
Optional.empty(),
-           isReminderEvent, adoptConsensusFlowExecutionId);
+            + " then go ahead and insert", dagActionLeaseObject.getDagAction(),
+            dagActionLeaseObject.isReminder() ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis());
+        int numRowsUpdated = 
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction());
+       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, 
dagActionLeaseObject.getDagAction(),
+           Optional.empty(), dagActionLeaseObject.isReminder(), 
adoptConsensusFlowExecutionId);
       }
 
       // Extract values from result set
@@ -270,30 +270,34 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
 
       // For reminder event, we can stop early if the reminder eventTimeMillis 
is older than the current event in the db
       // because db laundering tells us that the currently worked on db event 
is newer and will have its own reminders
-      if (isReminderEvent) {
-        if (eventTimeMillis < dbEventTimestamp.getTime()) {
+      if (dagActionLeaseObject.isReminder()) {
+        if (dagActionLeaseObject.getEventTimeMillis() < 
dbEventTimestamp.getTime()) {
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - A new event trigger "
-                  + "is being worked on, so this older reminder will be 
dropped.", dagAction,
-              isReminderEvent ? "reminder" : "original", eventTimeMillis, 
dbEventTimestamp);
+                  + "is being worked on, so this older reminder will be 
dropped.", dagActionLeaseObject.getDagAction(),
+             dagActionLeaseObject.isReminder ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis(),
+              dbEventTimestamp);
           return new LeaseAttemptStatus.NoLongerLeasingStatus();
         }
-        if (eventTimeMillis > dbEventTimestamp.getTime()) {
+        if (dagActionLeaseObject.getEventTimeMillis() > 
dbEventTimestamp.getTime()) {
           // TODO: emit metric here to capture this unexpected behavior
           log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - Severe constraint "
                   + "violation encountered: a reminder event newer than db 
event was found when db laundering should "
-                  + "ensure monotonically increasing laundered event times.", 
dagAction,
-              isReminderEvent ? "reminder" : "original", eventTimeMillis, 
dbEventTimestamp.getTime());
+                  + "ensure monotonically increasing laundered event times.", 
dagActionLeaseObject.getDagAction(),
+             dagActionLeaseObject.isReminder ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis(),
+              dbEventTimestamp.getTime());
         }
-        if (eventTimeMillis == dbEventTimestamp.getTime()) {
+        if (dagActionLeaseObject.getEventTimeMillis() == 
dbEventTimestamp.getTime()) {
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - Reminder event time "
-                  + "is the same as db event.", dagAction, isReminderEvent ? 
"reminder" : "original",
-              eventTimeMillis, dbEventTimestamp);
+                  + "is the same as db event.", 
dagActionLeaseObject.getDagAction(),
+              dagActionLeaseObject.isReminder ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis(),
+              dbEventTimestamp);
         }
       }
 
       log.info("Multi-active arbiter replacing local trigger event timestamp 
[{}, is: {}, triggerEventTimestamp: {}] "
-          + "with database eventTimestamp {} (in epoch-millis)", dagAction, 
isReminderEvent ? "reminder" : "original",
-          eventTimeMillis, dbCurrentTimestamp.getTime());
+          + "with database eventTimestamp {} (in epoch-millis)", 
dagActionLeaseObject.getDagAction(),
+          dagActionLeaseObject.isReminder ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis(),
+          dbCurrentTimestamp.getTime());
 
       /* Note that we use `adoptConsensusFlowExecutionId` parameter's value to 
determine whether we should use the db
       laundered event timestamp as the flowExecutionId or maintain the 
original one
@@ -302,49 +306,55 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       // Lease is valid
       if (leaseValidityStatus == 1) {
         if (isWithinEpsilon) {
-          DagActionStore.DagAction updatedDagAction =
-              adoptConsensusFlowExecutionId ? 
dagAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : dagAction;
+         DagActionStore.DagAction updatedDagAction =
+              adoptConsensusFlowExecutionId ? 
dagActionLeaseObject.getDagAction().updateFlowExecutionId(dbEventTimestamp.getTime())
 : dagActionLeaseObject.getDagAction();
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 2: Same event, lease is valid",
-              updatedDagAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
+              updatedDagAction, dagActionLeaseObject.isReminder ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
-          return new 
LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, 
dbEventTimestamp.getTime(),
+          return new LeaseAttemptStatus.LeasedToAnotherStatus(
+              new DagActionStore.DagActionLeaseObject(updatedDagAction, 
dbEventTimestamp.getTime()),
               dbLeaseAcquisitionTimestamp.getTime() + dbLinger - 
dbCurrentTimestamp.getTime());
         }
         DagActionStore.DagAction updatedDagAction =
-            adoptConsensusFlowExecutionId ? 
dagAction.updateFlowExecutionId(dbCurrentTimestamp.getTime()) : dagAction;
+            adoptConsensusFlowExecutionId ? 
dagActionLeaseObject.getDagAction().updateFlowExecutionId(dbCurrentTimestamp.getTime())
 : dagActionLeaseObject.getDagAction();
         log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
3: Distinct event, lease is valid",
-            updatedDagAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
-        // Utilize db lease acquisition timestamp for wait time
-        return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, 
dbCurrentTimestamp.getTime(),
+            updatedDagAction, dagActionLeaseObject.isReminder ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
+        // Utilize db lease acquisition timestamp for wait time and 
currentTimestamp as the new eventTimestamp
+        return new LeaseAttemptStatus.LeasedToAnotherStatus(
+            new DagActionStore.DagActionLeaseObject(updatedDagAction, 
dbCurrentTimestamp.getTime()),
             dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - 
dbCurrentTimestamp.getTime());
       } // Lease is invalid
       else if (leaseValidityStatus == 2) {
         log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
4: Lease is out of date (regardless of "
-            + "whether same or distinct event)", dagAction, isReminderEvent ? 
"reminder" : "original",
-            dbCurrentTimestamp.getTime());
-        if (isWithinEpsilon && !isReminderEvent) {
-          log.warn("Lease should not be out of date for the same trigger event 
since epsilon << linger for dagAction"
-                  + " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp 
{}, linger {}", dagAction,
-              dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
+            + "whether same or distinct event)", 
dagActionLeaseObject.getDagAction(),
+            dagActionLeaseObject.isReminder ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
+        if (isWithinEpsilon && !dagActionLeaseObject.isReminder) {
+          log.warn("Lease should not be out of date for the same trigger event 
since epsilon << linger for "
+                  + "leaseObject.getDagAction() {}, db eventTimestamp {}, db 
leaseAcquisitionTimestamp {}, linger {}",
+              dagActionLeaseObject.getDagAction(), dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, dbLinger);
         }
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
-        int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement, 
dagAction,
-            true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, dagAction, 
Optional.of(dbCurrentTimestamp),
-            isReminderEvent, adoptConsensusFlowExecutionId);
+        int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
+            dagActionLeaseObject.getDagAction(), true,true, dbEventTimestamp,
+            dbLeaseAcquisitionTimestamp);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, 
dagActionLeaseObject.getDagAction(),
+            Optional.of(dbCurrentTimestamp), dagActionLeaseObject.isReminder, 
adoptConsensusFlowExecutionId);
       } // No longer leasing this event
         if (isWithinEpsilon) {
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 5: Same event, no longer leasing event"
-              + " in db", dagAction, isReminderEvent ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
+              + " in db", dagActionLeaseObject.getDagAction(),
+              dagActionLeaseObject.isReminder ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
           return new LeaseAttemptStatus.NoLongerLeasingStatus();
         }
         log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
6: Distinct event, no longer leasing "
-            + "event in db", dagAction, isReminderEvent ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
+            + "event in db", dagActionLeaseObject.getDagAction(),
+            dagActionLeaseObject.isReminder ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
-        int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, dagAction,
-            true, false, dbEventTimestamp, null);
-        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, dagAction, 
Optional.of(dbCurrentTimestamp),
-            isReminderEvent, adoptConsensusFlowExecutionId);
+        int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement,
+            dagActionLeaseObject.getDagAction(), true, false, dbEventTimestamp,
+            null);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, 
dagActionLeaseObject.getDagAction(),
+            Optional.of(dbCurrentTimestamp), dagActionLeaseObject.isReminder, 
adoptConsensusFlowExecutionId);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -353,18 +363,19 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   /**
    * Checks leaseArbiterTable for an existing entry for this dag action and 
event time
    */
-  protected Optional<GetEventInfoResult> 
getExistingEventInfo(DagActionStore.DagAction dagAction,
-      boolean isReminderEvent, long eventTimeMillis) throws IOException {
-    return dbStatementExecutor.withPreparedStatement(isReminderEvent ? 
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
+  protected Optional<GetEventInfoResult> 
getExistingEventInfo(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
+      throws IOException {
+    return dbStatementExecutor.withPreparedStatement(
+        dagActionLeaseObject.isReminder ? thisTableGetInfoStatementForReminder 
: thisTableGetInfoStatement,
         getInfoStatement -> {
           int i = 0;
-          if (isReminderEvent) {
-            getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis), 
UTC_CAL.get());
+          if (dagActionLeaseObject.isReminder) {
+            getInfoStatement.setTimestamp(++i, new 
Timestamp(dagActionLeaseObject.getEventTimeMillis()), UTC_CAL.get());
           }
-          getInfoStatement.setString(++i, dagAction.getFlowGroup());
-          getInfoStatement.setString(++i, dagAction.getFlowName());
-          getInfoStatement.setString(++i, dagAction.getJobName());
-          getInfoStatement.setString(++i, 
dagAction.getDagActionType().toString());
+          getInfoStatement.setString(++i, 
dagActionLeaseObject.getDagAction().getFlowGroup());
+          getInfoStatement.setString(++i, 
dagActionLeaseObject.getDagAction().getFlowName());
+          getInfoStatement.setString(++i, 
dagActionLeaseObject.getDagAction().getJobName());
+          getInfoStatement.setString(++i, 
dagActionLeaseObject.getDagAction().getDagActionType().toString());
           ResultSet resultSet = getInfoStatement.executeQuery();
           try {
             if (!resultSet.next()) {
@@ -383,7 +394,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     try {
       // Extract values from result set
       Timestamp dbEventTimestamp = 
resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get());
-      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get());
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("utc_lease_acquisition_timestamp",
+          UTC_CAL.get());
       boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
       int leaseValidityStatus = resultSet.getInt("lease_validity_status");
       int dbLinger = resultSet.getInt("linger");
@@ -490,13 +502,13 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
 
   /**
    * Parse result of attempted insert/update to obtain a lease for a
-   * {@link DagActionStore.DagAction} event by selecting values corresponding 
to that
+   * {@link DagActionStore.DagAction} event by selecting 
values corresponding to that
    * event from the table to return the corresponding status based on 
successful insert/update or not.
    * @throws SQLException
    * @throws IOException
    */
   protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int 
numRowsUpdated,
-      DagActionStore.DagAction dagAction, Optional<Timestamp> 
dbCurrentTimestamp, boolean isReminderEvent,
+     DagActionStore.DagAction dagAction, Optional<Timestamp> 
dbCurrentTimestamp, boolean isReminderEvent,
       boolean adoptConsensusFlowExecutionId)
       throws SQLException, IOException {
     // Fetch values in row after attempted insert
@@ -505,8 +517,10 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
       return new LeaseAttemptStatus.NoLongerLeasingStatus();
     }
-    DagActionStore.DagAction updatedDagAction =
+   DagActionStore.DagAction updatedDagAction =
         adoptConsensusFlowExecutionId ? 
dagAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis) : dagAction;
+    DagActionStore.DagActionLeaseObject consensusDagActionLeaseObject =
+        new DagActionStore.DagActionLeaseObject(updatedDagAction, 
selectInfoResult.getEventTimeMillis());
     // If no db current timestamp is present, then use the full db linger 
value for duration
     long minimumLingerDurationMillis = dbCurrentTimestamp.isPresent() ?
         selectInfoResult.getLeaseAcquisitionTimeMillis().get() + 
selectInfoResult.getDbLinger()
@@ -514,14 +528,13 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     if (numRowsUpdated == 1) {
       log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] 
successfully!", updatedDagAction,
           isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis);
-      return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction, 
selectInfoResult.eventTimeMillis,
+      return new 
LeaseAttemptStatus.LeaseObtainedStatus(consensusDagActionLeaseObject,
           selectInfoResult.getLeaseAcquisitionTimeMillis().get(), 
minimumLingerDurationMillis, this);
     }
     log.info("Another participant acquired lease in between for [{}, is: {}, 
eventTimestamp: {}] - num rows updated: {}",
         updatedDagAction, isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis, numRowsUpdated);
     // Another participant acquired lease in between
-    return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, 
selectInfoResult.eventTimeMillis,
-        minimumLingerDurationMillis);
+    return new 
LeaseAttemptStatus.LeasedToAnotherStatus(consensusDagActionLeaseObject, 
minimumLingerDurationMillis);
   }
 
   /**
@@ -547,7 +560,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * @throws SQLException
    */
   protected static void 
completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement,
-      DagActionStore.DagAction dagAction) throws SQLException {
+     DagActionStore.DagAction dagAction) throws SQLException {
     int i = 0;
     statement.setString(++i, dagAction.getFlowGroup());
     statement.setString(++i, dagAction.getFlowName());
@@ -566,8 +579,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * @param originalLeaseAcquisitionTimestamp value to compare to db one, null 
if not needed
    * @throws SQLException
    */
-  protected static void completeUpdatePreparedStatement(PreparedStatement 
statement,
-      DagActionStore.DagAction dagAction, boolean needEventTimeCheck, boolean 
needLeaseAcquisitionTimeCheck,
+  protected static void completeUpdatePreparedStatement(PreparedStatement 
statement, DagActionStore.DagAction dagAction,
+      boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck,
       Timestamp originalEventTimestamp, Timestamp 
originalLeaseAcquisitionTimestamp) throws SQLException {
     int i = 0;
     // Values to check if existing row matches previous read
@@ -615,7 +628,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   /**
-   * DTO for arbiter's current lease state for a DagActionEvent
+   * DTO for arbiter's current lease state for a 
DagActionEvent
   */
   @Data
   static class GetEventInfoResult {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 25caaf224..49793cfcf 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -220,12 +220,12 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
             flowGroup,
             flowName,
             FlowUtils.getOrCreateFlowExecutionId(flowSpec),
-            DagActionStore.DagActionType.LAUNCH
-        );
-
+            DagActionStore.DagActionType.LAUNCH);
+        DagActionStore.DagActionLeaseObject
+            leaseObject = new 
DagActionStore.DagActionLeaseObject(launchDagAction, isReminderEvent,
+            triggerTimestampMillis);
         // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as 
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
-        flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps, 
launchDagAction, triggerTimestampMillis, isReminderEvent,
-            flowSpec.isScheduled());
+        flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps, 
leaseObject, flowSpec.isScheduled());
         _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
             launchDagAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
       } else {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index 9b634f9fe..6567fcf77 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -62,7 +62,8 @@ public class DagActionReminderSchedulerTest {
 
   @Test
   public void testCreateReminderJobDetail() {
-    JobDetail jobDetail = 
DagActionReminderScheduler.createReminderJobDetail(launchDagAction);
+    long expectedEventTimeMillis = 55L;
+    JobDetail jobDetail = 
DagActionReminderScheduler.createReminderJobDetail(new 
DagActionStore.DagActionLeaseObject(launchDagAction, false, 
expectedEventTimeMillis));
     Assert.assertEquals(jobDetail.getKey().toString(), flowGroup + "." + 
expectedKey);
     JobDataMap dataMap = jobDetail.getJobDataMap();
     Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_GROUP_KEY), 
flowGroup);
@@ -71,5 +72,6 @@ public class DagActionReminderSchedulerTest {
     Assert.assertEquals(dataMap.get(ConfigurationKeys.JOB_NAME_KEY), jobName);
     
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_TYPE_KEY),
         DagActionStore.DagActionType.LAUNCH);
+    
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_EVENT_TIME_KEY),
 expectedEventTimeMillis);
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 046366b24..fcea36d7f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -37,7 +37,6 @@ import 
org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -98,10 +97,10 @@ public class DagManagementTaskStreamImplTest {
     dagManagementTaskStream.addDagAction(launchAction);
     dagManagementTaskStream.addDagAction(launchAction);
     when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
-        .tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(), 
anyBoolean(), anyBoolean()))
+        .tryAcquireLease(any(DagActionStore.DagActionLeaseObject.class), 
anyBoolean()))
         .thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
-            new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 3, 15),
-            new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 5, 0, 5, 
null));
+            new LeaseAttemptStatus.LeasedToAnotherStatus(new 
DagActionStore.DagActionLeaseObject(launchAction, true, 1), 15),
+            new LeaseAttemptStatus.LeaseObtainedStatus(new 
DagActionStore.DagActionLeaseObject(launchAction, true, 1), 0, 5, null));
     DagTask dagTask = dagManagementTaskStream.next();
     Assert.assertTrue(dagTask instanceof LaunchDagTask);
     DagProc dagProc = dagTask.host(this.dagProcFactory);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 197ceda76..1afc77ba4 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -36,8 +36,10 @@ public class FlowLaunchHandlerTest {
   int schedulerBackOffMillis = 10;
   DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction("flowName", "flowGroup",
       flowExecutionId, "jobName", DagActionStore.DagActionType.LAUNCH);
+  DagActionStore.DagActionLeaseObject
+      leaseObject = new DagActionStore.DagActionLeaseObject(dagAction, false, 
eventToRevisit);
   LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
-      new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit, 
minimumLingerDurationMillis);
+      new LeaseAttemptStatus.LeasedToAnotherStatus(leaseObject, 
minimumLingerDurationMillis);
 
   /**
    * Remove first two fields from cron expression representing seconds and 
minutes to return truncated cron expression
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index bcb208fa7..b9abc37bf 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.Optional;
+import java.util.UUID;
 
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -52,14 +53,20 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final String flowName = "testFlowName";
   private static final String jobName = "testJobName";
   private static final long flowExecutionId = 12345677L;
+  private static final long eventTimeMillis = 1710451837L; // 
System.currentTimeMillis();
   // Dag actions with the same flow info but different flow action types are 
considered unique
   private static DagActionStore.DagAction launchDagAction =
       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH);
+  private static DagActionStore.DagActionLeaseObject
+      launchLeaseObject = new 
DagActionStore.DagActionLeaseObject(launchDagAction, false, eventTimeMillis);
   private static DagActionStore.DagAction resumeDagAction =
       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.RESUME);
+  private static DagActionStore.DagActionLeaseObject
+      resumeLeaseObject = new 
DagActionStore.DagActionLeaseObject(resumeDagAction, false, eventTimeMillis);
   private static DagActionStore.DagAction launchDagAction2 =
       new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH);
-  private static final long eventTimeMillis = System.currentTimeMillis();
+  private static DagActionStore.DagActionLeaseObject
+      launchLeaseObject2 = new 
DagActionStore.DagActionLeaseObject(launchDagAction2, false, eventTimeMillis);
   private static final Timestamp dummyTimestamp = new Timestamp(99999);
   private ITestMetastoreDatabase testDb;
   private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
@@ -103,21 +110,27 @@ public class MysqlMultiActiveLeaseArbiterTest {
   public void testAcquireLeaseSingleParticipant() throws Exception {
     // Tests CASE 1 of acquire lease for a flow action event not present in DB
     LeaseAttemptStatus firstLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
     Assert.assertTrue(firstLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
-    Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
-        firstObtainedStatus.getLeaseAcquisitionTimestamp());
+    long consensusEventTimeMillis = firstObtainedStatus.getEventTimeMillis();
+    Assert.assertTrue(consensusEventTimeMillis <= 
firstObtainedStatus.getLeaseAcquisitionTimestamp());
+    // Make sure consensusEventTimeMillis is set, and it's not 0 or the 
original event time
+    log.info("consensus event time is {} eventtimeMillis is {}", 
consensusEventTimeMillis, eventTimeMillis);
+    Assert.assertTrue(consensusEventTimeMillis != eventTimeMillis && 
consensusEventTimeMillis != 0);
     Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
-        new DagActionStore.DagAction(flowGroup, flowName, 
firstObtainedStatus.getEventTimeMillis(),
-            jobName, DagActionStore.DagActionType.LAUNCH)));
+        new DagActionStore.DagAction(flowGroup, flowName, 
consensusEventTimeMillis, jobName,
+            DagActionStore.DagActionType.LAUNCH)));
+    Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), 
consensusEventTimeMillis);
+    
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder,
 false);
 
     // Verify that different DagAction types for the same flow can have leases 
at the same time
     DagActionStore.DagAction killDagAction = new
         DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.KILL);
     LeaseAttemptStatus killStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction, 
eventTimeMillis, false, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(
+            new DagActionStore.DagActionLeaseObject(killDagAction, false, 
eventTimeMillis), true);
     Assert.assertTrue(killStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus killObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) killStatus;
@@ -128,7 +141,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Very little time should have passed if this test directly follows the 
one above so this call will be considered
     // the same as the previous event
     LeaseAttemptStatus secondLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
     Assert.assertTrue(secondLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
     LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
         (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
@@ -140,7 +153,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Allow enough time to pass for this trigger to be considered distinct, 
but not enough time so the lease expires
     Thread.sleep(MORE_THAN_EPSILON);
     LeaseAttemptStatus thirdLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
     Assert.assertTrue(thirdLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
     LeaseAttemptStatus.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
         (LeaseAttemptStatus.LeasedToAnotherStatus) thirdLaunchStatus;
@@ -150,7 +163,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Tests CASE 4 of lease out of date
     Thread.sleep(LINGER);
     LeaseAttemptStatus fourthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
     Assert.assertTrue(fourthLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus fourthObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) fourthLaunchStatus;
@@ -163,14 +176,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
     
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
     Assert.assertTrue(System.currentTimeMillis() - 
fourthObtainedStatus.getEventTimeMillis() < EPSILON);
     LeaseAttemptStatus fifthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
     Assert.assertTrue(fifthLaunchStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus);
 
     // Tests CASE 6 of no longer leasing a distinct event in DB
     // Wait so this event is considered distinct and a new lease will be 
acquired
     Thread.sleep(MORE_THAN_EPSILON);
     LeaseAttemptStatus sixthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
     Assert.assertTrue(sixthLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus sixthObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) sixthLaunchStatus;
@@ -185,7 +198,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
      Note: this isolates and tests CASE 1 in which another participant could 
have acquired the lease between the time
      the read was done and subsequent write was carried out
   */
-  @Test (dependsOnMethods = "testAcquireLeaseSingleParticipant")
+  @Test
   public void testAcquireLeaseIfNewRow() throws IOException {
     // Inserting the first time should update 1 row
     
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
 1);
@@ -195,12 +208,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
 
     /*
     Tests CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT to ensure 
insertion is not completed if another
-    participant updated the table between the prior reed and attempted 
insertion.
+    participant updated the table between the prior read and attempted 
insertion.
     Note: this isolates and tests CASE 4 in which a flow action event has an 
out of date lease, so a participant
     attempts a new one given the table the eventTimestamp and 
leaseAcquisitionTimestamp values are unchanged.
    */
   @Test (dependsOnMethods = "testAcquireLeaseIfNewRow")
-  public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() 
throws IOException {
+  public void testConditionallyAcquireLeaseIfMatchingAllColsStatement() throws 
IOException {
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
 
@@ -230,18 +243,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
   Note: This isolates and tests CASE 6 during which current participant saw a 
distinct flow action event had completed
   its prior lease, encouraging the current participant to acquire a lease for 
its event.
    */
-  @Test (dependsOnMethods = 
"testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
+  @Test (dependsOnMethods = 
"testConditionallyAcquireLeaseIfMatchingAllColsStatement")
   public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
-      throws IOException, InterruptedException, SQLException {
-    // Mark the resume action lease from above as completed by fabricating a 
LeaseObtainedStatus
-    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-        mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
-    DagActionStore.DagAction updatedResumeDagAction = 
resumeDagAction.updateFlowExecutionId(
-        selectInfoResult.getEventTimeMillis());
-    boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new 
LeaseAttemptStatus.LeaseObtainedStatus(
-        updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
-        selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
-    Assert.assertTrue(markedSuccess);
+      throws IOException, SQLException {
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = 
completeLeaseHelper(resumeLeaseObject);
     // Ensure no NPE results from calling this after a lease has been 
completed and acquisition timestamp val is NULL
     mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, 
resumeDagAction,
         Optional.empty(), false, true);
@@ -262,14 +267,18 @@ public class MysqlMultiActiveLeaseArbiterTest {
   /*
   Tests calling `tryAcquireLease` for an older reminder event which should be 
immediately returned as `NoLongerLeasing`
    */
-  @Test (dependsOnMethods = 
"testConditionallyAcquireLeaseIfFinishedLeasingStatement")
+  @Test
   public void testOlderReminderEventAcquireLease() throws IOException {
+    DagActionStore.DagActionLeaseObject newLaunchLeaseObject = 
getUniqueLaunchLeaseObject();
+    mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
     // Read database to obtain existing db eventTimeMillis and use it to 
construct an older event
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-        mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+        
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseObject.getDagAction());
     long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
+    DagActionStore.DagActionLeaseObject updatedLeaseObject =
+        new 
DagActionStore.DagActionLeaseObject(newLaunchLeaseObject.getDagAction(), true, 
olderEventTimestamp);
     LeaseAttemptStatus attemptStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
olderEventTimestamp, true, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, true);
     Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus);
   }
 
@@ -278,30 +287,35 @@ public class MysqlMultiActiveLeaseArbiterTest {
   this case to occur because the reminderEvent should be triggered after the 
lease expires, but ensure it's handled
   correctly anyway.
    */
-  @Test (dependsOnMethods = "testOlderReminderEventAcquireLease")
+  @Test
   public void testReminderEventAcquireLeaseOnValidLease() throws IOException {
-    // Read database to obtain existing db eventTimeMillis and re-use it for 
the reminder event time
-    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-        mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+    DagActionStore.DagActionLeaseObject newLaunchLeaseObject = 
getUniqueLaunchLeaseObject();
+    LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
+        (LeaseAttemptStatus.LeaseObtainedStatus) 
mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
+    // Use the consensusLeaseObject containing the new eventTimeMillis for the 
reminder event time
+    DagActionStore.DagActionLeaseObject updatedLeaseObject =
+        new 
DagActionStore.DagActionLeaseObject(newLaunchLeaseObject.getDagAction(), true,
+            leaseObtainedStatus.getEventTimeMillis());
     LeaseAttemptStatus attemptStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, true);
     Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
     LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus = 
(LeaseAttemptStatus.LeasedToAnotherStatus) attemptStatus;
-    Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(), 
selectInfoResult.getEventTimeMillis());
+    Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(), 
leaseObtainedStatus.getEventTimeMillis());
   }
 
   /*
   Tests calling `tryAcquireLease` for a reminder event whose lease has expired 
in the database and should successfully
   acquire a new lease
    */
-  @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnValidLease")
+  @Test
   public void testReminderEventAcquireLeaseOnInvalidLease() throws 
IOException, InterruptedException {
-    // Read database to obtain existing db eventTimeMillis and wait enough 
time for the lease to expire
-    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-        mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+    DagActionStore.DagActionLeaseObject newLaunchLeaseObject = 
getUniqueLaunchLeaseObject();
+    mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = 
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseObject.getDagAction());
+    // Wait enough time for the lease to expire
     Thread.sleep(MORE_THAN_LINGER);
     LeaseAttemptStatus attemptStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, 
true);
     Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus obtainedStatus = 
(LeaseAttemptStatus.LeaseObtainedStatus) attemptStatus;
     Assert.assertTrue(obtainedStatus.getEventTimeMillis() > 
selectInfoResult.getEventTimeMillis());
@@ -314,23 +328,22 @@ public class MysqlMultiActiveLeaseArbiterTest {
   Note: that we wait for enough time to pass that the event would have been 
considered distinct for a non-reminder case
   to ensure that the comparison made for reminder events is against the 
preserved event time not current time in db
    */
-   @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease")
+   @Test
    public void testReminderEventAcquireLeaseOnCompletedLease() throws 
IOException, InterruptedException {
-     // Mark the resume action lease from above as completed by fabricating a 
LeaseObtainedStatus
-     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
-     DagActionStore.DagAction updatedResumeDagAction = 
resumeDagAction.updateFlowExecutionId(
-         selectInfoResult.getEventTimeMillis());
-     boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new 
LeaseAttemptStatus.LeaseObtainedStatus(
-         updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
-         selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, 
null));
-     Assert.assertTrue(markedSuccess);
+     // Create a new dag action and complete the lease
+     DagActionStore.DagActionLeaseObject newLaunchLeaseObject = 
getUniqueLaunchLeaseObject();
+     mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
+     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = 
completeLeaseHelper(newLaunchLeaseObject);
 
      // Sleep enough time for the event to have been considered distinct
      Thread.sleep(MORE_THAN_EPSILON);
      // Now have a reminder event check-in on the completed lease
+     DagActionStore.DagAction updatedDagAction = 
newLaunchLeaseObject.getDagAction().updateFlowExecutionId(
+         selectInfoResult.getEventTimeMillis());
+     DagActionStore.DagActionLeaseObject updatedLeaseObject =
+         new DagActionStore.DagActionLeaseObject(updatedDagAction, true, 
newLaunchLeaseObject.getEventTimeMillis());
      LeaseAttemptStatus attemptStatus =
-         mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true, true);
+         mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, 
true);
      Assert.assertTrue(attemptStatus instanceof 
LeaseAttemptStatus.NoLongerLeasingStatus);
    }
 
@@ -344,7 +357,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
   public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
     // Obtain a lease for a new action and verify its flowExecutionId is not 
updated
     LeaseAttemptStatus firstLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2, 
eventTimeMillis, false, false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject2, 
false);
     Assert.assertTrue(firstLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
@@ -352,18 +365,59 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() != 
Long.valueOf(firstObtainedStatus.getConsensusDagAction().getFlowExecutionId()));
     Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
+    
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder(),
 false);
 
     // A second attempt to obtain a lease on the same action should return a 
LeasedToAnotherStatus which also contains
     // the original flowExecutionId and the same event time from the previous 
LeaseAttemptStatus
     LeaseAttemptStatus secondLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2, 
eventTimeMillis, false, false);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject2, 
false);
     Assert.assertTrue(secondLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
     LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
         (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
     Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), 
secondLeasedToAnotherStatus.getEventTimeMillis());
     
Assert.assertTrue(secondLeasedToAnotherStatus.getConsensusDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
+    
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder(),
 false);
 
     
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(firstObtainedStatus));
   }
+
+  public static String generateUniqueName() {
+      UUID uuid = UUID.randomUUID();
+      String name = uuid.toString().substring(0, 10);
+      return name;
+  }
+
+  /**
+   * Returns a unique launch type dagAction event by using 
'generateUniqueName' as flowName to create a unique event
+   */
+  public DagActionStore.DagAction getUniqueLaunchDagAction() {
+    return new DagActionStore.DagAction(flowGroup, generateUniqueName(), 
flowExecutionId, jobName,
+        DagActionStore.DagActionType.LAUNCH);
+  }
+
+  /**
+   * Returns a unique launch type leaseObject using 
#getUniqueLaunchDagAction() to create a unique flowName
+   */
+  public DagActionStore.DagActionLeaseObject getUniqueLaunchLeaseObject() {
+    return new DagActionStore.DagActionLeaseObject(getUniqueLaunchDagAction(), 
false, eventTimeMillis);
+  }
+
+  /**
+   * Marks the lease associated with the dagAction as completed by fabricating 
a LeaseObtainedStatus
+   * @return SelectInfoResult object containing the event information used to 
complete the lease
+   */
+  public MysqlMultiActiveLeaseArbiter.SelectInfoResult completeLeaseHelper(
+      DagActionStore.DagActionLeaseObject previouslyLeasedLeaseObj) throws 
IOException {
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+        
mysqlMultiActiveLeaseArbiter.getRowInfo(previouslyLeasedLeaseObj.getDagAction());
+    DagActionStore.DagAction updatedDagAction = 
previouslyLeasedLeaseObj.getDagAction().updateFlowExecutionId(
+        selectInfoResult.getEventTimeMillis());
+    DagActionStore.DagActionLeaseObject
+        updatedLeaseObject = new 
DagActionStore.DagActionLeaseObject(updatedDagAction, false, 
selectInfoResult.getEventTimeMillis());
+    boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new 
LeaseAttemptStatus.LeaseObtainedStatus(
+        updatedLeaseObject, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
+    Assert.assertTrue(markedSuccess);
+    return selectInfoResult;
+  }
 }

Reply via email to