This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9b254b6bb [GOBBLIN-1926] Fix Reminder Event Epsilon Comparison (#3797)
9b254b6bb is described below
commit 9b254b6bb4322a0bdb048f19daf71022ddcccd7e
Author: umustafi <[email protected]>
AuthorDate: Tue Oct 10 14:50:35 2023 -0700
[GOBBLIN-1926] Fix Reminder Event Epsilon Comparison (#3797)
* Fix Reminder Event Epsilon Comparison
* Add TODO comment
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 18 ++++++++++++++----
.../runtime/api/MysqlMultiActiveLeaseArbiterTest.java | 13 +++++++++----
2 files changed, 23 insertions(+), 8 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 82950ef7f..4c2e8d2da 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -151,7 +151,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER =
"SELECT "
+ "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as
utc_event_timestamp, "
+ "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone,
'+00:00') as utc_lease_acquisition_timestamp, "
- + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) /
1000 <= epsilon as is_within_epsilon, CASE "
+ + "TIMESTAMPDIFF(microsecond, event_timestamp, CONVERT_TZ(?, '+00:00',
@@session.time_zone)) / 1000 <= epsilon as is_within_epsilon, CASE "
+ "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 2 "
+ "ELSE 3 END as lease_validity_status, linger, "
@@ -269,7 +269,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.info("Multi-active scheduler about to handle trigger event: [{}, is:
{}, triggerEventTimestamp: {}]",
flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis);
// Query lease arbiter table about this flow action
- Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction,
isReminderEvent);
+ Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction,
isReminderEvent, eventTimeMillis);
// TODO: change all the `CASE N: ...` statements back to debug statements
after uncovering issue
try {
@@ -299,11 +299,18 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
return new NoLongerLeasingStatus();
}
if (eventTimeMillis > dbEventTimestamp.getTime()) {
+ // TODO: emit metric here to capture this unexpected behavior
log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Severe constraint "
+ "violation encountered: a reminder event newer than db
event was found when db laundering should "
+ "ensure monotonically increasing laundered event times.",
flowAction,
isReminderEvent ? "reminder" : "original", eventTimeMillis,
dbEventTimestamp.getTime());
}
+ if (eventTimeMillis == dbEventTimestamp.getTime()) {
+ // TODO: change this to a debug after fixing issue
+ log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Reminder event time"
+ + "is the same as db event.", flowAction, isReminderEvent ?
"reminder" : "original",
+ eventTimeMillis, dbEventTimestamp);
+ }
}
log.info("Multi-active arbiter replacing local trigger event timestamp
[{}, is: {}, triggerEventTimestamp: {}] "
@@ -359,10 +366,13 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* Checks leaseArbiterTable for an existing entry for this flow action and
event time
*/
protected Optional<GetEventInfoResult>
getExistingEventInfo(DagActionStore.DagAction flowAction,
- boolean isReminderEvent) throws IOException {
+ boolean isReminderEvent, long eventTimeMillis) throws IOException {
return withPreparedStatement(isReminderEvent ?
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
+ if (isReminderEvent) {
+ getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis),
UTC_CAL.get());
+ }
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i,
flowAction.getFlowActionType().toString());
@@ -511,7 +521,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
return new LeaseObtainedStatus(flowAction,
selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
- log.info("Another participant acquired lease in between for [{}, is: {},
eventTimestamp: {}] - num rows updated: ",
+ log.info("Another participant acquired lease in between for [{}, is: {},
eventTimestamp: {}] - num rows updated: {}",
flowAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction,
selectInfoResult.getEventTimeMillis(),
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 8f1fdf30c..15090e8f1 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -36,6 +36,7 @@ import static
org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter.*;
@Slf4j
public class MysqlMultiActiveLeaseArbiterTest {
private static final int EPSILON = 10000;
+ private static final int MORE_THAN_EPSILON = (int) (EPSILON * 1.1);
private static final int LINGER = 50000;
private static final String USER = "testUser";
private static final String PASSWORD = "testPassword";
@@ -117,7 +118,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Tests CASE 3 of trying to acquire a lease for a distinct flow action
event, while the previous event's lease is
// valid
// Allow enough time to pass for this trigger to be considered distinct,
but not enough time so the lease expires
- Thread.sleep(EPSILON * 3/2);
+ Thread.sleep(MORE_THAN_EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(thirdLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
@@ -147,7 +148,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be
acquired
- Thread.sleep(EPSILON * 3/2);
+ Thread.sleep(MORE_THAN_EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(sixthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
@@ -286,10 +287,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
/*
Tests calling `tryAcquireLease` for a reminder event whose lease has
completed in the database and should return
- `NoLongerLeasing` status
+ `NoLongerLeasing` status.
+ Note: that we wait for enough time to pass that the event would have been
considered distinct for a non-reminder case
+ to ensure that the comparison made for reminder events is against the
preserved event time not current time in db
*/
@Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease")
- public void testReminderEventAcquireLeaseOnCompletedLease() throws
IOException {
+ public void testReminderEventAcquireLeaseOnCompletedLease() throws
IOException, InterruptedException {
// Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
@@ -297,6 +300,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
resumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
+ // Sleep enough time for the event to have been considered distinct
+ Thread.sleep(MORE_THAN_EPSILON);
// Now have a reminder event check-in on the completed lease
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true);