This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a2edfcb4 [GOBBLIN-1921] Properly handle reminder events (#3790)
2a2edfcb4 is described below

commit 2a2edfcb40c656c74865f58390d0eb64fb835f1d
Author: umustafi <[email protected]>
AuthorDate: Tue Oct 3 11:50:55 2023 -0700

    [GOBBLIN-1921] Properly handle reminder events (#3790)
    
    * Add millisecond level precision to timestamp cols & proper timezone 
conversion
    
            - existing tests pass with minor modifications
    
    * Handle reminder events properly
    
    * Fix compilation errors & add isReminder flag
    
    * Add unit tests
    
    * Address review comments
    
    * Add newline to address comment
    
    * Include reminder/original tag in logging
    
    * Clarify timezone issues in comment
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../gobblin/configuration/ConfigurationKeys.java   |   1 +
 .../runtime/api/MultiActiveLeaseArbiter.java       |   9 +-
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  | 189 ++++++++++++++-------
 .../api/MysqlMultiActiveLeaseArbiterTest.java      | 123 +++++++++++---
 .../modules/orchestration/FlowTriggerHandler.java  |  10 +-
 .../modules/orchestration/Orchestrator.java        |   9 +-
 .../scheduler/GobblinServiceJobScheduler.java      |   4 +-
 .../modules/orchestration/OrchestratorTest.java    |   4 +-
 8 files changed, 242 insertions(+), 107 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 198c8de35..81608655a 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -109,6 +109,7 @@ public class ConfigurationKeys {
   // Event time of flow action to orchestrate using the multi-active lease 
arbiter
   public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY = 
"orchestratorTriggerEventTimeMillis";
   public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = 
"-1";
+  public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
   public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
   public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
   // Note: linger should be on the order of seconds even though we measure in 
millis
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
index ab9e03599..faacb0995 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
@@ -24,7 +24,7 @@ import lombok.Data;
 
 /**
  * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
- * more active participants compete to take responsiblity for a particular 
flow's event. The type of flow event in
+ * more active participants compete to take responsibility for a particular 
flow's event. The type of flow event in
  * question does not impact the algorithm other than to uniquely identify the 
flow event. Each participant uses the
  * interface to initiate an attempt at ownership over the flow event and 
receives a response indicating the status of
  * the attempt.
@@ -38,7 +38,8 @@ import lombok.Data;
  *        b) LeasedToAnotherStatus -> another will attempt to carry out the 
required action before the lease expires
  *        c) NoLongerLeasingStatus -> flow event no longer needs to be acted 
upon (terminal state)
  *  3. If another participant has acquired the lease before this one could, 
then the present participant must check back
- *    in at the time of lease expiry to see if it needs to attempt the lease 
again [status (b) above].
+ *    in at the time of lease expiry to see if it needs to attempt the lease 
again [status (b) above]. We refer to this
+ *    check-in as a 'reminder event'.
  *  4. Once the participant which acquired the lease completes its work on the 
flow event, it calls recordLeaseSuccess
  *    to indicate to all other participants that the flow event no longer 
needs to be acted upon [status (c) above]
  */
@@ -51,10 +52,12 @@ public interface MultiActiveLeaseArbiter {
    * determine the next action.
    * @param flowAction uniquely identifies the flow and the present action 
upon it
    * @param eventTimeMillis is the time this flow action was triggered
+   * @param isReminderEvent true if the flow action event we're checking on is 
a reminder event
    * @return LeaseAttemptStatus
    * @throws IOException
    */
-  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long 
eventTimeMillis) throws IOException;
+  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long 
eventTimeMillis, boolean isReminderEvent)
+      throws IOException;
 
   /**
    * This method is used to indicate the owner of the lease has successfully 
completed required actions while holding
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 7d4bfb118..a7c035185 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -27,7 +27,9 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.Timestamp;
+import java.util.Calendar;
 import java.util.Optional;
+import java.util.TimeZone;
 import javax.sql.DataSource;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -46,10 +48,10 @@ import org.apache.gobblin.util.ConfigUtils;
  * schema is as follows:
  * [flow_group | flow_name | flow_action | event_timestamp | 
lease_acquisition_timestamp]
  * (----------------------primary key------------------------)
- * We also maintain another table in the database with two constants that 
allow us to coordinate between participants and
- * ensure they are using the same values to base their coordination off of.
+ * We also maintain another table in the database with two constants that 
allow us to coordinate between participants
+ * and ensure they are using the same values to base their coordination off of.
  * [epsilon | linger]
- * `epsilon` - time within we consider to timestamps to be the same, to 
account for between-host clock drift
+ * `epsilon` - time within we consider two event timestamps to be overlapping 
and can consolidate
  * `linger` - minimum time to occur before another host may attempt a lease on 
a flow event. It should be much greater
  *            than epsilon and encapsulate executor communication latency 
including retry attempts
  *
@@ -86,16 +88,30 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   private final int epsilon;
   private final int linger;
   private String thisTableGetInfoStatement;
+  private String thisTableGetInfoStatementForReminder;
   private String thisTableSelectAfterInsertStatement;
   private String thisTableAcquireLeaseIfMatchingAllStatement;
   private String thisTableAcquireLeaseIfFinishedStatement;
 
   // TODO: define retention on this table
+  /*
+    Notes:
+    - Set `event_timestamp` default value to turn off timestamp auto-updates 
for row modifications which alters this col
+      in an unexpected way upon completing the lease
+    - MySQL converts TIMESTAMP values from the current time zone to UTC for 
storage, and back from UTC to the current
+      time zone for retrieval. 
https://dev.mysql.com/doc/refman/8.0/en/datetime.html
+        - Thus, for reading any timestamps from MySQL we convert the timezone 
from session (default) to UTC to always
+          use epoch-millis in UTC locally
+        - Similarly, for inserting/updating any timestamps we convert the 
timezone from UTC to session (as it will be
+          (interpreted automatically as session time zone) and explicitly set 
all timestamp columns to avoid using the
+          default auto-update/initialization values
+    - We desire millisecond level precision and denote that with `(3)` for the 
TIMESTAMP types
+   */
   private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %s ("
       + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") 
NOT NULL, flow_name varchar("
       + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " 
flow_action varchar(100) NOT NULL, "
-      + "event_timestamp TIMESTAMP, "
-      + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, 
"
+      + "event_timestamp TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), "
+      + "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT NULL, "
       + "PRIMARY KEY (flow_group,flow_name,flow_action))";
   private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE 
IF NOT EXISTS %s "
       + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY 
(primary_key))";
@@ -104,32 +120,50 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       + "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), 
linger=VALUES(linger)";
   protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE 
flow_group=? AND flow_name=? AND flow_action=?";
   protected static final String WHERE_CLAUSE_TO_MATCH_ROW = 
WHERE_CLAUSE_TO_MATCH_KEY
-      + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
-  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
-    + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
+      + " AND event_timestamp=CONVERT_TZ(?, '+00:00', @@session.time_zone)"
+      + " AND lease_acquisition_timestamp=CONVERT_TZ(?, '+00:00', 
@@session.time_zone)";
+  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT "
+      + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as 
utc_event_timestamp, "
+      + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, 
'+00:00') as utc_lease_acquisition_timestamp, "
+      + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
   // Does a cross join between the two tables to have epsilon and linger 
values available. Returns the following values:
-  // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if 
current time in db is within epsilon of
-  // event_timestamp), leaseValidityStatus (1 if lease has not expired, 2 if 
expired, 3 if column is NULL or no longer
-  // leasing)
-  protected static final String GET_EVENT_INFO_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
-      + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP) / 1000 
<= epsilon as is_within_epsilon, CASE "
-      + "WHEN CURRENT_TIMESTAMP < DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 1 "
-      + "WHEN CURRENT_TIMESTAMP >= DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 2 "
-      + "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM 
%s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
+  // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if 
new event timestamp (current timestamp in
+  // db) is within epsilon of event_timestamp in the table), 
leaseValidityStatus (1 if lease has not expired, 2 if
+  // expired, 3 if column is NULL or no longer leasing)
+  protected static final String GET_EVENT_INFO_STATEMENT = "SELECT "
+      + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as 
utc_event_timestamp, "
+      + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, 
'+00:00') as utc_lease_acquisition_timestamp, "
+      + "ABS(TIMESTAMPDIFF(microsecond, event_timestamp, 
CURRENT_TIMESTAMP(3))) / 1000 <= epsilon as is_within_epsilon, CASE "
+      + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 1 "
+      + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 2 "
+      + "ELSE 3 END as lease_validity_status, linger, "
+      + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+  // Same as query above, except that isWithinEpsilon is True if the reminder 
event timestamp (provided by caller) is
+  // OLDER than or equal to the db event_timestamp and within epsilon away 
from it.
+  protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = 
"SELECT "
+      + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as 
utc_event_timestamp, "
+      + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, 
'+00:00') as utc_lease_acquisition_timestamp, "
+      + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) / 
1000 <= epsilon as is_within_epsilon, CASE "
+      + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 1 "
+      + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 2 "
+      + "ELSE 3 END as lease_validity_status, linger, "
+      + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
   // Insert or update row to acquire lease if values have not changed since 
