This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b72193ddf [GOBBLIN-2100] Add javadocs to DagAction related classes
(#3987)
b72193ddf is described below
commit b72193ddf53917d09e5d7c35b4b1ca009b9f8d42
Author: umustafi <[email protected]>
AuthorDate: Thu Jun 27 15:20:02 2024 -0700
[GOBBLIN-2100] Add javadocs to DagAction related classes (#3987)
* Add javadocs to DagAction related classes
* Revise javadocs and naming
* Rename leaseObject to leaseParam in tests
---
.../orchestration/DagActionReminderScheduler.java | 35 ++++---
.../modules/orchestration/DagActionStore.java | 37 ++++++-
.../modules/orchestration/DagManagement.java | 2 +-
.../orchestration/DagManagementTaskStreamImpl.java | 48 ++++-----
.../modules/orchestration/FlowLaunchHandler.java | 9 +-
.../orchestration/InstrumentedLeaseArbiter.java | 12 +--
.../modules/orchestration/LeaseAttemptStatus.java | 16 +--
.../orchestration/MultiActiveLeaseArbiter.java | 4 +-
.../MysqlMultiActiveLeaseArbiter.java | 115 +++++++++++----------
.../modules/orchestration/Orchestrator.java | 4 +-
.../DagActionReminderSchedulerTest.java | 2 +-
.../DagManagementTaskStreamImplTest.java | 6 +-
.../orchestration/FlowLaunchHandlerTest.java | 4 +-
.../MysqlMultiActiveLeaseArbiterTest.java | 107 +++++++++----------
14 files changed, 215 insertions(+), 186 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 21a9bdb92..8c1d3a8af 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -72,18 +72,18 @@ public class DagActionReminderScheduler {
/**
* Uses a dagAction & reminder duration in milliseconds to create a
reminder job that will fire
* `reminderDurationMillis` after the current time
- * @param dagActionLeaseObject
+ * @param leaseParams
* @param reminderDurationMillis
* @throws SchedulerException
*/
- public void scheduleReminder(DagActionStore.DagActionLeaseObject
dagActionLeaseObject, long reminderDurationMillis,
+ public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long
reminderDurationMillis,
boolean isDeadlineReminder)
throws SchedulerException {
- JobDetail jobDetail = createReminderJobDetail(dagActionLeaseObject,
isDeadlineReminder);
- Trigger trigger =
createReminderJobTrigger(dagActionLeaseObject.getDagAction(),
reminderDurationMillis,
+ JobDetail jobDetail = createReminderJobDetail(leaseParams,
isDeadlineReminder);
+ Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(),
reminderDurationMillis,
System::currentTimeMillis, isDeadlineReminder);
log.info("Reminder set for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
- dagActionLeaseObject.getDagAction(), reminderDurationMillis,
isDeadlineReminder);
+ leaseParams.getDagAction(), reminderDurationMillis,
isDeadlineReminder);
quartzScheduler.scheduleJob(jobDetail, trigger);
}
@@ -113,16 +113,16 @@ public class DagActionReminderScheduler {
DagActionStore.DagActionType dagActionType =
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);
- DagActionStore.DagActionLeaseObject reminderDagActionLeaseObject = new
DagActionStore.DagActionLeaseObject(
+ DagActionStore.LeaseParams reminderLeaseParams = new
DagActionStore.LeaseParams(
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, dagActionType),
true, eventTimeMillis);
- log.info("DagProc reminder triggered for dagAction event: {}",
reminderDagActionLeaseObject);
+ log.info("DagProc reminder triggered for dagAction event: {}",
reminderLeaseParams);
try {
DagManagement dagManagement =
GobblinServiceManager.getClass(DagManagement.class);
- dagManagement.addReminderDagAction(reminderDagActionLeaseObject);
+ dagManagement.addReminderDagAction(reminderLeaseParams);
} catch (IOException e) {
- log.error("Failed to add DagAction event to DagManagement. dagAction
event: {}", reminderDagActionLeaseObject);
+ log.error("Failed to add DagAction event to DagManagement. dagAction
event: {}", reminderLeaseParams);
}
}
}
@@ -152,17 +152,18 @@ public class DagActionReminderScheduler {
* by a key comprised of the dagAction's fields. The boolean `isDeadlineReminder`
is a flag that indicates whether this createReminder
* request is for a deadline dag action that is setting a reminder for the
deadline duration.
*/
- public static JobDetail
createReminderJobDetail(DagActionStore.DagActionLeaseObject
dagActionLeaseObject, boolean isDeadlineReminder) {
+ public static JobDetail createReminderJobDetail(DagActionStore.LeaseParams
leaseParams, boolean isDeadlineReminder) {
JobDataMap dataMap = new JobDataMap();
- dataMap.put(ConfigurationKeys.FLOW_NAME_KEY,
dagActionLeaseObject.getDagAction().getFlowName());
- dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY,
dagActionLeaseObject.getDagAction().getFlowGroup());
- dataMap.put(ConfigurationKeys.JOB_NAME_KEY,
dagActionLeaseObject.getDagAction().getJobName());
- dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagActionLeaseObject.getDagAction().getFlowExecutionId());
- dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY,
dagActionLeaseObject.getDagAction().getDagActionType());
- dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY,
dagActionLeaseObject.getEventTimeMillis());
+ DagActionStore.DagAction dagAction = leaseParams.getDagAction();
+ dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
+ dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
+ dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
+ dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY,
dagAction.getDagActionType());
+ dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY,
leaseParams.getEventTimeMillis());
return JobBuilder.newJob(ReminderJob.class)
- .withIdentity(createJobKey(dagActionLeaseObject.getDagAction(),
isDeadlineReminder))
+ .withIdentity(createJobKey(leaseParams.getDagAction(),
isDeadlineReminder))
.usingJobData(dataMap)
.build();
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 57b40a1d6..4bc7e639c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -28,6 +28,10 @@ import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+/**
+ * GaaS store for pending {@link DagAction}s on a flow or job.
+ * See javadoc for {@link DagAction}
+ */
public interface DagActionStore {
public static final String NO_JOB_NAME_DEFAULT = "";
enum DagActionType {
@@ -39,6 +43,18 @@ public interface DagActionStore {
RESUME, // Resume flow invoked through API call
}
+ /**
+ * A DagAction uniquely identifies a particular flow (or job level)
execution and the action to be performed on it,
+ * denoted by the `dagActionType` field.
+ * These are created and stored either by a REST client request or generated
within GaaS. The Flow Management layer
+ * retrieves and executes {@link DagAction}s to progress a flow's execution
or enforce execution deadlines.
+ *
+ * Flow group, name, and executionId are sufficient to define a flow level
action (used with
+ * {@link DagActionStore#NO_JOB_NAME_DEFAULT}). When `jobName` is provided,
it can be used to identify the specific
+ * job on which the action is to be performed. The schema of this class
matches exactly that of the
+ * {@link DagActionStore}.
+ *
+ */
@Data
@RequiredArgsConstructor
class DagAction {
@@ -78,9 +94,15 @@ public interface DagActionStore {
}
}
+ /**
+ * This object is used locally (in-memory) by the {@link
MultiActiveLeaseArbiter} to identify a particular
+ * {@link DagAction} along with the time it was requested, denoted by the
`eventTimeMillis` field. It also tracks
+ * whether it has been previously passed to the {@link
MultiActiveLeaseArbiter} to attempt ownership over the flow
+ * event, indicated by the 'isReminder' field (true when it has been
previously attempted).
+ */
@Data
@RequiredArgsConstructor
- class DagActionLeaseObject {
+ class LeaseParams {
final DagAction dagAction;
final boolean isReminder;
final long eventTimeMillis;
@@ -88,10 +110,15 @@ public interface DagActionStore {
/**
* Creates a lease object for a dagAction and eventTimeMillis representing
an original event (isReminder is False)
*/
- public DagActionLeaseObject(DagAction dagAction, long eventTimeMillis) {
- this.dagAction = dagAction;
- this.isReminder = false;
- this.eventTimeMillis = eventTimeMillis;
+ public LeaseParams(DagAction dagAction, long eventTimeMillis) {
+ this(dagAction, false, eventTimeMillis);
+ }
+
+ /**
+ * Replace flow execution id in dagAction with agreed upon event time to
easily track the flow
+ */
+ public DagAction updateDagActionFlowExecutionId(long flowExecutionId) {
+ return this.dagAction.updateFlowExecutionId(flowExecutionId);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
index eea645284..caa5e96ac 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -35,5 +35,5 @@ public interface DagManagement {
/**
* Used to add reminder dagActions to the queue that already contain an
eventTimestamp from the previous lease attempt
*/
- void addReminderDagAction(DagActionStore.DagActionLeaseObject
reminderDagActionLeaseObject) throws IOException;
+ void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams)
throws IOException;
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index b6901f58d..dc8967990 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -80,7 +80,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
private final boolean isMultiActiveExecutionEnabled;
private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
- private final BlockingQueue<DagActionStore.DagActionLeaseObject>
dagActionLeaseObjectQueue = new LinkedBlockingQueue<>();
+ private final BlockingQueue<DagActionStore.LeaseParams> leaseParamsQueue =
new LinkedBlockingQueue<>();
private final DagManagementStateStore dagManagementStateStore;
@Inject
@@ -112,18 +112,18 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
// TODO: Used to track missing dag issue, remove later as needed
log.info("Add original (non-reminder) dagAction {}", dagAction);
- if (!this.dagActionLeaseObjectQueue.offer(new
DagActionStore.DagActionLeaseObject(dagAction, false,
System.currentTimeMillis()))) {
+ if (!this.leaseParamsQueue.offer(new DagActionStore.LeaseParams(dagAction,
false, System.currentTimeMillis()))) {
throw new RuntimeException(String.format("Could not add dag action to
the queue %s", dagAction));
}
}
@Override
- public synchronized void
addReminderDagAction(DagActionStore.DagActionLeaseObject
reminderDagActionLeaseObject) {
+ public synchronized void addReminderDagAction(DagActionStore.LeaseParams
reminderLeaseParams) {
// TODO: Used to track missing dag issue, remove later as needed
- log.info("Add reminder dagAction {}", reminderDagActionLeaseObject);
+ log.info("Add reminder dagAction {}", reminderLeaseParams);
- if (!this.dagActionLeaseObjectQueue.offer(reminderDagActionLeaseObject)) {
- throw new RuntimeException(String.format("Could not add reminder dag
action to the queue %s", reminderDagActionLeaseObject));
+ if (!this.leaseParamsQueue.offer(reminderLeaseParams)) {
+ throw new RuntimeException(String.format("Could not add reminder dag
action to the queue %s", reminderLeaseParams));
}
}
@@ -137,17 +137,17 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
while (true) {
DagActionStore.DagAction dagAction = null;
try {
- DagActionStore.DagActionLeaseObject dagActionLeaseObject =
this.dagActionLeaseObjectQueue.take();
- dagAction = dagActionLeaseObject.getDagAction();
+ DagActionStore.LeaseParams leaseParams =
this.leaseParamsQueue.take();
+ dagAction = leaseParams.getDagAction();
/* Create triggers for original (non-reminder) dag actions of type
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE.
Reminder triggers are used to inform hosts once the job start
deadline and flow finish deadline are passed;
only then is lease arbitration done to enforce the deadline
violation and fail the job or flow if needed */
- if (!dagActionLeaseObject.isReminder() && dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
- createJobStartDeadlineTrigger(dagActionLeaseObject);
- } else if (!dagActionLeaseObject.isReminder() &&
dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
- createFlowFinishDeadlineTrigger(dagActionLeaseObject);
+ if (!leaseParams.isReminder() && dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
+ createJobStartDeadlineTrigger(leaseParams);
+ } else if (!leaseParams.isReminder() && dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
+ createFlowFinishDeadlineTrigger(leaseParams);
} else { // Handle original non-deadline dagActions as well as
reminder events of all types
- LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(dagActionLeaseObject);
+ LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(leaseParams);
if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
return createDagTask(dagAction,
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
}
@@ -159,22 +159,22 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
}
}
- private void
createJobStartDeadlineTrigger(DagActionStore.DagActionLeaseObject
dagActionLeaseObject)
+ private void createJobStartDeadlineTrigger(DagActionStore.LeaseParams
leaseParams)
throws SchedulerException, IOException {
long timeOutForJobStart =
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
-
dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0),
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+ leaseParams.getDagAction().getDagId()).get().getNodes().get(0),
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
// todo - this timestamp is just an approximation, the real job submission
has happened in the past, and that is when a
// ENFORCE_JOB_START_DEADLINE dag action was created; we are just
processing that dag action here
long jobSubmissionTime = System.currentTimeMillis();
long reminderDuration = jobSubmissionTime + timeOutForJobStart -
System.currentTimeMillis();
- dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject,
reminderDuration, true);
+ dagActionReminderScheduler.get().scheduleReminder(leaseParams,
reminderDuration, true);
}
- private void
createFlowFinishDeadlineTrigger(DagActionStore.DagActionLeaseObject
dagActionLeaseObject)
+ private void createFlowFinishDeadlineTrigger(DagActionStore.LeaseParams
leaseParams)
throws SchedulerException, IOException {
long timeOutForJobFinish;
- Dag.DagNode<JobExecutionPlan> dagNode =
this.dagManagementStateStore.getDag(dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0);
+ Dag.DagNode<JobExecutionPlan> dagNode =
this.dagManagementStateStore.getDag(leaseParams.getDagAction().getDagId()).get().getNodes().get(0);
try {
timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
@@ -189,23 +189,23 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
long reminderDuration = flowStartTime + timeOutForJobFinish -
System.currentTimeMillis();
- dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject,
reminderDuration, true);
+ dagActionReminderScheduler.get().scheduleReminder(leaseParams,
reminderDuration, true);
}
/**
* Returns a {@link LeaseAttemptStatus} associated with the
* `dagAction` by calling
- * {@link
MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagActionLeaseObject,
boolean)}.
- * @param dagActionLeaseObject
+ * {@link
MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.LeaseParams, boolean)}.
+ * @param leaseParams
* @return
* @throws IOException
* @throws SchedulerException
*/
- private LeaseAttemptStatus
retrieveLeaseStatus(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
+ private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.LeaseParams
leaseParams)
throws IOException, SchedulerException {
// Uses reminder flag to determine whether to use current time as event
time or previously saved event time
LeaseAttemptStatus leaseAttemptStatus =
this.dagActionProcessingLeaseArbiter
- .tryAcquireLease(dagActionLeaseObject, false);
+ .tryAcquireLease(leaseParams, false);
/* Schedule a reminder for the event unless the lease has been
completed, to safeguard against the case where
even we, after becoming the lease owner, still fail to complete
processing
*/
@@ -242,7 +242,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
*/
protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
throws SchedulerException {
-
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagActionLeaseObject(),
+
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusLeaseParams(),
leaseStatus.getMinimumLingerDurationMillis(), false);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index e548f16ad..8051db483 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -106,11 +106,10 @@ public class FlowLaunchHandler {
* the status of the attempt.
*/
public void handleFlowLaunchTriggerEvent(Properties jobProps,
- DagActionStore.DagActionLeaseObject dagActionLeaseObject, boolean
adoptConsensusFlowExecutionId)
+ DagActionStore.LeaseParams leaseParams, boolean
adoptConsensusFlowExecutionId)
throws IOException {
- long previousEventTimeMillis = dagActionLeaseObject.getEventTimeMillis();
- LeaseAttemptStatus leaseAttempt =
this.multiActiveLeaseArbiter.tryAcquireLease(
- dagActionLeaseObject, adoptConsensusFlowExecutionId);
+ long previousEventTimeMillis = leaseParams.getEventTimeMillis();
+ LeaseAttemptStatus leaseAttempt =
this.multiActiveLeaseArbiter.tryAcquireLease(leaseParams,
adoptConsensusFlowExecutionId);
if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
&& persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ",
leaseAttempt.getConsensusDagAction(),
@@ -129,7 +128,7 @@ public class FlowLaunchHandler {
return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus)
leaseAttempt);
} else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus)
{ // remind w/o delay to immediately re-attempt handling
return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(
- ((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt).getConsensusDagActionLeaseObject(), 0L));
+ ((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt).getConsensusLeaseParams(), 0L));
} else {
throw new RuntimeException("unexpected `LeaseAttemptStatus` derived
type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
index ab8a9ee14..1c7ae2993 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
@@ -72,14 +72,14 @@ public class InstrumentedLeaseArbiter implements
MultiActiveLeaseArbiter {
}
@Override
- public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject,
boolean skipFlowExecutionIdReplacement) throws IOException {
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams
leaseParams, boolean skipFlowExecutionIdReplacement) throws IOException {
LeaseAttemptStatus leaseAttemptStatus =
- decoratedMultiActiveLeaseArbiter.tryAcquireLease(dagActionLeaseObject,
skipFlowExecutionIdReplacement);
- log.info("Multi-active scheduler lease attempt for leaseObject: {}
received type of leaseAttemptStatus: [{}, "
- + "eventTimestamp: {}] ", dagActionLeaseObject,
leaseAttemptStatus.getClass().getName(),
- dagActionLeaseObject.getEventTimeMillis());
+ decoratedMultiActiveLeaseArbiter.tryAcquireLease(leaseParams,
skipFlowExecutionIdReplacement);
+ log.info("Multi-active arbiter attempt for: {} received type of
leaseAttemptStatus: [{}, "
+ + "eventTimestamp: {}] ", leaseParams,
leaseAttemptStatus.getClass().getName(),
+ leaseParams.getEventTimeMillis());
if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
- if (dagActionLeaseObject.isReminder()) {
+ if (leaseParams.isReminder()) {
this.leasesObtainedDueToReminderCount.mark();
}
this.leaseObtainedCount.inc();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index 3584f445f..4dd16457e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -34,10 +34,10 @@ import lombok.Getter;
*/
public abstract class LeaseAttemptStatus {
/**
- * @return the {@link DagActionStore.DagActionLeaseObject}, containing the
dagAction, eventTimeMillis of the event, and boolean
+ * @return the {@link DagActionStore.LeaseParams}, containing the dagAction,
eventTimeMillis of the event, and boolean
* indicating if it's a reminder event; see
{@link MultiActiveLeaseArbiter#tryAcquireLease}
*/
- public DagActionStore.DagActionLeaseObject
getConsensusDagActionLeaseObject() {
+ public DagActionStore.LeaseParams getConsensusLeaseParams() {
return null;
}
@@ -73,7 +73,7 @@ public abstract class LeaseAttemptStatus {
// avoid - warning: Generating equals/hashCode implementation but without a
call to superclass, even though this class does not extend java.lang.Object
@EqualsAndHashCode(callSuper=false)
public static class LeaseObtainedStatus extends LeaseAttemptStatus {
- private final DagActionStore.DagActionLeaseObject
consensusDagActionLeaseObject;
+ private final DagActionStore.LeaseParams consensusLeaseParams;
private final long leaseAcquisitionTimestamp;
private final long minimumLingerDurationMillis;
@Getter(AccessLevel.NONE)
@@ -81,7 +81,7 @@ public abstract class LeaseAttemptStatus {
@Override
public DagActionStore.DagAction getConsensusDagAction() {
- return consensusDagActionLeaseObject.getDagAction();
+ return consensusLeaseParams.getDagAction();
}
/**
@@ -94,7 +94,7 @@ public abstract class LeaseAttemptStatus {
}
public long getEventTimeMillis() {
- return consensusDagActionLeaseObject.getEventTimeMillis();
+ return consensusLeaseParams.getEventTimeMillis();
}
}
@@ -109,16 +109,16 @@ public abstract class LeaseAttemptStatus {
// avoid - warning: Generating equals/hashCode implementation but without a
call to superclass, even though this class does not extend java.lang.Object
@EqualsAndHashCode(callSuper=false)
public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
- private final DagActionStore.DagActionLeaseObject
consensusDagActionLeaseObject;
+ private final DagActionStore.LeaseParams consensusLeaseParams;
private final long minimumLingerDurationMillis;
@Override
public DagActionStore.DagAction getConsensusDagAction() {
- return consensusDagActionLeaseObject.getDagAction();
+ return consensusLeaseParams.getDagAction();
}
public long getEventTimeMillis() {
- return consensusDagActionLeaseObject.getEventTimeMillis();
+ return consensusLeaseParams.getEventTimeMillis();
}
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index b49d2ae52..5965a6d07 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -49,7 +49,7 @@ public interface MultiActiveLeaseArbiter {
* added by the previous write). Based on the transaction results, it will
return {@link LeaseAttemptStatus} to
* determine the next action.
*
- * @param dagActionLeaseObject uniquely identifies the
flow, the present action upon it, the time the action
+ * @param leaseParams uniquely identifies the flow, the
present action upon it, the time the action
* was triggered, and if the dag action
event we're checking on is a reminder event
* @param adoptConsensusFlowExecutionId if true then replaces the dagAction
flowExecutionId returned in
* LeaseAttemptStatuses with the
consensus eventTime, accessed via
@@ -58,7 +58,7 @@ public interface MultiActiveLeaseArbiter {
* {@link DagActionStore.DagAction} with a possibly updated ("laundered")
flow execution id that MUST be used thereafter
* @throws IOException
*/
- LeaseAttemptStatus tryAcquireLease(DagActionStore.DagActionLeaseObject
dagActionLeaseObject, boolean adoptConsensusFlowExecutionId)
+ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams,
boolean adoptConsensusFlowExecutionId)
throws IOException;
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 33274e425..7aa3c42b9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -248,21 +248,22 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
@Override
- public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject,
boolean adoptConsensusFlowExecutionId) throws IOException {
- log.info("Multi-active arbiter about to handle trigger event: {}",
dagActionLeaseObject);
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams
leaseParams,
+ boolean adoptConsensusFlowExecutionId) throws IOException {
+ log.info("Multi-active arbiter about to handle trigger event: {}",
leaseParams);
// Query lease arbiter table about this dag action
- Optional<GetEventInfoResult> getResult =
getExistingEventInfo(dagActionLeaseObject);
+ Optional<GetEventInfoResult> getResult = getExistingEventInfo(leaseParams);
try {
if (!getResult.isPresent()) {
log.debug("tryAcquireLease for {} - CASE 1: no existing row for this
dag action, then go ahead and insert",
- dagActionLeaseObject);
- int numRowsUpdated =
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction(),
+ leaseParams);
+ int numRowsUpdated = attemptLeaseIfNewRow(leaseParams.getDagAction(),
ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
.initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) Math.random()
* DELAY_FOR_RETRY_RANGE_MILLIS)
.build());
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated,
dagActionLeaseObject.getDagAction(),
- Optional.empty(), dagActionLeaseObject.isReminder(),
adoptConsensusFlowExecutionId);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams,
Optional.empty(),
+ adoptConsensusFlowExecutionId);
}
// Extract values from result set
@@ -276,30 +277,30 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// For reminder event, we can stop early if the reminder eventTimeMillis
is older than the current event in the db
// because db laundering tells us that the currently worked on db event
is newer and will have its own reminders
- if (dagActionLeaseObject.isReminder()) {
- if (dagActionLeaseObject.getEventTimeMillis() <
dbEventTimestamp.getTime()) {
+ if (leaseParams.isReminder()) {
+ if (leaseParams.getEventTimeMillis() < dbEventTimestamp.getTime()) {
log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - A new
event trigger "
- + "is being worked on, so this older reminder will be
dropped.", dagActionLeaseObject,
+ + "is being worked on, so this older reminder will be
dropped.", leaseParams,
dbEventTimestamp);
return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
- if (dagActionLeaseObject.getEventTimeMillis() >
dbEventTimestamp.getTime()) {
+ if (leaseParams.getEventTimeMillis() > dbEventTimestamp.getTime()) {
// TODO: emit metric here to capture this unexpected behavior
log.warn("tryAcquireLease for {} - dbEventTimeMillis: {} - Severe
constraint "
+ "violation encountered: a reminder event newer than db
event was found when db laundering should "
- + "ensure monotonically increasing laundered event times.",
dagActionLeaseObject,
+ + "ensure monotonically increasing laundered event times.",
leaseParams,
dbEventTimestamp.getTime());
}
- if (dagActionLeaseObject.getEventTimeMillis() ==
dbEventTimestamp.getTime()) {
+ if (leaseParams.getEventTimeMillis() == dbEventTimestamp.getTime()) {
log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - Reminder
event time "
- + "is the same as db event.", dagActionLeaseObject,
dbEventTimestamp);
+ + "is the same as db event.", leaseParams, dbEventTimestamp);
}
}
// TODO: check whether reminder event before replacing flowExecutionId
if (adoptConsensusFlowExecutionId) {
log.info("Multi-active arbiter replacing local trigger event timestamp
{} with database eventTimestamp {} (in "
- + "epoch-millis)", dagActionLeaseObject,
dbCurrentTimestamp.getTime());
+ + "epoch-millis)", leaseParams, dbCurrentTimestamp.getTime());
}
/* Note that we use `adoptConsensusFlowExecutionId` parameter's value to
determine whether we should use the db
laundered event timestamp as the flowExecutionId or maintain the
original one
@@ -309,54 +310,54 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
DagActionStore.DagAction updatedDagAction =
- adoptConsensusFlowExecutionId ?
dagActionLeaseObject.getDagAction().updateFlowExecutionId(dbEventTimestamp.getTime())
: dagActionLeaseObject.getDagAction();
- log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
- updatedDagAction, dagActionLeaseObject.isReminder ? "reminder" :
"original", dbCurrentTimestamp.getTime());
+ adoptConsensusFlowExecutionId ?
leaseParams.updateDagActionFlowExecutionId(dbEventTimestamp.getTime()) :
leaseParams.getDagAction();
+ DagActionStore.LeaseParams updatedLeaseParams = new
DagActionStore.LeaseParams(updatedDagAction,
+ dbEventTimestamp.getTime());
+ log.debug("tryAcquireLease for [{}] - CASE 2: Same event, lease is
valid", updatedLeaseParams);
// Utilize db timestamp for reminder
- return new LeaseAttemptStatus.LeasedToAnotherStatus(
- new DagActionStore.DagActionLeaseObject(updatedDagAction,
dbEventTimestamp.getTime()),
+ return new
LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedDagAction =
- adoptConsensusFlowExecutionId ?
dagActionLeaseObject.getDagAction().updateFlowExecutionId(dbCurrentTimestamp.getTime())
: dagActionLeaseObject.getDagAction();
- log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
3: Distinct event, lease is valid",
- updatedDagAction, dagActionLeaseObject.isReminder ? "reminder" :
"original", dbCurrentTimestamp.getTime());
+ adoptConsensusFlowExecutionId ?
leaseParams.getDagAction().updateFlowExecutionId(dbCurrentTimestamp.getTime())
: leaseParams.getDagAction();
+ DagActionStore.LeaseParams updatedLeaseParams = new
DagActionStore.LeaseParams(updatedDagAction,
+ dbCurrentTimestamp.getTime());
+ log.debug("tryAcquireLease for [{}] - CASE 3: Distinct event, lease is
valid", updatedLeaseParams);
// Utilize db lease acquisition timestamp for wait time and
currentTimestamp as the new eventTimestamp
- return new LeaseAttemptStatus.LeasedToAnotherStatus(
- new DagActionStore.DagActionLeaseObject(updatedDagAction,
dbCurrentTimestamp.getTime()),
+ return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
4: Lease is out of date (regardless of "
- + "whether same or distinct event)",
dagActionLeaseObject.getDagAction(),
- dagActionLeaseObject.isReminder ? "reminder" : "original",
dbCurrentTimestamp.getTime());
- if (isWithinEpsilon && !dagActionLeaseObject.isReminder) {
+ + "whether same or distinct event)", leaseParams.getDagAction(),
+ leaseParams.isReminder ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ if (isWithinEpsilon && !leaseParams.isReminder) {
log.warn("Lease should not be out of date for the same trigger event
since epsilon << linger for "
+ "leaseObject.getDagAction() {}, db eventTimestamp {}, db
leaseAcquisitionTimestamp {}, linger {}",
- dagActionLeaseObject.getDagAction(), dbEventTimestamp,
dbLeaseAcquisitionTimestamp, dbLinger);
+ leaseParams.getDagAction(), dbEventTimestamp,
dbLeaseAcquisitionTimestamp, dbLinger);
}
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
- dagActionLeaseObject.getDagAction(), true,true, dbEventTimestamp,
+ leaseParams.getDagAction(), true,true, dbEventTimestamp,
dbLeaseAcquisitionTimestamp);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated,
dagActionLeaseObject.getDagAction(),
- Optional.of(dbCurrentTimestamp), dagActionLeaseObject.isReminder,
adoptConsensusFlowExecutionId);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams,
Optional.of(dbCurrentTimestamp),
+ adoptConsensusFlowExecutionId);
} // No longer leasing this event
if (isWithinEpsilon) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 5: Same event, no longer leasing event"
- + " in db", dagActionLeaseObject.getDagAction(),
- dagActionLeaseObject.isReminder ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ + " in db", leaseParams.getDagAction(),
+ leaseParams.isReminder ? "reminder" : "original",
dbCurrentTimestamp.getTime());
return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
6: Distinct event, no longer leasing "
- + "event in db", dagActionLeaseObject.getDagAction(),
- dagActionLeaseObject.isReminder ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ + "event in db", leaseParams.getDagAction(),
+ leaseParams.isReminder ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement,
- dagActionLeaseObject.getDagAction(), true, false, dbEventTimestamp,
+ leaseParams.getDagAction(), true, false, dbEventTimestamp,
null);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated,
dagActionLeaseObject.getDagAction(),
- Optional.of(dbCurrentTimestamp), dagActionLeaseObject.isReminder,
adoptConsensusFlowExecutionId);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams,
Optional.of(dbCurrentTimestamp),
+ adoptConsensusFlowExecutionId);
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -365,19 +366,20 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
/**
* Checks leaseArbiterTable for an existing entry for this dag action and
event time
*/
- protected Optional<GetEventInfoResult>
getExistingEventInfo(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
+ protected Optional<GetEventInfoResult>
getExistingEventInfo(DagActionStore.LeaseParams leaseParams)
throws IOException {
+ DagActionStore.DagAction dagAction = leaseParams.getDagAction();
return dbStatementExecutor.withPreparedStatement(
- dagActionLeaseObject.isReminder ? thisTableGetInfoStatementForReminder
: thisTableGetInfoStatement,
+ leaseParams.isReminder ? thisTableGetInfoStatementForReminder :
thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
- if (dagActionLeaseObject.isReminder) {
- getInfoStatement.setTimestamp(++i, new
Timestamp(dagActionLeaseObject.getEventTimeMillis()), UTC_CAL.get());
+ if (leaseParams.isReminder) {
+ getInfoStatement.setTimestamp(++i, new
Timestamp(leaseParams.getEventTimeMillis()), UTC_CAL.get());
}
- getInfoStatement.setString(++i,
dagActionLeaseObject.getDagAction().getFlowGroup());
- getInfoStatement.setString(++i,
dagActionLeaseObject.getDagAction().getFlowName());
- getInfoStatement.setString(++i,
dagActionLeaseObject.getDagAction().getJobName());
- getInfoStatement.setString(++i,
dagActionLeaseObject.getDagAction().getDagActionType().toString());
+ getInfoStatement.setString(++i, dagAction.getFlowGroup());
+ getInfoStatement.setString(++i, dagAction.getFlowName());
+ getInfoStatement.setString(++i, dagAction.getJobName());
+ getInfoStatement.setString(++i,
dagAction.getDagActionType().toString());
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
@@ -520,33 +522,32 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* @throws IOException
*/
protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int
numRowsUpdated,
- DagActionStore.DagAction dagAction, Optional<Timestamp>
dbCurrentTimestamp, boolean isReminderEvent,
- boolean adoptConsensusFlowExecutionId)
+ DagActionStore.LeaseParams leaseParams, Optional<Timestamp>
dbCurrentTimestamp, boolean adoptConsensusFlowExecutionId)
throws SQLException, IOException {
// Fetch values in row after attempted insert
- SelectInfoResult selectInfoResult = getRowInfo(dagAction);
+ SelectInfoResult selectInfoResult = getRowInfo(leaseParams.dagAction);
// Another participant won the lease in between
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
DagActionStore.DagAction updatedDagAction =
- adoptConsensusFlowExecutionId ?
dagAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis) : dagAction;
- DagActionStore.DagActionLeaseObject consensusDagActionLeaseObject =
- new DagActionStore.DagActionLeaseObject(updatedDagAction,
selectInfoResult.getEventTimeMillis());
+ adoptConsensusFlowExecutionId ?
leaseParams.updateDagActionFlowExecutionId(selectInfoResult.eventTimeMillis) :
leaseParams.dagAction;
+ DagActionStore.LeaseParams consensusLeaseParams =
+ new DagActionStore.LeaseParams(updatedDagAction,
selectInfoResult.getEventTimeMillis());
// If no db current timestamp is present, then use the full db linger
value for duration
long minimumLingerDurationMillis = dbCurrentTimestamp.isPresent() ?
selectInfoResult.getLeaseAcquisitionTimeMillis().get() +
selectInfoResult.getDbLinger()
- dbCurrentTimestamp.get().getTime() :
selectInfoResult.getDbLinger();
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", updatedDagAction,
- isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
- return new
LeaseAttemptStatus.LeaseObtainedStatus(consensusDagActionLeaseObject,
+ leaseParams.isReminder() ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
+ return new LeaseAttemptStatus.LeaseObtainedStatus(consensusLeaseParams,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(),
minimumLingerDurationMillis, this);
}
log.info("Another participant acquired lease in between for [{}, is: {},
eventTimestamp: {}] - num rows updated: {}",
- updatedDagAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
+ updatedDagAction, leaseParams.isReminder ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
- return new
LeaseAttemptStatus.LeasedToAnotherStatus(consensusDagActionLeaseObject,
minimumLingerDurationMillis);
+ return new LeaseAttemptStatus.LeasedToAnotherStatus(consensusLeaseParams,
minimumLingerDurationMillis);
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 49793cfcf..8ff5e2daf 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -221,8 +221,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
flowName,
FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH);
- DagActionStore.DagActionLeaseObject
- leaseObject = new
DagActionStore.DagActionLeaseObject(launchDagAction, isReminderEvent,
+ DagActionStore.LeaseParams
+ leaseObject = new DagActionStore.LeaseParams(launchDagAction,
isReminderEvent,
triggerTimestampMillis);
// `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps,
leaseObject, flowSpec.isScheduled());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index b2c1ff9a7..4754a5e2c 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -63,7 +63,7 @@ public class DagActionReminderSchedulerTest {
@Test
public void testCreateReminderJobDetail() {
long expectedEventTimeMillis = 55L;
- JobDetail jobDetail =
DagActionReminderScheduler.createReminderJobDetail(new
DagActionStore.DagActionLeaseObject(launchDagAction, false,
expectedEventTimeMillis), false);
+ JobDetail jobDetail =
DagActionReminderScheduler.createReminderJobDetail(new
DagActionStore.LeaseParams(launchDagAction, false, expectedEventTimeMillis),
false);
Assert.assertEquals(jobDetail.getKey().toString(),
DagActionReminderScheduler.RetryReminderKeyGroup + "." + expectedKey);
JobDataMap dataMap = jobDetail.getJobDataMap();
Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_GROUP_KEY),
flowGroup);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index fcea36d7f..a5167ca85 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -97,10 +97,10 @@ public class DagManagementTaskStreamImplTest {
dagManagementTaskStream.addDagAction(launchAction);
dagManagementTaskStream.addDagAction(launchAction);
when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
- .tryAcquireLease(any(DagActionStore.DagActionLeaseObject.class),
anyBoolean()))
+ .tryAcquireLease(any(DagActionStore.LeaseParams.class), anyBoolean()))
.thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
- new LeaseAttemptStatus.LeasedToAnotherStatus(new
DagActionStore.DagActionLeaseObject(launchAction, true, 1), 15),
- new LeaseAttemptStatus.LeaseObtainedStatus(new
DagActionStore.DagActionLeaseObject(launchAction, true, 1), 0, 5, null));
+ new LeaseAttemptStatus.LeasedToAnotherStatus(new
DagActionStore.LeaseParams(launchAction, true, 1), 15),
+ new LeaseAttemptStatus.LeaseObtainedStatus(new
DagActionStore.LeaseParams(launchAction, true, 1), 0, 5, null));
DagTask dagTask = dagManagementTaskStream.next();
Assert.assertTrue(dagTask instanceof LaunchDagTask);
DagProc dagProc = dagTask.host(this.dagProcFactory);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 88361e563..47b7c4226 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -37,8 +37,8 @@ public class FlowLaunchHandlerTest {
int schedulerBackOffMillis = 10;
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction("flowName", "flowGroup",
flowExecutionId, "jobName", DagActionStore.DagActionType.LAUNCH);
- DagActionStore.DagActionLeaseObject
- leaseObject = new DagActionStore.DagActionLeaseObject(dagAction, false,
eventToRevisit);
+ DagActionStore.LeaseParams
+ leaseObject = new DagActionStore.LeaseParams(dagAction, false,
eventToRevisit);
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
new LeaseAttemptStatus.LeasedToAnotherStatus(leaseObject,
minimumLingerDurationMillis);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 0f97a290d..297148c96 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -58,16 +58,16 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Dag actions with the same flow info but different flow action types are
considered unique
private static DagActionStore.DagAction launchDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
- private static DagActionStore.DagActionLeaseObject
- launchLeaseObject = new
DagActionStore.DagActionLeaseObject(launchDagAction, false, eventTimeMillis);
+ private static DagActionStore.LeaseParams
+ launchLeaseParams = new DagActionStore.LeaseParams(launchDagAction,
false, eventTimeMillis);
private static DagActionStore.DagAction resumeDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.RESUME);
- private static DagActionStore.DagActionLeaseObject
- resumeLeaseObject = new
DagActionStore.DagActionLeaseObject(resumeDagAction, false, eventTimeMillis);
+ private static DagActionStore.LeaseParams
+ resumeLeaseParams = new DagActionStore.LeaseParams(resumeDagAction,
false, eventTimeMillis);
private static DagActionStore.DagAction launchDagAction2 =
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
- private static DagActionStore.DagActionLeaseObject
- launchLeaseObject2 = new
DagActionStore.DagActionLeaseObject(launchDagAction2, false, eventTimeMillis);
+ private static DagActionStore.LeaseParams
+ launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction2,
false, eventTimeMillis);
private static final Timestamp dummyTimestamp = new Timestamp(99999);
private ITestMetastoreDatabase testDb;
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
@@ -111,7 +111,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 1 of acquire lease for a flow action event not present in DB
LeaseAttemptStatus firstLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
Assert.assertTrue(firstLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
@@ -124,14 +124,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
new DagActionStore.DagAction(flowGroup, flowName,
consensusEventTimeMillis, jobName,
DagActionStore.DagActionType.LAUNCH)));
Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
consensusEventTimeMillis);
-
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder,
false);
+
Assert.assertEquals(firstObtainedStatus.getConsensusLeaseParams().isReminder,
false);
// Verify that different DagAction types for the same flow can have leases
at the same time
DagActionStore.DagAction killDagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.KILL);
LeaseAttemptStatus killStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(
- new DagActionStore.DagActionLeaseObject(killDagAction, false,
eventTimeMillis), true);
+ new DagActionStore.LeaseParams(killDagAction, false,
eventTimeMillis), true);
Assert.assertTrue(killStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus killObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) killStatus;
@@ -142,7 +142,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Very little time should have passed if this test directly follows the
one above so this call will be considered
// the same as the previous event
LeaseAttemptStatus secondLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
Assert.assertTrue(secondLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
@@ -154,7 +154,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Allow enough time to pass for this trigger to be considered distinct,
but not enough time so the lease expires
Thread.sleep(MORE_THAN_EPSILON);
LeaseAttemptStatus thirdLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
Assert.assertTrue(thirdLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) thirdLaunchStatus;
@@ -164,7 +164,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Tests CASE 4 of lease out of date
Thread.sleep(LINGER);
LeaseAttemptStatus fourthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
Assert.assertTrue(fourthLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus fourthObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) fourthLaunchStatus;
@@ -177,14 +177,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
Assert.assertTrue(System.currentTimeMillis() -
fourthObtainedStatus.getEventTimeMillis() < EPSILON);
LeaseAttemptStatus fifthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
Assert.assertTrue(fifthLaunchStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus);
// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be
acquired
Thread.sleep(MORE_THAN_EPSILON);
LeaseAttemptStatus sixthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
Assert.assertTrue(sixthLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus sixthObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) sixthLaunchStatus;
@@ -251,10 +251,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
@Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfMatchingAllColsStatement")
public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
throws IOException, SQLException {
- MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
completeLeaseHelper(resumeLeaseObject);
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
completeLeaseHelper(resumeLeaseParams);
// Ensure no NPE results from calling this after a lease has been
completed and acquisition timestamp val is NULL
- mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1,
resumeDagAction,
- Optional.empty(), false, true);
+ mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1,
resumeLeaseParams,
+ Optional.empty(), true);
// The following insert will fail since eventTimestamp does not match the
expected
int numRowsUpdated =
mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
@@ -274,16 +274,16 @@ public class MysqlMultiActiveLeaseArbiterTest {
*/
@Test
public void testOlderReminderEventAcquireLease() throws IOException {
- DagActionStore.DagActionLeaseObject newLaunchLeaseObject =
getUniqueLaunchLeaseObject();
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
+ DagActionStore.LeaseParams newLaunchLeaseParams =
getUniqueLaunchLeaseParams();
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams, false);
// Read database to obtain existing db eventTimeMillis and use it to
construct an older event
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseObject.getDagAction());
+
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseParams.getDagAction());
long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
- DagActionStore.DagActionLeaseObject updatedLeaseObject =
- new
DagActionStore.DagActionLeaseObject(newLaunchLeaseObject.getDagAction(), true,
olderEventTimestamp);
+ DagActionStore.LeaseParams updatedLeaseParams =
+ new DagActionStore.LeaseParams(newLaunchLeaseParams.getDagAction(),
true, olderEventTimestamp);
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseParams, true);
Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus);
}
@@ -294,15 +294,15 @@ public class MysqlMultiActiveLeaseArbiterTest {
*/
@Test
public void testReminderEventAcquireLeaseOnValidLease() throws IOException {
- DagActionStore.DagActionLeaseObject newLaunchLeaseObject =
getUniqueLaunchLeaseObject();
+ DagActionStore.LeaseParams newLaunchLeaseParams =
getUniqueLaunchLeaseParams();
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
- (LeaseAttemptStatus.LeaseObtainedStatus)
mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
- // Use the consensusLeaseObject containing the new eventTimeMillis for the
reminder event time
- DagActionStore.DagActionLeaseObject updatedLeaseObject =
- new
DagActionStore.DagActionLeaseObject(newLaunchLeaseObject.getDagAction(), true,
+ (LeaseAttemptStatus.LeaseObtainedStatus)
mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams, false);
+ // Use the consensusLeaseParams containing the new eventTimeMillis for the
reminder event time
+ DagActionStore.LeaseParams updatedLeaseParams =
+ new DagActionStore.LeaseParams(newLaunchLeaseParams.getDagAction(),
true,
leaseObtainedStatus.getEventTimeMillis());
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject, true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseParams, true);
Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) attemptStatus;
Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(),
leaseObtainedStatus.getEventTimeMillis());
@@ -314,13 +314,13 @@ public class MysqlMultiActiveLeaseArbiterTest {
*/
@Test
public void testReminderEventAcquireLeaseOnInvalidLease() throws
IOException, InterruptedException {
- DagActionStore.DagActionLeaseObject newLaunchLeaseObject =
getUniqueLaunchLeaseObject();
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
- MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseObject.getDagAction());
+ DagActionStore.LeaseParams newLaunchLeaseParams =
getUniqueLaunchLeaseParams();
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams, false);
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(newLaunchLeaseParams.getDagAction());
// Wait enough time for the lease to expire
Thread.sleep(MORE_THAN_LINGER);
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject,
true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams,
true);
Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus obtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) attemptStatus;
Assert.assertTrue(obtainedStatus.getEventTimeMillis() >
selectInfoResult.getEventTimeMillis());
@@ -336,19 +336,19 @@ public class MysqlMultiActiveLeaseArbiterTest {
@Test
public void testReminderEventAcquireLeaseOnCompletedLease() throws
IOException, InterruptedException {
// Create a new dag action and complete the lease
- DagActionStore.DagActionLeaseObject newLaunchLeaseObject =
getUniqueLaunchLeaseObject();
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseObject, false);
- MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
completeLeaseHelper(newLaunchLeaseObject);
+ DagActionStore.LeaseParams newLaunchLeaseParams =
getUniqueLaunchLeaseParams();
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(newLaunchLeaseParams, false);
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
completeLeaseHelper(newLaunchLeaseParams);
// Sleep enough time for the event to have been considered distinct
Thread.sleep(MORE_THAN_EPSILON);
// Now have a reminder event check-in on the completed lease
- DagActionStore.DagAction updatedDagAction =
newLaunchLeaseObject.getDagAction().updateFlowExecutionId(
+ DagActionStore.DagAction updatedDagAction =
newLaunchLeaseParams.getDagAction().updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
- DagActionStore.DagActionLeaseObject updatedLeaseObject =
- new DagActionStore.DagActionLeaseObject(updatedDagAction, true,
newLaunchLeaseObject.getEventTimeMillis());
+ DagActionStore.LeaseParams updatedLeaseParams =
+ new DagActionStore.LeaseParams(updatedDagAction, true,
newLaunchLeaseParams.getEventTimeMillis());
LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseObject,
true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(updatedLeaseParams,
true);
Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus);
}
@@ -362,7 +362,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
// Obtain a lease for a new action and verify its flowExecutionId is not
updated
LeaseAttemptStatus firstLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject2,
false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams2,
false);
Assert.assertTrue(firstLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
@@ -370,19 +370,19 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() !=
Long.valueOf(firstObtainedStatus.getConsensusDagAction().getFlowExecutionId()));
Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
-
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder(),
false);
+
Assert.assertEquals(firstObtainedStatus.getConsensusLeaseParams().isReminder(),
false);
// A second attempt to obtain a lease on the same action should return a
LeasedToAnotherStatus which also contains
// the original flowExecutionId and the same event time from the previous
LeaseAttemptStatus
LeaseAttemptStatus secondLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseObject2,
false);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams2,
false);
Assert.assertTrue(secondLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertTrue(secondLeasedToAnotherStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
-
Assert.assertEquals(firstObtainedStatus.getConsensusDagActionLeaseObject().isReminder(),
false);
+
Assert.assertEquals(firstObtainedStatus.getConsensusLeaseParams().isReminder(),
false);
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(firstObtainedStatus));
}
@@ -402,10 +402,11 @@ public class MysqlMultiActiveLeaseArbiterTest {
}
/**
- * Returns a unique launch type leaseObject using
#getUniqueLaunchDagAction() to create a unique flowName
+ * Returns a unique launch type {@link
org.apache.gobblin.service.modules.orchestration.DagActionStore.LeaseParams}
+ * using #getUniqueLaunchDagAction() to create a unique flowName
*/
- public DagActionStore.DagActionLeaseObject getUniqueLaunchLeaseObject() {
- return new DagActionStore.DagActionLeaseObject(getUniqueLaunchDagAction(),
false, eventTimeMillis);
+ public DagActionStore.LeaseParams getUniqueLaunchLeaseParams() {
+ return new DagActionStore.LeaseParams(getUniqueLaunchDagAction(), false,
eventTimeMillis);
}
/**
@@ -413,15 +414,15 @@ public class MysqlMultiActiveLeaseArbiterTest {
* @return SelectInfoResult object containing the event information used to
complete the lease
*/
public MysqlMultiActiveLeaseArbiter.SelectInfoResult completeLeaseHelper(
- DagActionStore.DagActionLeaseObject previouslyLeasedLeaseObj) throws
IOException {
+ DagActionStore.LeaseParams previouslyLeasedLeaseParams) throws
IOException {
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-
mysqlMultiActiveLeaseArbiter.getRowInfo(previouslyLeasedLeaseObj.getDagAction());
- DagActionStore.DagAction updatedDagAction =
previouslyLeasedLeaseObj.getDagAction().updateFlowExecutionId(
+
mysqlMultiActiveLeaseArbiter.getRowInfo(previouslyLeasedLeaseParams.getDagAction());
+ DagActionStore.DagAction updatedDagAction =
previouslyLeasedLeaseParams.getDagAction().updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
- DagActionStore.DagActionLeaseObject
- updatedLeaseObject = new
DagActionStore.DagActionLeaseObject(updatedDagAction, false,
selectInfoResult.getEventTimeMillis());
+ DagActionStore.LeaseParams
+ updatedLeaseParams = new DagActionStore.LeaseParams(updatedDagAction,
false, selectInfoResult.getEventTimeMillis());
boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new
LeaseAttemptStatus.LeaseObtainedStatus(
- updatedLeaseObject,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
+ updatedLeaseParams,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
Assert.assertTrue(markedSuccess);
return selectInfoResult;
}