This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b254b6bb [GOBBLIN-1926] Fix Reminder Event Epsilon Comparison (#3797)
9b254b6bb is described below

commit 9b254b6bb4322a0bdb048f19daf71022ddcccd7e
Author: umustafi <[email protected]>
AuthorDate: Tue Oct 10 14:50:35 2023 -0700

    [GOBBLIN-1926] Fix Reminder Event Epsilon Comparison (#3797)
    
    * Fix Reminder Event Epsilon Comparison
    
    * Add TODO comment
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java      | 18 ++++++++++++++----
 .../runtime/api/MysqlMultiActiveLeaseArbiterTest.java  | 13 +++++++++----
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 82950ef7f..4c2e8d2da 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -151,7 +151,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = 
"SELECT "
       + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as 
utc_event_timestamp, "
       + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, 
'+00:00') as utc_lease_acquisition_timestamp, "
-      + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) / 
1000 <= epsilon as is_within_epsilon, CASE "
+      + "TIMESTAMPDIFF(microsecond, event_timestamp, CONVERT_TZ(?, '+00:00', 
@@session.time_zone)) / 1000 <= epsilon as is_within_epsilon, CASE "
       + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 1 "
       + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 2 "
       + "ELSE 3 END as lease_validity_status, linger, "
@@ -269,7 +269,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     log.info("Multi-active scheduler about to handle trigger event: [{}, is: 
{}, triggerEventTimestamp: {}]",
         flowAction, isReminderEvent ? "reminder" : "original", 
eventTimeMillis);
     // Query lease arbiter table about this flow action
-    Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, 
isReminderEvent);
+    Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, 
isReminderEvent, eventTimeMillis);
 
     // TODO: change all the `CASE N: ...` statements back to debug statements 
after uncovering issue
     try {
@@ -299,11 +299,18 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           return new NoLongerLeasingStatus();
         }
         if (eventTimeMillis > dbEventTimestamp.getTime()) {
+          // TODO: emit metric here to capture this unexpected behavior
           log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - Severe constraint "
                   + "violation encountered: a reminder event newer than db 
event was found when db laundering should "
                   + "ensure monotonically increasing laundered event times.", 
flowAction,
               isReminderEvent ? "reminder" : "original", eventTimeMillis, 
dbEventTimestamp.getTime());
         }
+        if (eventTimeMillis == dbEventTimestamp.getTime()) {
+          // TODO: change this to a debug after fixing issue
+          log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - Reminder event time"
+                  + "is the same as db event.", flowAction, isReminderEvent ? 
"reminder" : "original",
+              eventTimeMillis, dbEventTimestamp);
+        }
       }
 
       log.info("Multi-active arbiter replacing local trigger event timestamp 
[{}, is: {}, triggerEventTimestamp: {}] "
@@ -359,10 +366,13 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * Checks leaseArbiterTable for an existing entry for this flow action and 
event time
    */
   protected Optional<GetEventInfoResult> 
getExistingEventInfo(DagActionStore.DagAction flowAction,
-      boolean isReminderEvent) throws IOException {
+      boolean isReminderEvent, long eventTimeMillis) throws IOException {
     return withPreparedStatement(isReminderEvent ? 
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
         getInfoStatement -> {
           int i = 0;
+          if (isReminderEvent) {
+            getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis), 
UTC_CAL.get());
+          }
           getInfoStatement.setString(++i, flowAction.getFlowGroup());
           getInfoStatement.setString(++i, flowAction.getFlowName());
           getInfoStatement.setString(++i, 
flowAction.getFlowActionType().toString());
@@ -511,7 +521,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       return new LeaseObtainedStatus(flowAction, 
selectInfoResult.eventTimeMillis,
           selectInfoResult.getLeaseAcquisitionTimeMillis().get());
     }
-    log.info("Another participant acquired lease in between for [{}, is: {}, 
eventTimestamp: {}] - num rows updated: ",
+    log.info("Another participant acquired lease in between for [{}, is: {}, 
eventTimestamp: {}] - num rows updated: {}",
         flowAction, isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis, numRowsUpdated);
     // Another participant acquired lease in between
     return new LeasedToAnotherStatus(flowAction, 
selectInfoResult.getEventTimeMillis(),
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 8f1fdf30c..15090e8f1 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -36,6 +36,7 @@ import static 
org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter.*;
 @Slf4j
 public class MysqlMultiActiveLeaseArbiterTest {
   private static final int EPSILON = 10000;
+  private static final int MORE_THAN_EPSILON = (int) (EPSILON * 1.1);
   private static final int LINGER = 50000;
   private static final String USER = "testUser";
   private static final String PASSWORD = "testPassword";
@@ -117,7 +118,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Tests CASE 3 of trying to acquire a lease for a distinct flow action 
event, while the previous event's lease is
     // valid
     // Allow enough time to pass for this trigger to be considered distinct, 
but not enough time so the lease expires
-    Thread.sleep(EPSILON * 3/2);
+    Thread.sleep(MORE_THAN_EPSILON);
     MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(thirdLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
@@ -147,7 +148,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
 
     // Tests CASE 6 of no longer leasing a distinct event in DB
     // Wait so this event is considered distinct and a new lease will be 
acquired
-    Thread.sleep(EPSILON * 3/2);
+    Thread.sleep(MORE_THAN_EPSILON);
     MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(sixthLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
@@ -286,10 +287,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
 
    /*
   Tests calling `tryAcquireLease` for a reminder event whose lease has 
completed in the database and should return
-  `NoLongerLeasing` status
+  `NoLongerLeasing` status.
+  Note: we wait long enough that the event would have been considered 
distinct in the non-reminder case,
+  to ensure that the comparison made for reminder events is against the 
preserved event time, not the current time in the db
    */
    @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease")
-   public void testReminderEventAcquireLeaseOnCompletedLease() throws 
IOException {
+   public void testReminderEventAcquireLeaseOnCompletedLease() throws 
IOException, InterruptedException {
      // Mark the resume action lease from above as completed by fabricating a 
LeaseObtainedStatus
      MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
          mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
@@ -297,6 +300,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
          resumeDagAction, selectInfoResult.getEventTimeMillis(), 
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
      Assert.assertTrue(markedSuccess);
 
+     // Sleep long enough that the event would have been considered distinct
+     Thread.sleep(MORE_THAN_EPSILON);
      // Now have a reminder event check-in on the completed lease
      LeaseAttemptStatus attemptStatus =
          mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true);

Reply via email to