the previous read
   // Need to define three separate statements to handle cases where row does 
not exist or has null values to check
   protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT 
INTO %s (flow_group, flow_name, "
-      + "flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?, 
?, ?, CURRENT_TIMESTAMP, "
-      + "CURRENT_TIMESTAMP)";
+      + "flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?, 
?, ?, CURRENT_TIMESTAMP(3), "
+      + "CURRENT_TIMESTAMP(3))";
   protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
-      + "SET event_timestamp=CURRENT_TIMESTAMP, 
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
-      + WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND 
lease_acquisition_timestamp is NULL";
+      + "SET event_timestamp=CURRENT_TIMESTAMP(3), 
lease_acquisition_timestamp=CURRENT_TIMESTAMP(3) "
+      + WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=CONVERT_TZ(?, 
'+00:00', @@session.time_zone) AND "
+      + "lease_acquisition_timestamp is NULL";
   protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
-      + "SET event_timestamp=CURRENT_TIMESTAMP, 
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+      + "SET event_timestamp=CURRENT_TIMESTAMP(3), 
lease_acquisition_timestamp=CURRENT_TIMESTAMP(3) "
       + WHERE_CLAUSE_TO_MATCH_ROW;
   // Complete lease acquisition if values have not changed since lease was 
acquired
   protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = 
"UPDATE %s SET "
       + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+  protected static final Calendar UTC_CAL = 
Calendar.getInstance(TimeZone.getTimeZone("UTC"));
 
   @Inject
   public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
@@ -150,6 +184,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
     this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, 
this.leaseArbiterTableName,
         this.constantsTableName);
