This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 083913de9 [GOBBLIN-2070] Add eventTimeMillis field to
leaseAttemptStatus for adhoc flows where… (#3951)
083913de9 is described below
commit 083913de95bfce14cd3147f04ea79d5dee0c0f32
Author: umustafi <[email protected]>
AuthorDate: Tue May 21 13:58:20 2024 -0700
[GOBBLIN-2070] Add eventTimeMillis field to leaseAttemptStatus for adhoc
flows where… (#3951)
* Add eventTimeMillis field to leaseAttemptStatus for adhoc flows where
flowExecutionId is different than the event time of the lease
* Update tests to capture edge case
---
.../modules/orchestration/FlowLaunchHandler.java | 2 +-
.../modules/orchestration/LeaseAttemptStatus.java | 32 ++++++++--------------
.../MysqlMultiActiveLeaseArbiter.java | 10 ++++---
.../DagManagementTaskStreamImplTest.java | 4 +--
.../orchestration/FlowLaunchHandlerTest.java | 7 +++--
.../MysqlMultiActiveLeaseArbiterTest.java | 18 ++++++++----
6 files changed, 36 insertions(+), 37 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 1c7c82358..1c37a404c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -126,7 +126,7 @@ public class FlowLaunchHandler {
} else if (leaseAttempt instanceof
LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it
return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus)
leaseAttempt);
} else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus)
{ // remind w/o delay to immediately re-attempt handling
- return Optional.of(new
LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(),
0L));
+ return Optional.of(new
LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(),
((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt).getEventTimeMillis(),
0L));
} else {
throw new RuntimeException("unexpected `LeaseAttemptStatus` derived
type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index f77427892..1087c6be4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -51,26 +51,23 @@ public abstract class LeaseAttemptStatus {
public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}
/*
- The participant calling this method acquired the lease for the event in
question. `Dag action`'s flow execution id
- is the timestamp associated with the lease and the time the caller obtained
the lease is stored within the
- `leaseAcquisitionTimestamp` field. The `multiActiveLeaseArbiter` reference
is used to recordLeaseSuccess for the
- current LeaseObtainedStatus via the completeLease method from a caller
without access to the {@link MultiActiveLeaseArbiter}.
+ The participant calling this method acquired the lease for the event in
question.
+ The timestamp associated with the lease is stored in the `eventTimeMillis` field
and the time the caller obtained the
+ lease is stored within the `leaseAcquisitionTimestamp` field. Note that the
`Dag action` returned by the lease
+ arbitration attempt will be unchanged for flows that do not adopt the
consensus eventTimeMillis as the flow execution
+ id, so a separate field must be maintained to track the eventTimeMillis for
lease completion. The
+ `multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for the
current LeaseObtainedStatus via the
+ completeLease method from a caller without access to the {@link
MultiActiveLeaseArbiter}.
*/
@Data
public static class LeaseObtainedStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction consensusDagAction;
+ private final long eventTimeMillis;
private final long leaseAcquisitionTimestamp;
private final long minimumLingerDurationMillis;
@Getter(AccessLevel.NONE)
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
- /**
- * @return event time in millis since epoch for the event of this lease
acquisition
- */
- public long getEventTimeMillis() {
- return Long.parseLong(consensusDagAction.getFlowExecutionId());
- }
-
/**
* Completes the lease referenced by this status object if it has not
expired.
* @return true if able to complete lease, false otherwise.
@@ -83,22 +80,15 @@ public abstract class LeaseAttemptStatus {
/*
This dag action event already has a valid lease owned by another participant.
- `Dag action`'s flow execution id is the timestamp the lease is associated
with, however the dag action event it
- corresponds to may be a different and distinct occurrence of the same event.
+ `eventTimeMillis` corresponds to the timestamp of the existing lease
associated with this dag action, however the dag
+ action event it corresponds to may be a different and distinct occurrence of
the same event.
`minimumLingerDurationMillis` is the minimum amount of time to wait before
this participant should return to check if
the lease has completed or expired
*/
@Data
public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction consensusDagAction;
+ private final long eventTimeMillis;
private final long minimumLingerDurationMillis;
-
- /**
- * Returns event time in millis since epoch for the event whose lease was
obtained by another participant.
- * @return
- */
- public long getEventTimeMillis() {
- return Long.parseLong(consensusDagAction.getFlowExecutionId());
- }
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 4479358ec..2702a81d5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -307,7 +307,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
updatedDagAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
+ return new
LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
dbEventTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedDagAction =
@@ -315,7 +315,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
3: Distinct event, lease is valid",
updatedDagAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
- return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
+ return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
dbCurrentTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
@@ -514,12 +514,14 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", updatedDagAction,
isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
- return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(),
minimumLingerDurationMillis, this);
+ return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction,
selectInfoResult.eventTimeMillis,
+ selectInfoResult.getLeaseAcquisitionTimeMillis().get(),
minimumLingerDurationMillis, this);
}
log.info("Another participant acquired lease in between for [{}, is: {},
eventTimestamp: {}] - num rows updated: {}",
updatedDagAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
- return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
minimumLingerDurationMillis);
+ return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
selectInfoResult.eventTimeMillis,
+ minimumLingerDurationMillis);
}
/**
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 607d44389..f8c0f8296 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -110,8 +110,8 @@ public class DagManagementTaskStreamImplTest {
when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
.tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(),
anyBoolean(), anyBoolean()))
.thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
- new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 15),
- new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 0, 5,
null));
+ new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 3, 15),
+ new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 5, 0, 5,
null));
DagTask dagTask = dagManagementTaskStream.next();
Assert.assertTrue(dagTask instanceof LaunchDagTask);
DagProc dagProc = dagTask.host(this.dagProcFactory);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index dea34b6cd..b4feeed84 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -28,15 +28,16 @@ import
org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
public class FlowLaunchHandlerTest {
- long eventToRevisit = 123000L;
+ long flowExecutionId = 123000L;
+ long eventToRevisit = 641000L;
long minimumLingerDurationMillis = 2000L;
String cronExpression =
FlowLaunchHandler.createCronFromDelayPeriod(minimumLingerDurationMillis);
String cronExpressionSuffix =
truncateFirstTwoFieldsOfCronExpression(cronExpression);
int schedulerBackOffMillis = 10;
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction("flowName", "flowGroup",
- String.valueOf(eventToRevisit), "jobName",
DagActionStore.DagActionType.LAUNCH);
+ String.valueOf(flowExecutionId), "jobName",
DagActionStore.DagActionType.LAUNCH);
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
- new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction,
minimumLingerDurationMillis);
+ new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit,
minimumLingerDurationMillis);
/**
* Remove first two fields from cron expression representing seconds and
minutes to return truncated cron expression
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 2699148b6..607d8f3b0 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -239,7 +239,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
DagActionStore.DagAction updatedResumeDagAction =
resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new
LeaseAttemptStatus.LeaseObtainedStatus(
- updatedResumeDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
+ updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
+ selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been
completed and acquisition timestamp val is NULL
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1,
resumeDagAction,
@@ -321,7 +322,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
DagActionStore.DagAction updatedResumeDagAction =
resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new
LeaseAttemptStatus.LeaseObtainedStatus(
- updatedResumeDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
+ updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
+ selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER,
null));
Assert.assertTrue(markedSuccess);
// Sleep enough time for the event to have been considered distinct
@@ -333,9 +335,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
}
/*
- Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set
to True and verify that flowExecutionId
+ Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set
to False and verify that flowExecutionId
returned is the same as flowExecutionId provided to it for a
LeaseObtainedStatus and LeasedToAnotherStatus object
- (CASE 1 & 2).
+ (CASE 1 & 2). It also verifies that the `eventTimeMillis` stored in a lease
obtained status can be used to complete
+ the lease.
*/
@Test
public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
@@ -346,18 +349,21 @@ public class MysqlMultiActiveLeaseArbiterTest {
LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
+ Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() !=
Long.valueOf(firstObtainedStatus.getConsensusDagAction().getFlowExecutionId()));
Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
// A second attempt to obtain a lease on the same action should return a
LeasedToAnotherStatus which also contains
- // the original flowExecutionId
+ // the original flowExecutionId and the same event time from the previous
LeaseAttemptStatus
LeaseAttemptStatus secondLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2,
eventTimeMillis, false, false);
Assert.assertTrue(secondLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
secondLeasedToAnotherStatus.getEventTimeMillis());
- Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
+
Assert.assertTrue(secondLeasedToAnotherStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
+
+
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(firstObtainedStatus));
}
}