phet commented on code in PR #3800:
URL: https://github.com/apache/gobblin/pull/3800#discussion_r1362496618
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -514,20 +516,30 @@ protected LeaseAttemptStatus
evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
+ DagActionStore.DagAction updatedFlowAction =
updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
if (numRowsUpdated == 1) {
- log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", flowAction,
+ log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
- return new LeaseObtainedStatus(flowAction,
selectInfoResult.eventTimeMillis,
+ return new LeaseObtainedStatus(updatedFlowAction,
selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.info("Another participant acquired lease in between for [{}, is: {},
eventTimestamp: {}] - num rows updated: {}",
- flowAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
+ updatedFlowAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
- return new LeasedToAnotherStatus(flowAction,
selectInfoResult.getEventTimeMillis(),
+ return new LeasedToAnotherStatus(updatedFlowAction,
selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get() +
selectInfoResult.getDbLinger()
- (dbCurrentTimestamp.isPresent() ?
dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}
+ /**
+ * Replace flow execution id with agreed upon event time to easily track
the flow
+ */
+ protected static DagActionStore.DagAction
updateFlowExecutionId(DagActionStore.DagAction flowAction,
Review Comment:
nit: `updateFlowExecutionId` should be a method of `DagAction` rather than a static helper on this class.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -319,16 +319,18 @@ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction flowAction, l
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
+ DagActionStore.DagAction updatedFlowAction =
updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
- flowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
+ updatedFlowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new LeasedToAnotherStatus(flowAction,
dbEventTimestamp.getTime(),
+ return new LeasedToAnotherStatus(updatedFlowAction,
dbEventTimestamp.getTime(),
Review Comment:
Each of the flowExecId updates uses the same timestamp value that is also
passed as the second arg to the `LeasedToAnotherStatus` or `LeaseObtainedStatus`
ctor. It seems potentially unnecessary for that status object to still maintain
both values -- do we want it to?
For example, the status class could always implement:
```
long getEventTimestamp() {
  return this.dagAction.getFlowExecutionId();
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]