+    this.thisTableGetInfoStatementForReminder = 
String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
+        this.leaseArbiterTableName, this.constantsTableName);
     this.thisTableSelectAfterInsertStatement = 
String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName,
         this.constantsTableName);
     this.thisTableAcquireLeaseIfMatchingAllStatement =
@@ -186,19 +222,19 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   @Override
-  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
-      throws IOException {
-    log.info("Multi-active scheduler about to handle trigger event: [{}, 
triggerEventTimestamp: {}]", flowAction,
-        eventTimeMillis);
+  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis,
+      boolean isReminderEvent) throws IOException {
+    log.info("Multi-active scheduler about to handle trigger event: [{}, is: 
{}, triggerEventTimestamp: {}]",
+        flowAction, isReminderEvent ? "reminder" : "original", 
eventTimeMillis);
     // Query lease arbiter table about this flow action
-    Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);
+    Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, 
isReminderEvent);
 
     try {
       if (!getResult.isPresent()) {
-        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no 
existing row for this flow action, then go"
-                + " ahead and insert", flowAction, eventTimeMillis);
+        log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 
1: no existing row for this flow action,"
+            + " then go ahead and insert", flowAction, isReminderEvent ? 
"reminder" : "original", eventTimeMillis);
         int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
-       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.empty());
+       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.empty(), isReminderEvent);
       }
 
       // Extract values from result set
@@ -210,28 +246,47 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       int dbLinger = getResult.get().getDbLinger();
       Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
 
