This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c064604e4 [GOBBLIN-2073] Use previous eventTime for lease arbitration
of reminder dagActions (#3952)
c064604e4 is described below
commit c064604e4b804c9528a0b7365283b994acd071fc
Author: umustafi <[email protected]>
AuthorDate: Mon Jun 17 12:16:22 2024 -0700
[GOBBLIN-2073] Use previous eventTime for lease arbitration of reminder
dagActions (#3952)
* Use previous eventTime for lease arbitration of reminder dagActions
* Move isReminder and eventTimestamp into dagAction
* Fix existing tests and build error
* Decouple dependent MyMALA tests to prevent flakiness and allow
debuggability
* Creates a leaseObject to contain dagAction, isReminder, and
eventTimeMillis
---
.../orchestration/DagActionReminderScheduler.java | 51 ++++---
.../modules/orchestration/DagActionStore.java | 23 ++-
.../modules/orchestration/DagManagement.java | 8 ++
.../orchestration/DagManagementTaskStreamImpl.java | 71 +++++----
.../modules/orchestration/FlowLaunchHandler.java | 26 ++--
.../orchestration/InstrumentedLeaseArbiter.java | 14 +-
.../modules/orchestration/LeaseAttemptStatus.java | 47 ++++--
.../orchestration/MultiActiveLeaseArbiter.java | 10 +-
.../MysqlMultiActiveLeaseArbiter.java | 141 +++++++++---------
.../modules/orchestration/Orchestrator.java | 10 +-
.../DagActionReminderSchedulerTest.java | 4 +-
.../DagManagementTaskStreamImplTest.java | 7 +-
.../orchestration/FlowLaunchHandlerTest.java | 4 +-
.../MysqlMultiActiveLeaseArbiterTest.java | 160 ++++++++++++++-------
14 files changed, 357 insertions(+), 219 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index add618616..b1bb84cd5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -26,6 +26,7 @@ import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
@@ -60,20 +61,20 @@ public class DagActionReminderScheduler {
/**
* Uses a dagAction & reminder duration in milliseconds to create a
reminder job that will fire
* `reminderDurationMillis` after the current time
- * @param dagAction
+ * @param dagActionLeaseObject
* @param reminderDurationMillis
* @throws SchedulerException
*/
- public void scheduleReminder(DagActionStore.DagAction dagAction, long
reminderDurationMillis)
+ public void scheduleReminder(DagActionStore.DagActionLeaseObject
dagActionLeaseObject, long reminderDurationMillis)
throws SchedulerException {
- JobDetail jobDetail = createReminderJobDetail(dagAction);
- Trigger trigger = createReminderJobTrigger(dagAction,
reminderDurationMillis, System::currentTimeMillis);
+ JobDetail jobDetail = createReminderJobDetail(dagActionLeaseObject);
+ Trigger trigger =
createReminderJobTrigger(dagActionLeaseObject.getDagAction(),
reminderDurationMillis,
+ System::currentTimeMillis);
quartzScheduler.scheduleJob(jobDetail, trigger);
}
public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws
SchedulerException {
- JobDetail jobDetail = createReminderJobDetail(dagAction);
- quartzScheduler.deleteJob(jobDetail.getKey());
+ quartzScheduler.deleteJob(createJobKey(dagAction));
}
/**
@@ -84,6 +85,7 @@ public class DagActionReminderScheduler {
@Slf4j
public static class ReminderJob implements Job {
public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+ public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
@Override
public void execute(JobExecutionContext context) {
@@ -94,17 +96,18 @@ public class DagActionReminderScheduler {
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
long flowExecutionId =
jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType =
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
+ long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);
- log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
- + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName
+ ", dagActionType: " + dagActionType + ")");
-
- DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
dagActionType, true);
+ DagActionStore.DagActionLeaseObject reminderDagActionLeaseObject = new
DagActionStore.DagActionLeaseObject(
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, dagActionType),
+ true, eventTimeMillis);
+ log.info("DagProc reminder triggered for dagAction event: {}",
reminderDagActionLeaseObject);
try {
DagManagement dagManagement =
GobblinServiceManager.getClass(DagManagement.class);
- dagManagement.addDagAction(dagAction);
+ dagManagement.addReminderDagAction(reminderDagActionLeaseObject);
} catch (IOException e) {
- log.error("Failed to add DagAction to DagManagement. Action: {}",
dagAction);
+ log.error("Failed to add DagAction event to DagManagement. dagAction
event: {}", reminderDagActionLeaseObject);
}
}
}
@@ -117,20 +120,30 @@ public class DagActionReminderScheduler {
dagAction.getFlowExecutionId(), dagAction.getJobName(),
dagAction.getDagActionType());
}
+ /**
+ * Creates a JobKey object for the reminder job where the name is the
DagActionReminderKey from above and the group is
+ * the flowGroup
+ */
+ public static JobKey createJobKey(DagActionStore.DagAction dagAction) {
+ return new JobKey(createDagActionReminderKey(dagAction),
dagAction.getFlowGroup());
+ }
+
/**
* Creates a jobDetail containing flow and job identifying information in
the jobDataMap, uniquely identified
* by a key comprised of the dagAction's fields.
*/
- public static JobDetail createReminderJobDetail(DagActionStore.DagAction
dagAction) {
+ public static JobDetail
createReminderJobDetail(DagActionStore.DagActionLeaseObject
dagActionLeaseObject) {
JobDataMap dataMap = new JobDataMap();
- dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
- dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
- dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
- dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
- dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY,
dagAction.getDagActionType());
+ dataMap.put(ConfigurationKeys.FLOW_NAME_KEY,
dagActionLeaseObject.getDagAction().getFlowName());
+ dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY,
dagActionLeaseObject.getDagAction().getFlowGroup());
+ dataMap.put(ConfigurationKeys.JOB_NAME_KEY,
dagActionLeaseObject.getDagAction().getJobName());
+ dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagActionLeaseObject.getDagAction().getFlowExecutionId());
+ dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY,
dagActionLeaseObject.getDagAction().getDagActionType());
+ dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY,
dagActionLeaseObject.getEventTimeMillis());
return JobBuilder.newJob(ReminderJob.class)
- .withIdentity(createDagActionReminderKey(dagAction),
dagAction.getFlowGroup())
+
.withIdentity(createDagActionReminderKey(dagActionLeaseObject.getDagAction()),
+ dagActionLeaseObject.getDagAction().getFlowGroup())
.usingJobData(dataMap)
.build();
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 2d22aa3a2..54f9a65f5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -47,11 +47,6 @@ public interface DagActionStore {
final long flowExecutionId;
final String jobName;
final DagActionType dagActionType;
- final boolean isReminder;
-
- public DagAction(String flowGroup, String flowName, long flowExecutionId,
String jobName, DagActionType dagActionType) {
- this(flowGroup, flowName, flowExecutionId, jobName, dagActionType,
false);
- }
public static DagAction forFlow(String flowGroup, String flowName, long
flowExecutionId, DagActionType dagActionType) {
return new DagAction(flowGroup, flowName, flowExecutionId,
NO_JOB_NAME_DEFAULT, dagActionType);
@@ -83,6 +78,24 @@ public interface DagActionStore {
}
}
+ @Data
+ @RequiredArgsConstructor
+ class DagActionLeaseObject {
+ final DagAction dagAction;
+ final boolean isReminder;
+ final long eventTimeMillis;
+
+ /**
+ * Creates a lease object for a dagAction and eventTimeMillis representing
an original event (isReminder is False)
+ */
+ public DagActionLeaseObject(DagAction dagAction, long eventTimeMillis) {
+ this.dagAction = dagAction;
+ this.isReminder = false;
+ this.eventTimeMillis = eventTimeMillis;
+ }
+ }
+
+
/**
* Check if an action exists in dagAction store by flow group, flow name,
flow execution id, and job name.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
index 75ee9b60e..eea645284 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -27,5 +27,13 @@ import java.io.IOException;
*/
public interface DagManagement {
+ /**
+ * Used to add a dagAction event to DagManagement
+ */
void addDagAction(DagActionStore.DagAction dagAction) throws IOException;
+
+ /**
+ * Used to add reminder dagActions to the queue that already contain an
eventTimestamp from the previous lease attempt
+ */
+ void addReminderDagAction(DagActionStore.DagActionLeaseObject
reminderDagActionLeaseObject) throws IOException;
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 3f93837d3..6a8dcbdc3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -80,7 +80,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
private final boolean isMultiActiveExecutionEnabled;
private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
- private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new
LinkedBlockingQueue<>();
+ private final BlockingQueue<DagActionStore.DagActionLeaseObject>
dagActionLeaseObjectQueue = new LinkedBlockingQueue<>();
private final DagManagementStateStore dagManagementStateStore;
@Inject
@@ -110,10 +110,20 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
@Override
public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
// TODO: Used to track missing dag issue, remove later as needed
- log.info("Add dagAction {}", dagAction);
+ log.info("Add original (non-reminder) dagAction {}", dagAction);
- if (!this.dagActionQueue.offer(dagAction)) {
- throw new RuntimeException("Could not add dag action " + dagAction + "
to the queue");
+ if (!this.dagActionLeaseObjectQueue.offer(new
DagActionStore.DagActionLeaseObject(dagAction, false,
System.currentTimeMillis()))) {
+ throw new RuntimeException(String.format("Could not add dag action to
the queue %s", dagAction));
+ }
+ }
+
+ @Override
+ public synchronized void
addReminderDagAction(DagActionStore.DagActionLeaseObject
reminderDagActionLeaseObject) {
+ // TODO: Used to track missing dag issue, remove later as needed
+ log.info("Add reminder dagAction {}", reminderDagActionLeaseObject);
+
+ if (!this.dagActionLeaseObjectQueue.offer(reminderDagActionLeaseObject)) {
+ throw new RuntimeException(String.format("Could not add reminder dag
action to the queue %s", reminderDagActionLeaseObject));
}
}
@@ -127,20 +137,17 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
while (true) {
DagActionStore.DagAction dagAction = null;
try {
- dagAction = this.dagActionQueue.take();
+ DagActionStore.DagActionLeaseObject dagActionLeaseObject =
this.dagActionLeaseObjectQueue.take();
+ dagAction = dagActionLeaseObject.getDagAction();
/* Create triggers for original (non-reminder) dag actions of type
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE.
Reminder triggers are used to inform hosts once the job start
deadline and flow finish deadline are passed;
then only is lease arbitration done to enforce the deadline
violation and fail the job or flow if needed */
- if (!dagAction.isReminder() && dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
- createJobStartDeadlineTrigger(dagAction);
- } else if (!dagAction.isReminder() && dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
- createFlowFinishDeadlineTrigger(dagAction);
- } else if (!dagAction.isReminder
- || dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE
- || dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
- // todo - fix bug of a reminder event getting a lease even when
the first attempt succeeded.
- // for now, avoid processing reminder events if they are not for
deadline dag actions
- LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(dagAction);
+ if (!dagActionLeaseObject.isReminder() && dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
+ createJobStartDeadlineTrigger(dagActionLeaseObject);
+ } else if (!dagActionLeaseObject.isReminder() &&
dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
+ createFlowFinishDeadlineTrigger(dagActionLeaseObject);
+ } else { // Handle original non-deadline dagActions as well as
reminder events of all types
+ LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(dagActionLeaseObject);
if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
return createDagTask(dagAction,
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
}
@@ -152,20 +159,22 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
}
}
- private void createJobStartDeadlineTrigger(DagActionStore.DagAction
dagAction) throws SchedulerException, IOException {
+ private void
createJobStartDeadlineTrigger(DagActionStore.DagActionLeaseObject
dagActionLeaseObject)
+ throws SchedulerException, IOException {
long timeOutForJobStart =
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
- dagAction.getDagId()).get().getNodes().get(0),
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+
dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0),
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
// todo - this timestamp is just an approximation, the real job submission
has happened in past, and that is when a
// ENFORCE_JOB_START_DEADLINE dag action was created; we are just
processing that dag action here
long jobSubmissionTime = System.currentTimeMillis();
long reminderDuration = jobSubmissionTime + timeOutForJobStart -
System.currentTimeMillis();
- dagActionReminderScheduler.get().scheduleReminder(dagAction,
reminderDuration);
+ dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject,
reminderDuration);
}
- private void createFlowFinishDeadlineTrigger(DagActionStore.DagAction
dagAction) throws SchedulerException, IOException {
+ private void
createFlowFinishDeadlineTrigger(DagActionStore.DagActionLeaseObject
dagActionLeaseObject)
+ throws SchedulerException, IOException {
long timeOutForJobFinish;
- Dag.DagNode<JobExecutionPlan> dagNode =
this.dagManagementStateStore.getDag(dagAction.getDagId()).get().getNodes().get(0);
+ Dag.DagNode<JobExecutionPlan> dagNode =
this.dagManagementStateStore.getDag(dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0);
try {
timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
@@ -180,25 +189,25 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
long reminderDuration = flowStartTime + timeOutForJobFinish -
System.currentTimeMillis();
- dagActionReminderScheduler.get().scheduleReminder(dagAction,
reminderDuration);
+ dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject,
reminderDuration);
}
/**
* Returns a {@link LeaseAttemptStatus} associated with the
* `dagAction` by calling
- * {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagAction,
long, boolean, boolean)}.
- * @param dagAction
+ * {@link
MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagActionLeaseObject,
boolean)}.
+ * @param dagActionLeaseObject
* @return
* @throws IOException
* @throws SchedulerException
*/
- private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction
dagAction)
+ private LeaseAttemptStatus
retrieveLeaseStatus(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
throws IOException, SchedulerException {
- // TODO: need to handle reminder events and flag them
+ // Uses reminder flag to determine whether to use current time as event
time or previously saved event time
LeaseAttemptStatus leaseAttemptStatus =
this.dagActionProcessingLeaseArbiter
- .tryAcquireLease(dagAction, System.currentTimeMillis(),
dagAction.isReminder, false);
- /* Schedule a reminder for the event unless the lease has been
completed to safeguard against the case where even
- we, when we might become the lease owner still fail to complete
processing
+ .tryAcquireLease(dagActionLeaseObject, false);
+ /* Schedule a reminder for the event unless the lease has been
completed to safeguard against the case where
+ even we, when we might become the lease owner still fail to complete
processing
*/
if (!(leaseAttemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus)) {
scheduleReminderForEvent(leaseAttemptStatus);
@@ -228,11 +237,11 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
}
/* Schedules a reminder for the flow action using {@link
DagActionReminderScheduler} to reattempt the lease after the
- current leaseholder's grant would have expired.
+ current leaseholder's grant would have expired. It saves the previous
eventTimeMillis in the dagAction to use upon
+ reattempting the lease.
*/
protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
throws SchedulerException {
-
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagAction(),
- leaseStatus.getMinimumLingerDurationMillis());
+
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagActionLeaseObject(),
leaseStatus.getMinimumLingerDurationMillis());
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 06551ce14..967d89a89 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -105,17 +105,19 @@ public class FlowLaunchHandler {
* event triggered by the scheduler by attempting a lease for the launch
event and processing the result depending on
* the status of the attempt.
*/
- public void handleFlowLaunchTriggerEvent(Properties jobProps,
DagActionStore.DagAction dagAction,
- long eventTimeMillis, boolean isReminderEvent, boolean
adoptConsensusFlowExecutionId) throws IOException {
+ public void handleFlowLaunchTriggerEvent(Properties jobProps,
+ DagActionStore.DagActionLeaseObject dagActionLeaseObject, boolean
adoptConsensusFlowExecutionId)
+ throws IOException {
+ long previousEventTimeMillis = dagActionLeaseObject.getEventTimeMillis();
LeaseAttemptStatus leaseAttempt =
this.multiActiveLeaseArbiter.tryAcquireLease(
- dagAction, eventTimeMillis, isReminderEvent,
adoptConsensusFlowExecutionId);
+ dagActionLeaseObject, adoptConsensusFlowExecutionId);
if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
&& persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ",
leaseAttempt.getConsensusDagAction(),
- ((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt).getEventTimeMillis());
+ previousEventTimeMillis);
} else { // when NOT successfully `persistDagAction`, set a reminder to
re-attempt handling (unless leasing finished)
calcLeasedToAnotherStatusForReminder(leaseAttempt).ifPresent(leasedToAnother ->
- scheduleReminderForEvent(jobProps, leasedToAnother,
eventTimeMillis));
+ scheduleReminderForEvent(jobProps, leasedToAnother,
previousEventTimeMillis));
}
}
@@ -126,7 +128,8 @@ public class FlowLaunchHandler {
} else if (leaseAttempt instanceof
LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it
return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus)
leaseAttempt);
} else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus)
{ // remind w/o delay to immediately re-attempt handling
- return Optional.of(new
LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(),
((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt).getEventTimeMillis(),
0L));
+ return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(
+ ((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt).getConsensusDagActionLeaseObject(), 0L));
} else {
throw new RuntimeException("unexpected `LeaseAttemptStatus` derived
type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
}
@@ -152,12 +155,13 @@ public class FlowLaunchHandler {
* This method is used by {@link
FlowLaunchHandler#handleFlowLaunchTriggerEvent} to schedule a self-reminder to
check on
* the other participant's progress to finish acting on a dag action after
the time the lease should expire.
* @param jobProps
- * @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
- * @param triggerEventTimeMillis the event timestamp we were originally
handling
+ * @param status used to extract event to be reminded for (stored in
`consensusDagAction`) and the minimum time after
+ * which reminder should occur
+ * @param triggerEventTimeMillis the event timestamp we were originally
handling (only used for logging purposes)
*/
private void scheduleReminderForEvent(Properties jobProps,
LeaseAttemptStatus.LeasedToAnotherStatus status,
long triggerEventTimeMillis) {
- DagActionStore.DagAction dagAction = status.getConsensusDagAction();
+ DagActionStore.DagAction consensusDagAction =
status.getConsensusDagAction();
JobKey origJobKey = new
JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job
group>>"));
try {
@@ -166,9 +170,9 @@ public class FlowLaunchHandler {
this.jobDoesNotExistInSchedulerCount.inc();
return;
}
- Trigger reminderTrigger = createAndScheduleReminder(origJobKey, status,
triggerEventTimeMillis);
+ Trigger reminderTrigger = createAndScheduleReminder(origJobKey, status,
status.getEventTimeMillis());
log.info("Flow Launch Handler - [{}, eventTimestamp: {}] - SCHEDULED
REMINDER for event {} in {} millis",
- dagAction, triggerEventTimeMillis, status.getEventTimeMillis(),
reminderTrigger.getNextFireTime());
+ consensusDagAction, triggerEventTimeMillis,
status.getEventTimeMillis(), reminderTrigger.getNextFireTime());
} catch (SchedulerException e) {
log.warn("Failed to add job reminder due to SchedulerException for job
{} trigger event {}. Exception: {}",
origJobKey, status.getEventTimeMillis(), e);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
index 51f40c04a..ab8a9ee14 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
@@ -72,16 +72,14 @@ public class InstrumentedLeaseArbiter implements
MultiActiveLeaseArbiter {
}
@Override
- public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
dagAction, long eventTimeMillis,
- boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) throws
IOException {
-
+ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject,
boolean skipFlowExecutionIdReplacement) throws IOException {
LeaseAttemptStatus leaseAttemptStatus =
- decoratedMultiActiveLeaseArbiter.tryAcquireLease(dagAction,
eventTimeMillis, isReminderEvent,
- skipFlowExecutionIdReplacement);
- log.info("Multi-active scheduler lease attempt for dagAction: {} received
type of leaseAttemptStatus: [{}, "
- + "eventTimestamp: {}] ", dagAction,
leaseAttemptStatus.getClass().getName(), eventTimeMillis);
+ decoratedMultiActiveLeaseArbiter.tryAcquireLease(dagActionLeaseObject,
skipFlowExecutionIdReplacement);
+ log.info("Multi-active scheduler lease attempt for leaseObject: {}
received type of leaseAttemptStatus: [{}, "
+ + "eventTimestamp: {}] ", dagActionLeaseObject,
leaseAttemptStatus.getClass().getName(),
+ dagActionLeaseObject.getEventTimeMillis());
if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
- if (isReminderEvent) {
+ if (dagActionLeaseObject.isReminder()) {
this.leasesObtainedDueToReminderCount.mark();
}
this.leaseObtainedCount.inc();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index efa6ee881..3584f445f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -33,6 +33,14 @@ import lombok.Getter;
* intended for `@Override`.
*/
public abstract class LeaseAttemptStatus {
+ /**
+ * @return the {@link DagActionStore.DagActionLeaseObject}, containing the
dagAction, eventTimeMillis of the event, and boolean
+ * indicating if it's a reminder event; {@see
MultiActiveLeaseArbiter#tryAcquireLease}
+ */
+ public DagActionStore.DagActionLeaseObject
getConsensusDagActionLeaseObject() {
+ return null;
+ }
+
/**
* @return the {@link DagActionStore.DagAction}, which may now have an
updated flowExecutionId that MUST henceforth be
* used; {@see MultiActiveLeaseArbiter#tryAcquireLease}
@@ -53,24 +61,29 @@ public abstract class LeaseAttemptStatus {
/*
The participant calling this method acquired the lease for the event in
question.
- The timestamp associated with the lease is stored in `eventTimeMillis` field
and the time the caller obtained the
- lease is stored within the`leaseAcquisitionTimestamp` field. Note that the
`Dag action` returned by the lease
- arbitration attempt will be unchanged for flows that do not adopt the
consensus eventTimeMillis as the flow execution
- id, so a separate field must be maintained to track the eventTimeMillis for
lease completion. The
- `multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for the
current LeaseObtainedStatus via the
- completeLease method from a caller without access to the {@link
MultiActiveLeaseArbiter}.
+ The `consensusLeaseObject` returned by the lease arbitration attempt
includes an updated value in the
+ `eventTimeMillis` field, which represents the consensus timestamp associated
with the lease. For flows that do not
+ adopt the consensus `eventTimeMillis` as the flow execution ID, the
`dagAction.flowExecutionId` will remain unchanged.
+ The consensus `eventTimeMillis` must be tracked for lease completion.
+ The time the caller obtained the lease is stored within
the`leaseAcquisitionTimestamp` field.
+ The `multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for
the current LeaseObtainedStatus via the
+ completeLease method from a caller without access to the {@link
MultiActiveLeaseArbiter}.
*/
@Data
// avoid - warning: Generating equals/hashCode implementation but without a
call to superclass, even though this class does not extend java.lang.Object
@EqualsAndHashCode(callSuper=false)
public static class LeaseObtainedStatus extends LeaseAttemptStatus {
- private final DagActionStore.DagAction consensusDagAction;
- private final long eventTimeMillis;
+ private final DagActionStore.DagActionLeaseObject
consensusDagActionLeaseObject;
private final long leaseAcquisitionTimestamp;
private final long minimumLingerDurationMillis;
@Getter(AccessLevel.NONE)
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ @Override
+ public DagActionStore.DagAction getConsensusDagAction() {
+ return consensusDagActionLeaseObject.getDagAction();
+ }
+
/**
* Completes the lease referenced by this status object if it has not
expired.
* @return true if able to complete lease, false otherwise.
@@ -79,11 +92,15 @@ public abstract class LeaseAttemptStatus {
public boolean completeLease() throws IOException {
return multiActiveLeaseArbiter.recordLeaseSuccess(this);
}
+
+ public long getEventTimeMillis() {
+ return consensusDagActionLeaseObject.getEventTimeMillis();
+ }
}
/*
This dag action event already has a valid lease owned by another participant.
- `eventTimeMillis' corresponds to the timestamp of the existing lease
associated with this dag action, however the dag
+ See doc for {@link LeaseObtainedStatus} for details about
consensusLeaseObject field. Note, that the dag
action event it corresponds to may be a different and distinct occurrence of
the same event.
`minimumLingerDurationMillis` is the minimum amount of time to wait before
this participant should return to check if
the lease has completed or expired
@@ -92,8 +109,16 @@ public abstract class LeaseAttemptStatus {
// avoid - warning: Generating equals/hashCode implementation but without a
call to superclass, even though this class does not extend java.lang.Object
@EqualsAndHashCode(callSuper=false)
public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
- private final DagActionStore.DagAction consensusDagAction;
- private final long eventTimeMillis;
+ private final DagActionStore.DagActionLeaseObject
consensusDagActionLeaseObject;
private final long minimumLingerDurationMillis;
+
+ @Override
+ public DagActionStore.DagAction getConsensusDagAction() {
+ return consensusDagActionLeaseObject.getDagAction();
+ }
+
+ public long getEventTimeMillis() {
+ return consensusDagActionLeaseObject.getEventTimeMillis();
+ }
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index ba9c17c75..b49d2ae52 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -48,19 +48,17 @@ public interface MultiActiveLeaseArbiter {
* acquisition timestamp of the entry for that dag action event (it could
have pre-existed in the table or been newly
* added by the previous write). Based on the transaction results, it will
return {@link LeaseAttemptStatus} to
* determine the next action.
- * @param dagAction uniquely identifies the flow and the present action upon
it
- * @param eventTimeMillis is the time this dag action was triggered
- * @param isReminderEvent true if the dag action event we're checking on is
a reminder event
+ *
+ * @param dagActionLeaseObject uniquely identifies the
flow, the present action upon it, the time the action
+ * was triggered, and if the dag action
event we're checking on is a reminder event
* @param adoptConsensusFlowExecutionId if true then replaces the dagAction
flowExecutionId returned in
* LeaseAttemptStatuses with the
consensus eventTime, accessed via
* {@link
LeaseAttemptStatus#getConsensusDagAction()}
- *
* @return {@link LeaseAttemptStatus}, containing, when
`adoptConsensusFlowExecutionId`, a universally-agreed-upon
* {@link DagActionStore.DagAction} with a possibly updated ("laundered")
flow execution id that MUST be used thereafter
* @throws IOException
*/
- LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, long
eventTimeMillis, boolean isReminderEvent,
- boolean adoptConsensusFlowExecutionId)
+ LeaseAttemptStatus tryAcquireLease(DagActionStore.DagActionLeaseObject
dagActionLeaseObject, boolean adoptConsensusFlowExecutionId)
throws IOException;
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 7543e20e3..4b910219b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -243,20 +243,20 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
@Override
- public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
dagAction, long eventTimeMillis,
- boolean isReminderEvent, boolean adoptConsensusFlowExecutionId) throws
IOException {
+ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject,
boolean adoptConsensusFlowExecutionId) throws IOException {
log.info("Multi-active scheduler about to handle trigger event: [{}, is:
{}, triggerEventTimestamp: {}]",
- dagAction, isReminderEvent ? "reminder" : "original", eventTimeMillis);
+ dagActionLeaseObject.getDagAction(), dagActionLeaseObject.isReminder()
? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis());
// Query lease arbiter table about this dag action
- Optional<GetEventInfoResult> getResult = getExistingEventInfo(dagAction,
isReminderEvent, eventTimeMillis);
+ Optional<GetEventInfoResult> getResult =
getExistingEventInfo(dagActionLeaseObject);
try {
if (!getResult.isPresent()) {
log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE
1: no existing row for this dag action,"
- + " then go ahead and insert", dagAction, isReminderEvent ?
"reminder" : "original", eventTimeMillis);
- int numRowsUpdated = attemptLeaseIfNewRow(dagAction);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, dagAction,
Optional.empty(),
- isReminderEvent, adoptConsensusFlowExecutionId);
+ + " then go ahead and insert", dagActionLeaseObject.getDagAction(),
+ dagActionLeaseObject.isReminder() ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis());
+ int numRowsUpdated =
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction());
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated,
dagActionLeaseObject.getDagAction(),
+ Optional.empty(), dagActionLeaseObject.isReminder(),
adoptConsensusFlowExecutionId);
}
// Extract values from result set
@@ -270,30 +270,34 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// For reminder event, we can stop early if the reminder eventTimeMillis
is older than the current event in the db
// because db laundering tells us that the currently worked on db event
is newer and will have its own reminders
- if (isReminderEvent) {
- if (eventTimeMillis < dbEventTimestamp.getTime()) {
+ if (dagActionLeaseObject.isReminder()) {
+ if (dagActionLeaseObject.getEventTimeMillis() <
dbEventTimestamp.getTime()) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - A new event trigger "
- + "is being worked on, so this older reminder will be
dropped.", dagAction,
- isReminderEvent ? "reminder" : "original", eventTimeMillis,
dbEventTimestamp);
+ + "is being worked on, so this older reminder will be
dropped.", dagActionLeaseObject.getDagAction(),
+ dagActionLeaseObject.isReminder ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis(),
+ dbEventTimestamp);
return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
- if (eventTimeMillis > dbEventTimestamp.getTime()) {
+ if (dagActionLeaseObject.getEventTimeMillis() >
dbEventTimestamp.getTime()) {
// TODO: emit metric here to capture this unexpected behavior
log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Severe constraint "
+ "violation encountered: a reminder event newer than db
event was found when db laundering should "
- + "ensure monotonically increasing laundered event times.",
dagAction,
- isReminderEvent ? "reminder" : "original", eventTimeMillis,
dbEventTimestamp.getTime());
+ + "ensure monotonically increasing laundered event times.",
dagActionLeaseObject.getDagAction(),
+ dagActionLeaseObject.isReminder ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis(),
+ dbEventTimestamp.getTime());
}
- if (eventTimeMillis == dbEventTimestamp.getTime()) {
+ if (dagActionLeaseObject.getEventTimeMillis() ==
dbEventTimestamp.getTime()) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Reminder event time "
- + "is the same as db event.", dagAction, isReminderEvent ?
"reminder" : "original",
- eventTimeMillis, dbEventTimestamp);
+ + "is the same as db event.",
dagActionLeaseObject.getDagAction(),
+ dagActionLeaseObject.isReminder ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis(),
+ dbEventTimestamp);
}
}
log.info("Multi-active arbiter replacing local trigger event timestamp
[{}, is: {}, triggerEventTimestamp: {}] "
- + "with database eventTimestamp {} (in epoch-millis)", dagAction,
isReminderEvent ? "reminder" : "original",
- eventTimeMillis, dbCurrentTimestamp.getTime());
+ + "with database eventTimestamp {} (in epoch-millis)",
dagActionLeaseObject.getDagAction(),
+ dagActionLeaseObject.isReminder ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis(),
+ dbCurrentTimestamp.getTime());
/* Note that we use `adoptConsensusFlowExecutionId` parameter's value to
determine whether we should use the db
laundered event timestamp as the flowExecutionId or maintain the
original one
@@ -302,49 +306,55 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
- DagActionStore.DagAction updatedDagAction =
- adoptConsensusFlowExecutionId ?
dagAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : dagAction;
+ DagActionStore.DagAction updatedDagAction =
+ adoptConsensusFlowExecutionId ?
dagActionLeaseObject.getDagAction().updateFlowExecutionId(dbEventTimestamp.getTime())
: dagActionLeaseObject.getDagAction();
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
- updatedDagAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ updatedDagAction, dagActionLeaseObject.isReminder ? "reminder" :
"original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new
LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
dbEventTimestamp.getTime(),
+ return new LeaseAttemptStatus.LeasedToAnotherStatus(
+ new DagActionStore.DagActionLeaseObject(updatedDagAction,
dbEventTimestamp.getTime()),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedDagAction =
- adoptConsensusFlowExecutionId ?
dagAction.updateFlowExecutionId(dbCurrentTimestamp.getTime()) : dagAction;
+ adoptConsensusFlowExecutionId ?
dagActionLeaseObject.getDagAction().updateFlowExecutionId(dbCurrentTimestamp.getTime())
: dagActionLeaseObject.getDagAction();
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
3: Distinct event, lease is valid",
- updatedDagAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
- // Utilize db lease acquisition timestamp for wait time
- return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
dbCurrentTimestamp.getTime(),
+ updatedDagAction, dagActionLeaseObject.isReminder ? "reminder" :
"original", dbCurrentTimestamp.getTime());
+ // Utilize db lease acquisition timestamp for wait time and
currentTimestamp as the new eventTimestamp
+ return new LeaseAttemptStatus.LeasedToAnotherStatus(
+ new DagActionStore.DagActionLeaseObject(updatedDagAction,
dbCurrentTimestamp.getTime()),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
4: Lease is out of date (regardless of "
- + "whether same or distinct event)", dagAction, isReminderEvent ?
"reminder" : "original",
- dbCurrentTimestamp.getTime());
- if (isWithinEpsilon && !isReminderEvent) {
- log.warn("Lease should not be out of date for the same trigger event
since epsilon << linger for dagAction"
- + " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp
{}, linger {}", dagAction,
- dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
+ + "whether same or distinct event)",
dagActionLeaseObject.getDagAction(),
+ dagActionLeaseObject.isReminder ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ if (isWithinEpsilon && !dagActionLeaseObject.isReminder) {
+ log.warn("Lease should not be out of date for the same trigger event
since epsilon << linger for "
+ + "leaseObject.getDagAction() {}, db eventTimestamp {}, db
leaseAcquisitionTimestamp {}, linger {}",
+ dagActionLeaseObject.getDagAction(), dbEventTimestamp,
dbLeaseAcquisitionTimestamp, dbLinger);
}
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
- int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
dagAction,
- true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, dagAction,
Optional.of(dbCurrentTimestamp),
- isReminderEvent, adoptConsensusFlowExecutionId);
+ int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
+ dagActionLeaseObject.getDagAction(), true,true, dbEventTimestamp,
+ dbLeaseAcquisitionTimestamp);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated,
dagActionLeaseObject.getDagAction(),
+ Optional.of(dbCurrentTimestamp), dagActionLeaseObject.isReminder,
adoptConsensusFlowExecutionId);
} // No longer leasing this event
if (isWithinEpsilon) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 5: Same event, no longer leasing event"
- + " in db", dagAction, isReminderEvent ? "reminder" :
"original", dbCurrentTimestamp.getTime());
+ + " in db", dagActionLeaseObject.getDagAction(),
+ dagActionLeaseObject.isReminder ? "reminder" : "original",
dbCurrentTimestamp.getTime());
return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
6: Distinct event, no longer leasing "
- + "event in db", dagAction, isReminderEvent ? "reminder" :
"original", dbCurrentTimestamp.getTime());
+ + "event in db", dagActionLeaseObject.getDagAction(),
+ dagActionLeaseObject.isReminder ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
- int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, dagAction,
- true, false, dbEventTimestamp, null);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, dagAction,
Optional.of(dbCurrentTimestamp),
- isReminderEvent, adoptConsensusFlowExecutionId);
+ int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement,
+ dagActionLeaseObject.getDagAction(), true, false, dbEventTimestamp,
+ null);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated,
dagActionLeaseObject.getDagAction(),
+ Optional.of(dbCurrentTimestamp), dagActionLeaseObject.isReminder,
adoptConsensusFlowExecutionId);
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -353,18 +363,19 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
/**
* Checks leaseArbiterTable for an existing entry for this dag action and
event time
*/
- protected Optional<GetEventInfoResult>
getExistingEventInfo(DagActionStore.DagAction dagAction,
- boolean isReminderEvent, long eventTimeMillis) throws IOException {
- return dbStatementExecutor.withPreparedStatement(isReminderEvent ?
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
+ protected Optional<GetEventInfoResult>
getExistingEventInfo(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
+ throws IOException {
+ return dbStatementExecutor.withPreparedStatement(
+ dagActionLeaseObject.isReminder ? thisTableGetInfoStatementForReminder
: thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
- if (isReminderEvent) {
- getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis),
UTC_CAL.get());
+ if (dagActionLeaseObject.isReminder) {
+ getInfoStatement.setTimestamp(++i, new
Timestamp(dagActionLeaseObject.getEventTimeMillis()), UTC_CAL.get());
}
- getInfoStatement.setString(++i, dagAction.getFlowGroup());
- getInfoStatement.setString(++i, dagAction.getFlowName());
- getInfoStatement.setString(++i, dagAction.getJobName());
- getInfoStatement.setString(++i,
dagAction.getDagActionType().toString());
+ getInfoStatement.setString(++i,
dagActionLeaseObject.getDagAction().getFlowGroup());
+ getInfoStatement.setString(++i,
dagActionLeaseObject.getDagAction().getFlowName());
+ getInfoStatement.setString(++i,
dagActionLeaseObject.getDagAction().getJobName());
+ getInfoStatement.setString(++i,
dagActionLeaseObject.getDagAction().getDagActionType().toString());
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
@@ -383,7 +394,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
try {
// Extract values from result set
Timestamp dbEventTimestamp =
resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get());
- Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get());
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("utc_lease_acquisition_timestamp",
+ UTC_CAL.get());
boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
int leaseValidityStatus = resultSet.getInt("lease_validity_status");
int dbLinger = resultSet.getInt("linger");
@@ -490,13 +502,13 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
/**
* Parse result of attempted insert/update to obtain a lease for a
- * {@link DagActionStore.DagAction} event by selecting values corresponding
to that
+ * {@linkleaseObject.getDagAction()Store.DagAction} event by selecting
values corresponding to that
* event from the table to return the corresponding status based on
successful insert/update or not.
* @throws SQLException
* @throws IOException
*/
protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int
numRowsUpdated,
- DagActionStore.DagAction dagAction, Optional<Timestamp>
dbCurrentTimestamp, boolean isReminderEvent,
+ DagActionStore.DagAction dagAction, Optional<Timestamp>
dbCurrentTimestamp, boolean isReminderEvent,
boolean adoptConsensusFlowExecutionId)
throws SQLException, IOException {
// Fetch values in row after attempted insert
@@ -505,8 +517,10 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
- DagActionStore.DagAction updatedDagAction =
+ DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ?
dagAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis) : dagAction;
+ DagActionStore.DagActionLeaseObject consensusDagActionLeaseObject =
+ new DagActionStore.DagActionLeaseObject(updatedDagAction,
selectInfoResult.getEventTimeMillis());
// If no db current timestamp is present, then use the full db linger
value for duration
long minimumLingerDurationMillis = dbCurrentTimestamp.isPresent() ?
selectInfoResult.getLeaseAcquisitionTimeMillis().get() +
selectInfoResult.getDbLinger()
@@ -514,14 +528,13 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", updatedDagAction,
isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
- return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction,
selectInfoResult.eventTimeMillis,
+ return new
LeaseAttemptStatus.LeaseObtainedStatus(consensusDagActionLeaseObject,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(),
minimumLingerDurationMillis, this);
}
log.info("Another participant acquired lease in between for [{}, is: {},
eventTimestamp: {}] - num rows updated: {}",
updatedDagAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
- return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
selectInfoResult.eventTimeMillis,
- minimumLingerDurationMillis);
+ return new
LeaseAttemptStatus.LeasedToAnotherStatus(consensusDagActionLeaseObject,
minimumLingerDurationMillis);
}
/**
@@ -547,7 +560,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* @throws SQLException
*/
protected static void
completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement,
- DagActionStore.DagAction dagAction) throws SQLException {
+ DagActionStore.DagAction dagAction) throws SQLException {
int i = 0;
statement.setString(++i, dagAction.getFlowGroup());
statement.setString(++i, dagAction.getFlowName());
@@ -566,8 +579,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* @param originalLeaseAcquisitionTimestamp value to compare to db one, null
if not needed
* @throws SQLException
*/
- protected static void completeUpdatePreparedStatement(PreparedStatement
statement,
- DagActionStore.DagAction dagAction, boolean needEventTimeCheck, boolean
needLeaseAcquisitionTimeCheck,
+ protected static void completeUpdatePreparedStatement(PreparedStatement
statement, DagActionStore.DagAction dagAction,
+ boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck,
Timestamp originalEventTimestamp, Timestamp
originalLeaseAcquisitionTimestamp) throws SQLException {
int i = 0;
// Values to check if existing row matches previous read
@@ -615,7 +628,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
/**
- * DTO for arbiter's current lease state for a DagActionEvent
+ * DTO for arbiter's current lease state for a
leaseObject.getDagAction()Event
*/
@Data
static class GetEventInfoResult {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 25caaf224..49793cfcf 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -220,12 +220,12 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
flowGroup,
flowName,
FlowUtils.getOrCreateFlowExecutionId(flowSpec),
- DagActionStore.DagActionType.LAUNCH
- );
-
+ DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.DagActionLeaseObject
+ leaseObject = new
DagActionStore.DagActionLeaseObject(launchDagAction, isReminderEvent,
+ triggerTimestampMillis);
// `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
- flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps,
launchDagAction, triggerTimestampMillis, isReminderEvent,
- flowSpec.isScheduled());
+ flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps,
leaseObject, flowSpec.isScheduled());
_log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
launchDagAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index 9b634f9fe..6567fcf77 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -62,7 +62,8 @@ public class DagActionReminderSchedulerTest {
@Test
public void testCreateReminderJobDetail() {
- JobDetail jobDetail =
DagActionReminderScheduler.createReminderJobDetail(launchDagAction);
+ long expectedEventTimeMillis = 55L;
+ JobDetail jobDetail =
DagActionReminderScheduler.createReminderJobDetail(new
DagActionStore.DagActionLeaseObject(launchDagAction, false,
expectedEventTimeMillis));
Assert.assertEquals(jobDetail.getKey().toString(), flowGroup + "." +
expectedKey);
JobDataMap dataMap = jobDetail.getJobDataMap();
Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_GROUP_KEY),
flowGroup);
@@ -71,5 +72,6 @@ public class DagActionReminderSchedulerTest {
Assert.assertEquals(dataMap.get(ConfigurationKeys.JOB_NAME_KEY), jobName);
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_TYPE_KEY),
DagActionStore.DagActionType.LAUNCH);
+
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_EVENT_TIME_KEY),
expectedEventTimeMillis);
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 046366b24..fcea36d7f 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -37,7 +37,6 @@ import
org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -98,10 +97,10 @@ public class DagManagementTaskStreamImplTest {
dagManagementTaskStream.addDagAction(launchAction);
dagManagementTaskStream.addDagAction(launchAction);
when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
- .tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(),
anyBoolean(), anyBoolean()))
+ .tryAcquireLease(any(DagActionStore.DagActionLeaseObject.class),
anyBoolean()))
.thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
- new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 3, 15),
- new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 5, 0, 5,
null));
+ new LeaseAttemptStatus.LeasedToAnotherStatus(new
DagActionStore.DagActionLeaseObject(launchAction, true, 1), 15),
+ new LeaseAttemptStatus.LeaseObtainedStatus(new
DagActionStore.DagActionLeaseObject(launchAction, true, 1), 0, 5, null));
DagTask dagTask = dagManagementTaskStream.next();
Assert.assertTrue(dagTask instanceof LaunchDagTask);
DagProc dagProc = dagTask.host(this.dagProcFactory);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 197ceda76..1afc77ba4 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -36,8 +36,10 @@ public class FlowLaunchHandlerTest {
int schedulerBackOffMillis = 10;
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction("flowName", "flowGroup",
flowExecutionId, "jobName", DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.DagActionLeaseObject
+ leaseObject = new DagActionStore.DagActionLeaseObject(dagAction, false,
eventToRevisit);
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
- new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit,
minimumLingerDurationMillis);
+ new LeaseAttemptStatus.LeasedToAnotherStatus(leaseObject,
minimumLingerDurationMillis);
/**
* Remove first two fields from cron expression representing seconds and
minutes to return truncated cron expression
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index bcb208fa7..b9abc37bf 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Optional;
+import java.util.UUID;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -52,14 +53,20 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
private static final long flowExecutionId = 12345677L;
+ private static final long eventTimeMillis = 1710451837L; //
System.currentTimeMillis();
// Dag actions with the same flow info but different flow action types are
considered unique
private static DagActionStore.DagAction launchDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
+ private static DagActionStore.DagActionLeaseObject
+ launchLeaseObject = new
DagActionStore.DagActionLeaseObject(launchDagAction, false, eventTimeMillis);
private static DagActionStore.DagAction resumeDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.RESUME);
+ private static DagActionStore.DagActionLeaseObject
+ resumeLeaseObject = new
DagActionStore.DagActionLeaseObject(resumeDagAction, false, eventTimeMillis);
private static DagActionStore.DagAction launchDagAction2 =
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
- private static final long eventTimeMillis = System.currentTimeMillis();
+ private static DagActionStore.DagActionLeaseObject
+ launchLeaseObject2 = new
DagActionStore.DagActionLeaseObject(launchDagAction2, false, eventTimeMillis);
private static final Timestamp dummyTimestamp = new Timestamp(99999);
private ITestMetastoreDatabase testDb;
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
@@ -103,21 +110,27 @@ public class MysqlMultiActiveLeaseArbiterTest {
public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 1 of acquire lease for a flow action event not present in DB
LeaseAttemptStatus firstLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
Assert.assertTrue(firstLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
- Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
- firstObtainedStatus.getLeaseAcquisitionTimestamp());
+ long consensusEventTimeMillis = firstObtainedStatus.getEventTimeMillis();
+ Assert.assertTrue(consensusEventTimeMillis <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
+ // Make sure consensusEventTimeMillis is set, and it's not 0 or the
original event time
+ log.info("consensus event time is {} eventtimeMillis is {}",
consensusEventTimeMillis, eventTimeMillis);
+ Assert.assertTrue(consensusEventTimeMillis != eventTimeMillis &&
consensusEventTimeMillis != 0);
Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
- new DagActionStore.DagAction(flowGroup, flowName,
firstObtainedStatus.getEventTimeMillis(),
- jobName, DagActionStore.DagActionType.LAUNCH)));
+ new DagActionStore.DagAction(flowGroup, flowName,
consensusEventTimeMillis, jobName,
+ DagActionStore.DagActionType.LAUNCH)));
+ Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
consensusEventTimeMillis);
+
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder,
false);
// Verify that different DagAction types for the same flow can have leases
at the same time
DagActionStore.DagAction killDagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.KILL);
LeaseAttemptStatus killStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction,
eventTimeMillis, false, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(
+ new DagActionStore.DagActionLeaseObject(killDagAction, false,
eventTimeMillis), true);
Assert.assertTrue(killStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus killObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) killStatus;
@@ -128,7 +141,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Very little time should have passed if this test directly follows the
one above so this call will be considered
// the same as the previous event
LeaseAttemptStatus secondLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
Assert.assertTrue(secondLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
@@ -140,7 +153,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Allow enough time to pass for this trigger to be considered distinct,
but not enough time so the lease expires
Thread.sleep(MORE_THAN_EPSILON);
LeaseAttemptStatus thirdLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
Assert.assertTrue(thirdLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) thirdLaunchStatus;
@@ -150,7 +163,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Tests CASE 4 of lease out of date
Thread.sleep(LINGER);
LeaseAttemptStatus fourthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
Assert.assertTrue(fourthLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus fourthObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) fourthLaunchStatus;
@@ -163,14 +176,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
Assert.assertTrue(System.currentTimeMillis() -
fourthObtainedStatus.getEventTimeMillis() < EPSILON);
LeaseAttemptStatus fifthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
Assert.assertTrue(fifthLaunchStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus);
// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be
acquired
Thread.sleep(MORE_THAN_EPSILON);
LeaseAttemptStatus sixthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
Assert.assertTrue(sixthLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus sixthObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) sixthLaunchStatus;
@@ -185,7 +198,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Note: this isolates and tests CASE 1 in which another participant could
have acquired the lease between the time
the read was done and subsequent write was carried out
*/
- @Test (dependsOnMethods = "testAcquireLeaseSingleParticipant")
+ @Test
public void testAcquireLeaseIfNewRow() throws IOException {
// Inserting the first time should update 1 row
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
1);
@@ -195,12 +208,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
/*
Tests CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT to ensure
insertion is not completed if another
- participant updated the table between the prior reed and attempted
insertion.
+ participant updated the table between the prior read and attempted
insertion.
Note: this isolates and tests CASE 4 in which a flow action event has an
out of date lease, so a participant
attempts a new one given the table the eventTimestamp and
leaseAcquisitionTimestamp values are unchanged.
*/
@Test (dependsOnMethods = "testAcquireLeaseIfNewRow")
- public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement()
throws IOException {
+ public void testConditionallyAcquireLeaseIfMatchingAllColsStatement() throws
IOException {
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
@@ -230,18 +243,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
Note: This isolates and tests CASE 6 during which current participant saw a
distinct flow action event had completed
its prior lease, encouraging the current participant to acquire a lease for
its event.
*/
- @Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
+ @Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfMatchingAllColsStatement")
public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
- throws IOException, InterruptedException, SQLException {
- // Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
- MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
- mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
- DagActionStore.DagAction updatedResumeDagAction =
resumeDagAction.updateFlowExecutionId(
- selectInfoResult.getEventTimeMillis());
- boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new
LeaseAttemptStatus.LeaseObtainedStatus(
- updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
- selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
- Assert.assertTrue(markedSuccess);
+ throws IOException, SQLException {
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
completeLeaseHelper(resumeLeaseObject);
// Ensure no NPE results from calling this after a lease has been
completed and acquisition timestamp val is NULL
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1,
resumeDagAction,
Optional.empty(), false, true);
@@ -262,14 +267,18 @@ public class MysqlMultiActiveLeaseArbiterTest {
/*
Tests calling `tryAcquireLease` for an older reminder event which should be
immediately returned as `NoLongerLeasing`
*/
- @Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfFinishedLeasingStatement")
+ @Test
public void testOlderReminderEventAcquireLease() throws IOException {
+ DagActionStore.DagActionLeaseObject newLaunchLeaseObject =
getUniqueLaunchLeaseObject();
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
// Read database to obtain existing db eventTimeMillis and use it to
construct an older event
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
- mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseObject.getDagAction());
long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
+ DagActionStore.DagActionLeaseObject updatedLeaseObject =
+ new
DagActionStore.DagActionLeaseObject(newLaunchLeaseObject.getDagAction(), true,
olderEventTimestamp);
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
olderEventTimestamp, true, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, true);
Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus);
}
@@ -278,30 +287,35 @@ public class MysqlMultiActiveLeaseArbiterTest {
this case to occur because the reminderEvent should be triggered after the
lease expires, but ensure it's handled
correctly anyway.
*/
- @Test (dependsOnMethods = "testOlderReminderEventAcquireLease")
+ @Test
public void testReminderEventAcquireLeaseOnValidLease() throws IOException {
- // Read database to obtain existing db eventTimeMillis and re-use it for
the reminder event time
- MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
- mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ DagActionStore.DagActionLeaseObject newLaunchLeaseObject =
getUniqueLaunchLeaseObject();
+ LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
+ (LeaseAttemptStatus.LeaseObtainedStatus)
mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
+ // Use the consensusLeaseObject containing the new eventTimeMillis for the
reminder event time
+ DagActionStore.DagActionLeaseObject updatedLeaseObject =
+ new
DagActionStore.DagActionLeaseObject(newLaunchLeaseObject.getDagAction(), true,
+ leaseObtainedStatus.getEventTimeMillis());
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, true);
Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) attemptStatus;
- Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(),
selectInfoResult.getEventTimeMillis());
+ Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(),
leaseObtainedStatus.getEventTimeMillis());
}
/*
Tests calling `tryAcquireLease` for a reminder event whose lease has expired
in the database and should successfully
acquire a new lease
*/
- @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnValidLease")
+ @Test
public void testReminderEventAcquireLeaseOnInvalidLease() throws
IOException, InterruptedException {
- // Read database to obtain existing db eventTimeMillis and wait enough
time for the lease to expire
- MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
- mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ DagActionStore.DagActionLeaseObject newLaunchLeaseObject =
getUniqueLaunchLeaseObject();
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseObject.getDagAction());
+ // Wait enough time for the lease to expire
Thread.sleep(MORE_THAN_LINGER);
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject,
true);
Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus obtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) attemptStatus;
Assert.assertTrue(obtainedStatus.getEventTimeMillis() >
selectInfoResult.getEventTimeMillis());
@@ -314,23 +328,22 @@ public class MysqlMultiActiveLeaseArbiterTest {
Note: that we wait for enough time to pass that the event would have been
considered distinct for a non-reminder case
to ensure that the comparison made for reminder events is against the
preserved event time not current time in db
*/
- @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease")
+ @Test
public void testReminderEventAcquireLeaseOnCompletedLease() throws
IOException, InterruptedException {
- // Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
- MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
- mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
- DagActionStore.DagAction updatedResumeDagAction =
resumeDagAction.updateFlowExecutionId(
- selectInfoResult.getEventTimeMillis());
- boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new
LeaseAttemptStatus.LeaseObtainedStatus(
- updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
- selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER,
null));
- Assert.assertTrue(markedSuccess);
+ // Create a new dag action and complete the lease
+ DagActionStore.DagActionLeaseObject newLaunchLeaseObject =
getUniqueLaunchLeaseObject();
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
completeLeaseHelper(newLaunchLeaseObject);
// Sleep enough time for the event to have been considered distinct
Thread.sleep(MORE_THAN_EPSILON);
// Now have a reminder event check-in on the completed lease
+ DagActionStore.DagAction updatedDagAction =
newLaunchLeaseObject.getDagAction().updateFlowExecutionId(
+ selectInfoResult.getEventTimeMillis());
+ DagActionStore.DagActionLeaseObject updatedLeaseObject =
+ new DagActionStore.DagActionLeaseObject(updatedDagAction, true,
newLaunchLeaseObject.getEventTimeMillis());
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject,
true);
Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus);
}
@@ -344,7 +357,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
// Obtain a lease for a new action and verify its flowExecutionId is not
updated
LeaseAttemptStatus firstLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2,
eventTimeMillis, false, false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject2,
false);
Assert.assertTrue(firstLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
@@ -352,18 +365,59 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() !=
Long.valueOf(firstObtainedStatus.getConsensusDagAction().getFlowExecutionId()));
Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
+
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder(),
false);
// A second attempt to obtain a lease on the same action should return a
LeasedToAnotherStatus which also contains
// the original flowExecutionId and the same event time from the previous
LeaseAttemptStatus
LeaseAttemptStatus secondLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2,
eventTimeMillis, false, false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject2,
false);
Assert.assertTrue(secondLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertTrue(secondLeasedToAnotherStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
+
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder(),
false);
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(firstObtainedStatus));
}
+
+ public static String generateUniqueName() {
+ UUID uuid = UUID.randomUUID();
+ String name = uuid.toString().substring(0, 10);
+ return name;
+ }
+
+ /**
+ * Returns a unique launch type dagAction event by using
'generateUniqueName' as flowName to create a unique event
+ */
+ public DagActionStore.DagAction getUniqueLaunchDagAction() {
+ return new DagActionStore.DagAction(flowGroup, generateUniqueName(),
flowExecutionId, jobName,
+ DagActionStore.DagActionType.LAUNCH);
+ }
+
+ /**
+ * Returns a unique launch type leaseObject using
#getUniqueLaunchDagAction() to create a unique flowName
+ */
+ public DagActionStore.DagActionLeaseObject getUniqueLaunchLeaseObject() {
+ return new DagActionStore.DagActionLeaseObject(getUniqueLaunchDagAction(),
false, eventTimeMillis);
+ }
+
+ /**
+ * Marks the lease associated with the dagAction as completed by fabricating
a LeaseObtainedStatus
+ * @return SelectInfoResult object containing the event information used to
complete the lease
+ */
+ public MysqlMultiActiveLeaseArbiter.SelectInfoResult completeLeaseHelper(
+ DagActionStore.DagActionLeaseObject previouslyLeasedLeaseObj) throws
IOException {
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+
mysqlMultiActiveLeaseArbiter.getRowInfo(previouslyLeasedLeaseObj.getDagAction());
+ DagActionStore.DagAction updatedDagAction =
previouslyLeasedLeaseObj.getDagAction().updateFlowExecutionId(
+ selectInfoResult.getEventTimeMillis());
+ DagActionStore.DagActionLeaseObject
+ updatedLeaseObject = new
DagActionStore.DagActionLeaseObject(updatedDagAction, false,
selectInfoResult.getEventTimeMillis());
+ boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new
LeaseAttemptStatus.LeaseObtainedStatus(
+ updatedLeaseObject,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
+ Assert.assertTrue(markedSuccess);
+ return selectInfoResult;
+ }
}