-      log.info("Multi-active arbiter replacing local trigger event timestamp 
[{}, triggerEventTimestamp: {}] with "
-          + "database eventTimestamp {}", flowAction, eventTimeMillis, 
dbCurrentTimestamp.getTime());
+      // For reminder event, we can stop early if the reminder eventTimeMillis 
is older than the current event in the db
+      // because db laundering tells us that the currently worked on db event 
is newer and will have its own reminders
+      if (isReminderEvent) {
+        if (eventTimeMillis < dbEventTimestamp.getTime()) {
+          log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - A new event trigger "
+                  + "is being worked on, so this older reminder will be 
dropped.", flowAction,
+              isReminderEvent ? "reminder" : "original", eventTimeMillis, 
dbEventTimestamp);
+          return new NoLongerLeasingStatus();
+        }
+        if (eventTimeMillis > dbEventTimestamp.getTime()) {
+          log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
dbEventTimeMillis: {} - Severe constraint "
+                  + "violation encountered: a reminder event newer than db 
event was found when db laundering should "
+                  + "ensure monotonically increasing laundered event times.", 
flowAction,
+              isReminderEvent ? "reminder" : "original", eventTimeMillis, 
dbEventTimestamp.getTime());
+        }
+      }
+
+      log.info("Multi-active arbiter replacing local trigger event timestamp 
[{}, is: {}, triggerEventTimestamp: {}] "
+          + "with database eventTimestamp {} (in epoch-millis)", flowAction, 
isReminderEvent ? "reminder" : "original",
+          eventTimeMillis, dbCurrentTimestamp.getTime());
 
       // Lease is valid
       if (leaseValidityStatus == 1) {
         if (isWithinEpsilon) {
-          log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 2: 
Same event, lease is valid", flowAction,
-              dbCurrentTimestamp.getTime());
+          log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 2: Same event, lease is valid",
+              flowAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
           return new LeasedToAnotherStatus(flowAction, 
dbEventTimestamp.getTime(),
               dbLeaseAcquisitionTimestamp.getTime() + dbLinger - 
dbCurrentTimestamp.getTime());
         }
-        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 3: 
Distinct event, lease is valid", flowAction,
-            dbCurrentTimestamp.getTime());
+        log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
3: Distinct event, lease is valid",
+            flowAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
         // Utilize db lease acquisition timestamp for wait time
         return new LeasedToAnotherStatus(flowAction, 
dbCurrentTimestamp.getTime(),
             dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - 
dbCurrentTimestamp.getTime());
-      }
+      } // Lease is invalid
       else if (leaseValidityStatus == 2) {
-        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 4: 
Lease is out of date (regardless of whether "
-            + "same or distinct event)", flowAction, 
dbCurrentTimestamp.getTime());
-        if (isWithinEpsilon) {
+        log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
4: Lease is out of date (regardless of "
+            + "whether same or distinct event)", flowAction, isReminderEvent ? 
"reminder" : "original",
+            dbCurrentTimestamp.getTime());
+        if (isWithinEpsilon && !isReminderEvent) {
           log.warn("Lease should not be out of date for the same trigger event 
since epsilon << linger for flowAction"
                   + " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp 
{}, linger {}", flowAction,
               dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
@@ -239,19 +294,19 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement, 
flowAction,
             true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp), isReminderEvent);
       } // No longer leasing this event
         if (isWithinEpsilon) {
-          log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5: 
Same event, no longer leasing event in db: "
-              + "terminate", flowAction, dbCurrentTimestamp.getTime());
+          log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 5: Same event, no longer leasing event"
+              + " in db", flowAction, isReminderEvent ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
           return new NoLongerLeasingStatus();
         }
-        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: 
Distinct event, no longer leasing event in "
-            + "db", flowAction, dbCurrentTimestamp.getTime());
+        log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
6: Distinct event, no longer leasing "
+            + "event in db", flowAction, isReminderEvent ? "reminder" : 
"original", dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
             true, false, dbEventTimestamp, null);
-        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp), isReminderEvent);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -260,8 +315,9 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   /**
    * Checks leaseArbiterTable for an existing entry for this flow action and 
event time
    */
-  protected Optional<GetEventInfoResult> 
getExistingEventInfo(DagActionStore.DagAction flowAction) throws IOException {
-    return withPreparedStatement(thisTableGetInfoStatement,
+  protected Optional<GetEventInfoResult> 
getExistingEventInfo(DagActionStore.DagAction flowAction,
+      boolean isReminderEvent) throws IOException {
+    return withPreparedStatement(isReminderEvent ? 
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
         getInfoStatement -> {
           int i = 0;
           getInfoStatement.setString(++i, flowAction.getFlowGroup());
@@ -284,12 +340,12 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws 
IOException {
     try {
       // Extract values from result set
-      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
-      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
+      Timestamp dbEventTimestamp = 
resultSet.getTimestamp("utc_event_timestamp", UTC_CAL);
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL);
       boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
       int leaseValidityStatus = resultSet.getInt("lease_validity_status");
       int dbLinger = resultSet.getInt("linger");
-      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("utc_current_timestamp", UTC_CAL);
       return new GetEventInfoResult(dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
           dbLinger, dbCurrentTimestamp);
     } catch (SQLException e) {
@@ -337,8 +393,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
     return withPreparedStatement(acquireLeaseStatement,
         insertStatement -> {
-          completeUpdatePreparedStatement(insertStatement, flowAction, 
needEventTimeCheck,
-              needLeaseAcquisition, dbEventTimestamp, 
dbLeaseAcquisitionTimestamp);
+          completeUpdatePreparedStatement(insertStatement, flowAction, 
needEventTimeCheck, needLeaseAcquisition,
+              dbEventTimestamp, dbLeaseAcquisitionTimestamp);
           return insertStatement.executeUpdate();
         }, true);
   }
@@ -367,14 +423,15 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           throw new IOException("Expected resultSet containing row information 
for the lease that was attempted but "
               + "received nothing.");
         }
-        if (resultSet.getTimestamp(1) == null) {
+        if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL) == null) {
           throw new IOException("event_timestamp should never be null (it is 
always set to current timestamp)");
         }
-        long eventTimeMillis = resultSet.getTimestamp(1).getTime();
+        long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", 
UTC_CAL).getTime();
         // Lease acquisition timestamp is null if another participant has 
completed the lease
-        Optional<Long> leaseAcquisitionTimeMillis = resultSet.getTimestamp(2) 
== null ? Optional.empty() :
-            Optional.of(resultSet.getTimestamp(2).getTime());
-        int dbLinger = resultSet.getInt(3);
+        Optional<Long> leaseAcquisitionTimeMillis =
+            resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL) 
== null ? Optional.empty() :
+            
Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", 
UTC_CAL).getTime());
+        int dbLinger = resultSet.getInt("linger");
         return new SelectInfoResult(eventTimeMillis, 
leaseAcquisitionTimeMillis, dbLinger);
       } catch (SQLException e) {
         throw new IOException(e);
@@ -397,7 +454,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * @throws IOException
    */
   protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int 
numRowsUpdated,
-      DagActionStore.DagAction flowAction, Optional<Timestamp> 
dbCurrentTimestamp)
+      DagActionStore.DagAction flowAction, Optional<Timestamp> 
dbCurrentTimestamp, boolean isReminderEvent)
       throws SQLException, IOException {
     // Fetch values in row after attempted insert
     SelectInfoResult selectInfoResult = getRowInfo(flowAction);
@@ -406,13 +463,13 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       return new NoLongerLeasingStatus();
     }
     if (numRowsUpdated == 1) {
-      log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!", 
flowAction,
-          selectInfoResult.eventTimeMillis);
+      log.debug("Obtained lease for [{}, is: {}, eventTimestamp: {}] 
successfully!", flowAction,
+          isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis);
       return new LeaseObtainedStatus(flowAction, 
selectInfoResult.eventTimeMillis,
           selectInfoResult.getLeaseAcquisitionTimeMillis().get());
     }
-    log.debug("Another participant acquired lease in between for [{}, 
eventTimestamp: {}] - num rows updated: ",
-        flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
+    log.debug("Another participant acquired lease in between for [{}, is: {}, 
eventTimestamp: {}] - num rows updated: ",
+        flowAction, isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis, numRowsUpdated);
     // Another participant acquired lease in between
     return new LeasedToAnotherStatus(flowAction, 
selectInfoResult.getEventTimeMillis(),
         selectInfoResult.getLeaseAcquisitionTimeMillis().get() + 
selectInfoResult.getDbLinger()
@@ -469,10 +526,10 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     statement.setString(++i, flowAction.getFlowActionType().toString());
     // Values that may be needed depending on the insert statement
     if (needEventTimeCheck) {
-      statement.setTimestamp(++i, originalEventTimestamp);
+      statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL);
     }
     if (needLeaseAcquisitionTimeCheck) {
-      statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp);
+      statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL);
     }
   }
 
@@ -489,8 +546,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           updateStatement.setString(++i, flowGroup);
           updateStatement.setString(++i, flowName);
           updateStatement.setString(++i, flowActionType.toString());
-          updateStatement.setTimestamp(++i, new 
Timestamp(status.getEventTimestamp()));
-          updateStatement.setTimestamp(++i, new 
Timestamp(status.getLeaseAcquisitionTimestamp()));
+          updateStatement.setTimestamp(++i, new 
Timestamp(status.getEventTimestamp()), UTC_CAL);
+          updateStatement.setTimestamp(++i, new 
Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL);
           int numRowsUpdated = updateStatement.executeUpdate();
           if (numRowsUpdated == 0) {
             log.info("Multi-active lease arbiter lease attempt: [{}, 
eventTimestamp: {}] - FAILED to complete because "
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 5a3b9da15..8f1fdf30c 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -35,8 +35,8 @@ import static 
org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter.*;
 
 @Slf4j
 public class MysqlMultiActiveLeaseArbiterTest {
-  private static final int EPSILON = 30000;
-  private static final int LINGER = 80000;
+  private static final int EPSILON = 10000;
+  private static final int LINGER = 50000;
   private static final String USER = "testUser";
   private static final String PASSWORD = "testPassword";
   private static final String TABLE = "mysql_multi_active_lease_arbiter_store";
@@ -49,6 +49,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static DagActionStore.DagAction resumeDagAction =
       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.RESUME);
   private static final long eventTimeMillis = System.currentTimeMillis();
+  private static final Timestamp dummyTimestamp = new Timestamp(99999);
   private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
   private String formattedAcquireLeaseIfMatchingAllStatement =
       
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
TABLE);
@@ -82,7 +83,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
   public void testAcquireLeaseSingleParticipant() throws Exception {
     // Tests CASE 1 of acquire lease for a flow action event not present in DB
     MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(firstLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
@@ -95,7 +96,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     DagActionStore.DagAction killDagAction = new
         DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.KILL);
     MultiActiveLeaseArbiter.LeaseAttemptStatus killStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(killStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
@@ -106,19 +107,19 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Very little time should have passed if this test directly follows the 
one above so this call will be considered
     // the same as the previous event
     MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(secondLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
     MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
         (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
-    Assert.assertTrue(secondLeasedToAnotherStatus.getEventTimeMillis() == 
firstObtainedStatus.getEventTimestamp());
-    
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() 
>= LINGER);
+    Assert.assertEquals(firstObtainedStatus.getEventTimestamp(), 
secondLeasedToAnotherStatus.getEventTimeMillis());
+    
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() 
> 0);
 
     // Tests CASE 3 of trying to acquire a lease for a distinct flow action 
event, while the previous event's lease is
     // valid
     // Allow enough time to pass for this trigger to be considered distinct, 
but not enough time so the lease expires
     Thread.sleep(EPSILON * 3/2);
     MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(thirdLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
     MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
         (MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
@@ -128,7 +129,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // Tests CASE 4 of lease out of date
     Thread.sleep(LINGER);
     MultiActiveLeaseArbiter.LeaseAttemptStatus fourthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(fourthLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
@@ -141,14 +142,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
     
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
     Assert.assertTrue(System.currentTimeMillis() - 
fourthObtainedStatus.getEventTimestamp() < EPSILON);
     MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(fifthLaunchStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
 
     // Tests CASE 6 of no longer leasing a distinct event in DB
     // Wait so this event is considered distinct and a new lease will be 
acquired
     Thread.sleep(EPSILON * 3/2);
     MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(sixthLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
     MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
         (MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
@@ -166,9 +167,9 @@ public class MysqlMultiActiveLeaseArbiterTest {
   @Test (dependsOnMethods = "testAcquireLeaseSingleParticipant")
   public void testAcquireLeaseIfNewRow() throws IOException {
     // Inserting the first time should update 1 row
-    
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
 1);
+    
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
 1);
     // Inserting the second time should not update any rows
-    
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
 0);
+    
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
 0);
   }
 
     /*
@@ -180,22 +181,22 @@ public class MysqlMultiActiveLeaseArbiterTest {
   @Test (dependsOnMethods = "testAcquireLeaseIfNewRow")
   public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() 
throws IOException {
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-        this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+        mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
 
     // The following insert will fail since the eventTimestamp does not match
-    int numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+    int numRowsUpdated = 
mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
         formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, 
true,
-        new Timestamp(99999), new 
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+        dummyTimestamp, new 
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
     Assert.assertEquals(numRowsUpdated, 0);
 
     // The following insert will fail since the leaseAcquisitionTimestamp does 
not match
-    numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+    numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
         formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, 
true,
-        new Timestamp(selectInfoResult.getEventTimeMillis()), new 
Timestamp(99999));
+        new Timestamp(selectInfoResult.getEventTimeMillis()), dummyTimestamp);
     Assert.assertEquals(numRowsUpdated, 0);
 
     // This insert should work since the values match all the columns
-    numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+    numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
         formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, 
true,
         new Timestamp(selectInfoResult.getEventTimeMillis()),
         new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
@@ -213,26 +214,92 @@ public class MysqlMultiActiveLeaseArbiterTest {
       throws IOException, InterruptedException, SQLException {
     // Mark the resume action lease from above as completed by fabricating a 
LeaseObtainedStatus
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
-        this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
-    boolean markedSuccess = 
this.mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+        mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+    boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
         resumeDagAction, selectInfoResult.getEventTimeMillis(), 
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
     Assert.assertTrue(markedSuccess);
     // Ensure no NPE results from calling this after a lease has been 
completed and acquisition timestamp val is NULL
-    mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, 
resumeDagAction, Optional.empty());
-
-    // Sleep enough time for event to be considered distinct
-    Thread.sleep(LINGER);
+    mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, 
resumeDagAction,
+        Optional.empty(), false);
 
     // The following insert will fail since eventTimestamp does not match the 
expected
-    int numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+    int numRowsUpdated = 
mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
         formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
-        new Timestamp(99999), null);
+        dummyTimestamp, null);
     Assert.assertEquals(numRowsUpdated, 0);
 
     // This insert does match since we utilize the right eventTimestamp
-    numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+    numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
         formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
         new Timestamp(selectInfoResult.getEventTimeMillis()), null);
     Assert.assertEquals(numRowsUpdated, 1);
   }
+
+  /*
+  Tests calling `tryAcquireLease` for an older reminder event which should be 
immediately returned as `NoLongerLeasing`
+   */
+  @Test (dependsOnMethods = 
"testConditionallyAcquireLeaseIfFinishedLeasingStatement")
+  public void testOlderReminderEventAcquireLease() throws IOException {
+    // Read database to obtain existing db eventTimeMillis and use it to 
construct an older event
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+        mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+    long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
+    LeaseAttemptStatus attemptStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
olderEventTimestamp, true);
+    Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
+  }
+
+  /*
+  Tests calling `tryAcquireLease` for a reminder event for which a valid lease 
exists in the database. We don't expect
+  this case to occur because the reminderEvent should be triggered after the 
lease expires, but ensure it's handled
+  correctly anyway.
+   */
+  @Test (dependsOnMethods = "testOlderReminderEventAcquireLease")
+  public void testReminderEventAcquireLeaseOnValidLease() throws IOException {
+    // Read database to obtain existing db eventTimeMillis and re-use it for 
the reminder event time
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+        mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+    LeaseAttemptStatus attemptStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true);
+    Assert.assertTrue(attemptStatus instanceof LeasedToAnotherStatus);
+    LeasedToAnotherStatus leasedToAnotherStatus = (LeasedToAnotherStatus) 
attemptStatus;
+    Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(), 
selectInfoResult.getEventTimeMillis());
+  }
+
+  /*
+  Tests calling `tryAcquireLease` for a reminder event whose lease has expired 
in the database and should successfully
+  acquire a new lease
+   */
+  @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnValidLease")
+  public void testReminderEventAcquireLeaseOnInvalidLease() throws 
IOException, InterruptedException {
+    // Read database to obtain existing db eventTimeMillis and wait enough 
time for the lease to expire
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+        mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+    Thread.sleep(LINGER);
+    LeaseAttemptStatus attemptStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true);
+    Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
+    LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
+    Assert.assertTrue(obtainedStatus.getEventTimestamp() > 
selectInfoResult.getEventTimeMillis());
+    Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() > 
selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue());
+  }
+
+   /*
+  Tests calling `tryAcquireLease` for a reminder event whose lease has 
completed in the database and should return
+  `NoLongerLeasing` status
+   */
+   @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease")
+   public void testReminderEventAcquireLeaseOnCompletedLease() throws 
IOException {
+     // Mark the resume action lease from above as completed by fabricating a 
LeaseObtainedStatus
+     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+         mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+     boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+         resumeDagAction, selectInfoResult.getEventTimeMillis(), 
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+     Assert.assertTrue(markedSuccess);
+
+     // Now have a reminder event check-in on the completed lease
+     LeaseAttemptStatus attemptStatus =
+         mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, 
selectInfoResult.getEventTimeMillis(), true);
+     Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
+   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 1849619f0..4f4bdf9fd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -104,12 +104,14 @@ public class FlowTriggerHandler {
    * @param jobProps
    * @param flowAction
    * @param eventTimeMillis
+   * @param isReminderEvent
    * @throws IOException
    */
-  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
-      throws IOException {
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis,
+      boolean isReminderEvent) throws IOException {
     if (multiActiveLeaseArbiter.isPresent()) {
-      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
multiActiveLeaseArbiter.get().tryAcquireLease(
+          flowAction, eventTimeMillis, isReminderEvent);
       if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
         MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = 
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
         this.leaseObtainedCount.inc();
@@ -278,6 +280,8 @@ public class FlowTriggerHandler {
     // excess flows to be triggered by the reminder functionality.
     
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
         String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
+    // Use this boolean to indicate whether this is a reminder event
+    prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, 
String.valueOf(false));
     // Update job data map and reset it in jobDetail
     jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
     return jobDataMap;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index c0975aa97..7afd8bba4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -220,7 +220,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   }
 
-  public void orchestrate(Spec spec, Properties jobProps, long 
triggerTimestampMillis) throws Exception {
+  public void orchestrate(Spec spec, Properties jobProps, long 
triggerTimestampMillis, boolean isReminderEvent)
+      throws Exception {
     // Add below waiting because TopologyCatalog and FlowCatalog service can 
be launched at the same time
     this.topologyCatalog.get().getInitComplete().await();
 
@@ -264,9 +265,9 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         String flowExecutionId = 
flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
         DagActionStore.DagAction flowAction =
             new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
-        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis);
-        _log.info("Multi-active scheduler finished handling trigger event: 
[{}, triggerEventTimestamp: {}]", flowAction,
-            triggerTimestampMillis);
+        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis, isReminderEvent);
+        _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
+            flowAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
       } else {
         Dag<JobExecutionPlan> jobExecutionPlanDag = 
jobExecutionPlanDagOptional.get();
         if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index daa96af2f..45053a93d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -503,7 +503,9 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       String triggerTimestampMillis = jobProps.getProperty(
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL);
-      this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis));
+      boolean isReminderEvent =
+          
Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
 "false"));
+      this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis), isReminderEvent);
     } catch (Exception e) {
       String exceptionPrefix = "Failed to run Spec: " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
       log.warn(exceptionPrefix + " because", e);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 2d3f4b3f9..444f2dc8c 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -341,13 +341,13 @@ public class OrchestratorTest {
     flowProps.put("gobblin.flow.destinationIdentifier", "destination");
     flowProps.put("flow.allowConcurrentExecution", false);
     FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(adhocSpec, flowProps, 0);
+    this.orchestrator.orchestrate(adhocSpec, flowProps, 0, false);
     String metricName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", 
"flow0", ServiceMetricNames.COMPILED);
     
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
 
     flowProps.setProperty("job.schedule", "0/2 * * * * ?");
     FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(scheduledSpec, flowProps, 0);
+    this.orchestrator.orchestrate(scheduledSpec, flowProps, 0, false);
     
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
   }
 }
\ No newline at end of file

Reply via email to