This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ca48bcd0a [GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter 
with Single Participant (#3715)
ca48bcd0a is described below

commit ca48bcd0aee7bb4ecd7277b25944bff9f4fc378f
Author: umustafi <[email protected]>
AuthorDate: Tue Jul 18 14:03:35 2023 -0700

    [GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single 
Participant (#3715)
    
    * Unit tests and corresponding fixes for 6 lease acquisition cases
    
    * remove logs
    
    * address review comments with small changes
    
    * clean up naming, logs, java doc
    
    * close resultSets
    
    * check if test passes on github
    
    * fix unit test
    
    * insert or update constants into table correctly
    
    * rename prop
    
    * address comments
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../gobblin/configuration/ConfigurationKeys.java   |   2 +-
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  | 326 ++++++++++++++-------
 .../api/MysqlMultiActiveLeaseArbiterTest.java      | 146 +++++++++
 .../modules/core/GobblinServiceGuiceModule.java    |   2 +-
 .../modules/orchestration/FlowTriggerHandler.java  |  75 +++--
 5 files changed, 413 insertions(+), 138 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 2d60fd5c8..50bd2a03d 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -99,7 +99,7 @@ public class ConfigurationKeys {
   public static final String MYSQL_LEASE_ARBITER_PREFIX = 
"MysqlMultiActiveLeaseArbiter";
   public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
   public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "gobblin_multi_active_scheduler_constants_store";
-  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
+  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
   public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"gobblin_scheduler_lease_determination_store";
   public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY = 
"eventToRevisitTimestampMillis";
   public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY = 
"triggerEventTimestampMillis";
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 8a40c71b2..9318d8b56 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime.api;
 
+import com.google.common.base.Optional;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -29,6 +30,7 @@ import com.typesafe.config.Config;
 import com.zaxxer.hikari.HikariDataSource;
 
 import javax.sql.DataSource;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -54,6 +56,11 @@ import org.apache.gobblin.util.ConfigUtils;
  *            than epsilon and encapsulate executor communication latency 
including retry attempts
  *
  * The `event_timestamp` is the time of the flow_action event request.
+ * --- Note ---
+ * We only use the participant's local event_timestamp internally to identify 
the particular flow_action event, but
+ * after interacting with the database utilize the CURRENT_TIMESTAMP of the 
database to insert or keep
+ * track of our event. This is to avoid any discrepancies due to clock drift 
between participants as well as
+ * variation in local time and database time for future comparisons.
  * ---Event consolidation---
  * Note that for the sake of simplification, we only allow one event 
associated with a particular flow's flow_action
  * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH, 
KILL, & RESUME for flow FOO at once) during
@@ -80,41 +87,48 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   private final String constantsTableName;
   private final int epsilon;
   private final int linger;
+  private String thisTableGetInfoStatement;
+  private String thisTableSelectAfterInsertStatement;
 
   // TODO: define retention on this table
-  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %S ("
+  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %s ("
       + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") 
NOT NULL, flow_name varchar("
       + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + 
"flow_execution_id varchar("
       + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, 
flow_action varchar(100) NOT NULL, "
       + "event_timestamp TIMESTAMP, "
-      + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+      + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, 
"
       + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
   private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE 
IF NOT EXISTS %s "
-      + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO 
%s (epsilon, linger) VALUES (?,?)";
+      + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY 
(primary_key))";
+  // Insert epsilon and linger values from config, updating the existing row if 
this table already contains one.
+  private static final String UPSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO 
%s (primary_key, epsilon, linger) "
+      + "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), 
linger=VALUES(linger)";
   protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE 
flow_group=? AND flow_name=? AND flow_execution_id=?"
       + " AND flow_action=?";
   protected static final String WHERE_CLAUSE_TO_MATCH_ROW = 
WHERE_CLAUSE_TO_MATCH_KEY
       + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
-  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
ROW_COUNT() AS rows_inserted_count, "
-      + "lease_acquisition_timestamp, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
+    + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
   // Does a cross join between the two tables to have epsilon and linger 
values available. Returns the following values:
-  // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if 
event_timestamp in table is within
-  // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired, 
3 if column is NULL or no longer leasing)
+  // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if 
current time in db is within epsilon of
+  // event_timestamp), leaseValidityStatus (1 if lease has not expired, 2 if 
expired, 3 if column is NULL or no longer
+  // leasing)
   protected static final String GET_EVENT_INFO_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
-      + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
-      + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then 
1"
-      + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then 
2"
-      + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+      + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP) / 1000 
<= epsilon as is_within_epsilon, CASE "
+      + "WHEN CURRENT_TIMESTAMP < DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 1 "
+      + "WHEN CURRENT_TIMESTAMP >= DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 2 "
+      + "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM 
%s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
   // Insert or update row to acquire lease if values have not changed since 
the previous read
   // Need to define three separate statements to handle cases where row does 
not exist or has null values to check
-  protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
-      + "(flow_group, flow_name, flow_execution_id, flow_action, 
event_timestamp) VALUES (?, ?, ?, ?, ?)";
+  protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, 
"
+      + "flow_name, flow_execution_id, flow_action, event_timestamp, 
lease_acquisition_timestamp) "
+      + "VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
   protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
-      + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
-      + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
+      + "SET event_timestamp=CURRENT_TIMESTAMP, 
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+      + WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND 
lease_acquisition_timestamp is NULL";
   protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
-      + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
-      + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
+      + "SET event_timestamp=CURRENT_TIMESTAMP, 
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+      + WHERE_CLAUSE_TO_MATCH_ROW;
   // Complete lease acquisition if values have not changed since lease was 
acquired
   protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = 
"UPDATE %s SET "
       + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
@@ -136,81 +150,107 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
     this.linger = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+    this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, 
this.leaseArbiterTableName,
+        this.constantsTableName);
+    this.thisTableSelectAfterInsertStatement = 
String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName,
+        this.constantsTableName);
     this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    String createArbiterStatement = String.format(
+        CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
     try (Connection connection = dataSource.getConnection();
-        PreparedStatement createStatement = 
connection.prepareStatement(String.format(
-            CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+        PreparedStatement createStatement = 
connection.prepareStatement(createArbiterStatement)) {
       createStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
       throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
     }
-    withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT, 
this.constantsTableName, this.constantsTableName),
-        createStatement -> {
+    initializeConstantsTable();
+
+    log.info("MysqlMultiActiveLeaseArbiter initialized");
+  }
+
+  // Initialize Constants table if needed, then insert its row of constants or 
update the row if it already exists
+  private void initializeConstantsTable() throws IOException {
+    String createConstantsStatement = 
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+    withPreparedStatement(createConstantsStatement, createStatement -> 
createStatement.executeUpdate(), true);
+
+    String insertConstantsStatement = 
String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+    withPreparedStatement(insertConstantsStatement, insertStatement -> {
       int i = 0;
-      createStatement.setInt(++i, epsilon);
-      createStatement.setInt(++i, linger);
-      return createStatement.executeUpdate();}, true);
+      insertStatement.setInt(++i, epsilon);
+      insertStatement.setInt(++i, linger);
+      return insertStatement.executeUpdate();
+    }, true);
   }
 
   @Override
   public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
     // Check table for an existing entry for this flow action and event time
-    ResultSet resultSet = withPreparedStatement(
-        String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, 
this.constantsTableName),
+    Optional<GetEventInfoResult> getResult = 
withPreparedStatement(thisTableGetInfoStatement,
         getInfoStatement -> {
           int i = 0;
-          getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
           getInfoStatement.setString(++i, flowAction.getFlowGroup());
           getInfoStatement.setString(++i, flowAction.getFlowName());
           getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
           getInfoStatement.setString(++i, 
flowAction.getFlowActionType().toString());
-          return getInfoStatement.executeQuery();
+          ResultSet resultSet = getInfoStatement.executeQuery();
+          try {
+            if (!resultSet.next()) {
+              return Optional.absent();
+            }
+            return Optional.of(createGetInfoResult(resultSet));
+          } finally {
+            if (resultSet !=  null) {
+              resultSet.close();
+            }
+          }
         }, true);
 
-    String formattedSelectAfterInsertStatement =
-        String.format(SELECT_AFTER_INSERT_STATEMENT, 
this.leaseArbiterTableName, this.constantsTableName);
     try {
-      // CASE 1: If no existing row for this flow action, then go ahead and 
insert
-      if (!resultSet.next()) {
+      if (!getResult.isPresent()) {
+        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no 
existing row for this flow action, then go"
+                + " ahead and insert", flowAction, eventTimeMillis);
         String formattedAcquireLeaseNewRowStatement =
             String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseNewRowStatement + "; " + 
formattedSelectAfterInsertStatement,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseNewRowStatement,
             insertStatement -> {
-              completeInsertPreparedStatement(insertStatement, flowAction, 
eventTimeMillis);
-              return insertStatement.executeQuery();
+              completeInsertPreparedStatement(insertStatement, flowAction);
+              return insertStatement.executeUpdate();
             }, true);
-       return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.absent());
       }
 
       // Extract values from result set
-      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
-      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
-      boolean isWithinEpsilon = resultSet.getBoolean("isWithinEpsilon");
-      int leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
-      int dbLinger = resultSet.getInt("linger");
+      Timestamp dbEventTimestamp = getResult.get().getDbEventTimestamp();
+      Timestamp dbLeaseAcquisitionTimestamp = 
getResult.get().getDbLeaseAcquisitionTimestamp();
+      boolean isWithinEpsilon = getResult.get().isWithinEpsilon();
+      int leaseValidityStatus = getResult.get().getLeaseValidityStatus();
+      // Used to calculate minimum amount of time until a participant should 
check whether a lease expired
+      int dbLinger = getResult.get().getDbLinger();
+      Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
+
+      log.info("Multi-active arbiter replacing local trigger event timestamp 
[{}, triggerEventTimestamp: {}] with "
+          + "database eventTimestamp {}", flowAction, eventTimeMillis, 
dbCurrentTimestamp.getTime());
 
-      // CASE 2: If our event timestamp is older than the last event in db, 
then skip this trigger
-      if (eventTimeMillis < dbEventTimestamp.getTime()) {
-        return new NoLongerLeasingStatus();
-      }
       // Lease is valid
       if (leaseValidityStatus == 1) {
-        // CASE 3: Same event, lease is valid
         if (isWithinEpsilon) {
+          log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 2: 
Same event, lease is valid", flowAction,
+              dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
           return new LeasedToAnotherStatus(flowAction, 
dbEventTimestamp.getTime(),
-              dbLeaseAcquisitionTimestamp.getTime() + dbLinger - 
System.currentTimeMillis());
+              dbLeaseAcquisitionTimestamp.getTime() + dbLinger - 
dbCurrentTimestamp.getTime());
         }
-        // CASE 4: Distinct event, lease is valid
-        // Utilize db timestamp for wait time, but be reminded of own event 
timestamp
-        return new LeasedToAnotherStatus(flowAction, eventTimeMillis,
-            dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - 
System.currentTimeMillis());
+        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 3: 
Distinct event, lease is valid", flowAction,
+            dbCurrentTimestamp.getTime());
+        // Utilize db lease acquisition timestamp for wait time
+        return new LeasedToAnotherStatus(flowAction, 
dbCurrentTimestamp.getTime(),
+            dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - 
dbCurrentTimestamp.getTime());
       }
-      // CASE 5: Lease is out of date (regardless of whether same or distinct 
event)
       else if (leaseValidityStatus == 2) {
+        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 4: 
Lease is out of date (regardless of whether "
+            + "same or distinct event)", flowAction, 
dbCurrentTimestamp.getTime());
         if (isWithinEpsilon) {
           log.warn("Lease should not be out of date for the same trigger event 
since epsilon << linger for flowAction"
                   + " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp 
{}, linger {}", flowAction,
@@ -219,84 +259,143 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfMatchingAllStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfMatchingAllStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
       } // No longer leasing this event
-        // CASE 6: Same event, no longer leasing event in db: terminate
         if (isWithinEpsilon) {
+          log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5: 
Same event, no longer leasing event in db: "
+              + "terminate", flowAction, dbCurrentTimestamp.getTime());
           return new NoLongerLeasingStatus();
         }
-        // CASE 7: Distinct event, no longer leasing event in db
+        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: 
Distinct event, no longer leasing event in "
+            + "db", flowAction, dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfFinishedStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfFinishedStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   false, dbEventTimestamp, null);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
 
+  protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws 
IOException {
+    try {
+      // Extract values from result set
+      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
+      boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+      int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+      int dbLinger = resultSet.getInt("linger");
+      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+      return new GetEventInfoResult(dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+          dbLinger, dbCurrentTimestamp);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (resultSet != null) {
+        try {
+          resultSet.close();
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+
+  protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) 
throws IOException {
+      try {
+        if (!resultSet.next()) {
+          throw new IOException("Expected num rows and 
lease_acquisition_timestamp returned from query but received nothing, so "
+              + "providing empty result to lease evaluation code");
+        }
+        long eventTimeMillis = resultSet.getTimestamp(1).getTime();
+        long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
+        int dbLinger = resultSet.getInt(3);
+        return new SelectInfoResult(eventTimeMillis, 
leaseAcquisitionTimeMillis, dbLinger);
+      } catch (SQLException e) {
+        throw new IOException(e);
+      } finally {
+        if (resultSet != null) {
+          try {
+            resultSet.close();
+          } catch (SQLException e) {
+            throw new IOException(e);
+          }
+        }
+      }
+  }
+
   /**
-   * Attempt lease by insert or update following a read based on the condition 
the state of the table has not changed
-   * since the read. Parse the result to return the corresponding status based 
on successful insert/update or not.
-   * @param resultSet
-   * @param eventTimeMillis
-   * @return LeaseAttemptStatus
+   * Parse result of attempted insert/update to obtain a lease for a
+   * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} event by 
selecting values corresponding to that
+   * event from the table to return the corresponding status based on 
successful insert/update or not.
    * @throws SQLException
    * @throws IOException
    */
-  protected LeaseAttemptStatus 
handleResultFromAttemptedLeaseObtainment(ResultSet resultSet,
-      DagActionStore.DagAction flowAction, long eventTimeMillis)
+  protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int 
numRowsUpdated,
+      DagActionStore.DagAction flowAction, Optional<Timestamp> 
dbCurrentTimestamp)
       throws SQLException, IOException {
-    if (!resultSet.next()) {
-      throw new IOException("Expected num rows and lease_acquisition_timestamp 
returned from query but received nothing");
-    }
-    int numRowsUpdated = resultSet.getInt(1);
-    long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
-    int dbLinger = resultSet.getInt(3);
+    // Fetch values in row after attempted insert
+    SelectInfoResult selectInfoResult = 
withPreparedStatement(thisTableSelectAfterInsertStatement,
+        selectStatement -> {
+          completeWhereClauseMatchingKeyPreparedStatement(selectStatement, 
flowAction);
+          ResultSet resultSet = selectStatement.executeQuery();
+          try {
+            return createSelectInfoResult(resultSet);
+          } finally {
+            if (resultSet !=  null) {
+              resultSet.close();
+            }
+          }
+        }, true);
     if (numRowsUpdated == 1) {
-      return new LeaseObtainedStatus(flowAction, eventTimeMillis, 
leaseAcquisitionTimeMillis);
+      log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!", 
flowAction,
+          selectInfoResult.eventTimeMillis);
+      return new LeaseObtainedStatus(flowAction, 
selectInfoResult.eventTimeMillis,
+          selectInfoResult.getLeaseAcquisitionTimeMillis());
     }
     // Another participant acquired lease in between
-    return new LeasedToAnotherStatus(flowAction, eventTimeMillis,
-        leaseAcquisitionTimeMillis + dbLinger - System.currentTimeMillis());
+    return new LeasedToAnotherStatus(flowAction, 
selectInfoResult.getEventTimeMillis(),
+        selectInfoResult.getLeaseAcquisitionTimeMillis() + 
selectInfoResult.getDbLinger()
+            - (dbCurrentTimestamp.isPresent() ? 
dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
   }
 
   /**
    * Complete the INSERT statement for a new flow action lease where the flow 
action is not present in the table
    * @param statement
    * @param flowAction
-   * @param eventTimeMillis
    * @throws SQLException
    */
-  protected void completeInsertPreparedStatement(PreparedStatement statement, 
DagActionStore.DagAction flowAction,
-      long eventTimeMillis) throws SQLException {
+  protected void completeInsertPreparedStatement(PreparedStatement statement, 
DagActionStore.DagAction flowAction)
+      throws SQLException {
     int i = 0;
     // Values to set in new row
     statement.setString(++i, flowAction.getFlowGroup());
     statement.setString(++i, flowAction.getFlowName());
     statement.setString(++i, flowAction.getFlowExecutionId());
     statement.setString(++i, flowAction.getFlowActionType().toString());
-    statement.setTimestamp(++i, new Timestamp(eventTimeMillis));
-    // Values to check if existing row matches previous read
-    statement.setString(++i, flowAction.getFlowGroup());
-    statement.setString(++i, flowAction.getFlowName());
-    statement.setString(++i, flowAction.getFlowExecutionId());
-    statement.setString(++i, flowAction.getFlowActionType().toString());
-    // Values to select for return
+  }
+
+  /**
+   * Complete the WHERE clause to match a flow action in a select statement
+   * @param statement
+   * @param flowAction
+   * @throws SQLException
+   */
+  protected void 
completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement, 
DagActionStore.DagAction flowAction)
+    throws SQLException {
+    int i = 0;
     statement.setString(++i, flowAction.getFlowGroup());
     statement.setString(++i, flowAction.getFlowName());
     statement.setString(++i, flowAction.getFlowExecutionId());
@@ -308,7 +407,6 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * updated.
    * @param statement
    * @param flowAction
-   * @param eventTimeMillis
    * @param needEventTimeCheck true if need to compare 
`originalEventTimestamp` with db event_timestamp
    * @param needLeaseAcquisitionTimeCheck true if need to compare 
`originalLeaseAcquisitionTimestamp` with db one
    * @param originalEventTimestamp value to compare to db one, null if not 
needed
@@ -316,11 +414,9 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * @throws SQLException
    */
   protected void completeUpdatePreparedStatement(PreparedStatement statement, 
DagActionStore.DagAction flowAction,
-      long eventTimeMillis, boolean needEventTimeCheck, boolean 
needLeaseAcquisitionTimeCheck,
+      boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck,
       Timestamp originalEventTimestamp, Timestamp 
originalLeaseAcquisitionTimestamp) throws SQLException {
     int i = 0;
-    // Value to update
-    statement.setTimestamp(++i, new Timestamp(eventTimeMillis));
     // Values to check if existing row matches previous read
     statement.setString(++i, flowAction.getFlowGroup());
     statement.setString(++i, flowAction.getFlowName());
@@ -333,11 +429,6 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     if (needLeaseAcquisitionTimeCheck) {
       statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp);
     }
-    // Values to select for return
-    statement.setString(++i, flowAction.getFlowGroup());
-    statement.setString(++i, flowAction.getFlowName());
-    statement.setString(++i, flowAction.getFlowExecutionId());
-    statement.setString(++i, flowAction.getFlowActionType().toString());
   }
 
   @Override
@@ -375,17 +466,44 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   }
 
   /** Abstracts recurring pattern around resource management and exception 
re-mapping. */
-  protected <T> T withPreparedStatement(String sql, 
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) throws 
IOException {
+  protected <T> T withPreparedStatement(String sql, 
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
+      throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(sql)) {
       T result = f.apply(statement);
       if (shouldCommit) {
         connection.commit();
       }
+      statement.close();
       return result;
     } catch (SQLException e) {
-      log.warn("Received SQL exception that can result from invalid 
connection. Checking if validation query is set {} Exception is {}", 
((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
+      log.warn("Received SQL exception that can result from invalid 
connection. Checking if validation query is set {} "
+          + "Exception is {}", ((HikariDataSource) 
this.dataSource).getConnectionTestQuery(), e);
       throw new IOException(e);
     }
   }
+
+
+  /**
+   * DTO for arbiter's current lease state for a FlowActionEvent
+  */
+  @Data
+  static class GetEventInfoResult {
+    private final Timestamp dbEventTimestamp;
+    private final Timestamp dbLeaseAcquisitionTimestamp;
+    private final boolean withinEpsilon;
+    private final int leaseValidityStatus;
+    private final int dbLinger;
+    private final Timestamp dbCurrentTimestamp;
+  }
+
+  /**
+   * DTO for result of SELECT query used to determine status of lease 
acquisition attempt
+   */
+  @Data
+  static class SelectInfoResult {
+    private final long eventTimeMillis;
+    private final long leaseAcquisitionTimeMillis;
+    private final int dbLinger;
+  }
 }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
new file mode 100644
index 000000000..3ede1ce83
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class MysqlMultiActiveLeaseArbiterTest {
+  private static final int EPSILON = 30000;
+  private static final int LINGER = 80000;
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "mysql_multi_active_lease_arbiter_store";
+  private static final String flowGroup = "testFlowGroup";
+  private static final String flowName = "testFlowName";
+  private static final String flowExecutionId = "12345677";
+  private static DagActionStore.DagAction launchDagAction =
+      new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
+
+  private static final long eventTimeMillis = System.currentTimeMillis();
+  private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
+
+  // The setup functionality verifies that the initialization of the tables is 
done correctly and surfaces any SQL
+  // syntax errors.
+  @BeforeClass
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, EPSILON)
+        .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, LINGER)
+        .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+        .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    this.mysqlMultiActiveLeaseArbiter = new 
MysqlMultiActiveLeaseArbiter(config);
+  }
+
+  /*
+     Tests all cases of trying to acquire a lease (CASES 1-6 detailed below) 
for a flow action event with one
+     participant involved.
+  */
+  // TODO: refactor this to break it into separate test cases as much as 
possible
+  @Test
+  public void testAcquireLeaseSingleParticipant() throws Exception {
+    // Tests CASE 1 of acquire lease for a flow action event not present in DB
+    MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+    Assert.assertTrue(firstLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+    MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
+        (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
+    Assert.assertTrue(firstObtainedStatus.getEventTimestamp() <=
+        firstObtainedStatus.getLeaseAcquisitionTimestamp());
+    Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
+        new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH)));
+
+    // Verify that different DagAction types for the same flow can have leases 
at the same time
+    DagActionStore.DagAction killDagAction = new
+        DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.KILL);
+    MultiActiveLeaseArbiter.LeaseAttemptStatus killStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction, 
eventTimeMillis);
+    Assert.assertTrue(killStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+    MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
+        (MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
+    Assert.assertTrue(
+        killObtainedStatus.getLeaseAcquisitionTimestamp() >= 
killObtainedStatus.getEventTimestamp());
+
+    // Tests CASE 2 of acquire lease for a flow action event that already has 
a valid lease for the same event in db
+    // Very little time should have passed if this test directly follows the 
one above so this call will be considered
+    // the same as the previous event
+    MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+    Assert.assertTrue(secondLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
+    MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
+        (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
+    Assert.assertTrue(secondLeasedToAnotherStatus.getEventTimeMillis() == 
firstObtainedStatus.getEventTimestamp());
+    
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() 
>= LINGER);
+
+    // Tests CASE 3 of trying to acquire a lease for a distinct flow action 
event, while the previous event's lease is
+    // valid
+    // Allow enough time to pass for this trigger to be considered distinct, 
but not so much time that the lease expires
+    Thread.sleep(EPSILON * 3/2);
+    MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+    Assert.assertTrue(thirdLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
+    MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
+        (MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
+    Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > 
firstObtainedStatus.getEventTimestamp());
+    
Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() < 
LINGER);
+
+    // Tests CASE 4 of lease out of date
+    Thread.sleep(LINGER);
+    MultiActiveLeaseArbiter.LeaseAttemptStatus fourthLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+    Assert.assertTrue(fourthLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+    MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
+        (MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
+    Assert.assertTrue(fourthObtainedStatus.getEventTimestamp() > 
eventTimeMillis + LINGER);
+    Assert.assertTrue(fourthObtainedStatus.getEventTimestamp()
+        <= fourthObtainedStatus.getLeaseAcquisitionTimestamp());
+
+    // Tests CASE 5 of no longer leasing the same event in DB
+    // done immediately after previous lease obtainment so should be marked as 
the same event
+    
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
+    Assert.assertTrue(System.currentTimeMillis() - 
fourthObtainedStatus.getEventTimestamp() < EPSILON);
+    MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+    Assert.assertTrue(fifthLaunchStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
+
+    // Tests CASE 6 of no longer leasing a distinct event in DB
+    // Wait so this event is considered distinct and a new lease will be 
acquired
+    Thread.sleep(EPSILON * 3/2);
+    MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+    Assert.assertTrue(sixthLaunchStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+    MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
+        (MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
+    Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
+        <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index c0f140a9f..1423e2d8c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -168,7 +168,7 @@ public class GobblinServiceGuiceModule implements Module {
     OptionalBinder.newOptionalBinder(binder, MultiActiveLeaseArbiter.class);
     OptionalBinder.newOptionalBinder(binder, FlowTriggerHandler.class);
     if (serviceConfig.isMultiActiveSchedulerEnabled()) {
-      binder.bind(MysqlMultiActiveLeaseArbiter.class);
+      
binder.bind(MultiActiveLeaseArbiter.class).to(MysqlMultiActiveLeaseArbiter.class);
       binder.bind(FlowTriggerHandler.class);
     }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 42ab0af96..ec63276c4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.google.common.base.Optional;
 import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -62,15 +63,15 @@ import org.apache.gobblin.util.ConfigUtils;
 public class FlowTriggerHandler {
   private final int schedulerMaxBackoffMillis;
   private static Random random = new Random();
-  protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+  protected Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter;
   protected SchedulerService schedulerService;
-  protected DagActionStore dagActionStore;
+  protected Optional<DagActionStore> dagActionStore;
   private MetricContext metricContext;
   private ContextAwareMeter numFlowsSubmitted;
 
   @Inject
-  public FlowTriggerHandler(Config config, MultiActiveLeaseArbiter 
leaseDeterminationStore,
-      SchedulerService schedulerService, DagActionStore dagActionStore) {
+  public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> 
leaseDeterminationStore,
+      SchedulerService schedulerService, Optional<DagActionStore> 
dagActionStore) {
     this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
     this.multiActiveLeaseArbiter = leaseDeterminationStore;
@@ -91,41 +92,51 @@ public class FlowTriggerHandler {
    */
   public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
-        multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
-    // TODO: add a log event or metric for each of these cases
-    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
-      MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = 
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
-      if (persistFlowAction(leaseObtainedStatus)) {
+    if (multiActiveLeaseArbiter.isPresent()) {
+      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+      // TODO: add a log event or metric for each of these cases
+      if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = 
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+        if (persistFlowAction(leaseObtainedStatus)) {
+          return;
+        }
+        // If persisting the flow action failed, then we set another trigger 
for this event to occur immediately to
+        // re-attempt handling the event
+        scheduleReminderForEvent(jobProps,
+            new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction, 
leaseObtainedStatus.getEventTimestamp(), 0L),
+            eventTimeMillis);
+        return;
+      } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+        scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
+            eventTimeMillis);
+        return;
+      } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
         return;
       }
-      // If persisting the flow action failed, then we set another trigger for 
this event to occur immediately to
-      // re-attempt handling the event
-      scheduleReminderForEvent(jobProps, new 
MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction,
-          leaseObtainedStatus.getEventTimestamp(), 0L), eventTimeMillis);
-      return;
-    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
-      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
-          eventTimeMillis);
-      return;
-    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
-      return;
+      throw new RuntimeException(String.format("Received type of 
leaseAttemptStatus: %s not handled by this method",
+          leaseAttemptStatus.getClass().getName()));
+    } else {
+      throw new RuntimeException(String.format("Multi-active scheduler is not 
enabled so trigger event should not be "
+          + "handled with this method."));
     }
-    throw new RuntimeException(String.format("Received type of 
leaseAttemptStatus: %s not handled by this method",
-            leaseAttemptStatus.getClass().getName()));
   }
 
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
   private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus leaseStatus) {
-    try {
-      DagActionStore.DagAction flowAction = leaseStatus.getFlowAction();
-      this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-      this.numFlowsSubmitted.mark();
-      return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    if (this.dagActionStore.isPresent() && 
this.multiActiveLeaseArbiter.isPresent()) {
+      try {
+        DagActionStore.DagAction flowAction = leaseStatus.getFlowAction();
+        this.dagActionStore.get().addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(), flowAction.getFlowExecutionId(), 
flowAction.getFlowActionType());
+        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+        this.numFlowsSubmitted.mark();
+        return 
this.multiActiveLeaseArbiter.get().recordLeaseSuccess(leaseStatus);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      throw new RuntimeException("DagActionStore is " + 
(this.dagActionStore.isPresent() ? "" : "NOT") + " present. "
+          + "Multi-Active scheduler is " + 
(this.multiActiveLeaseArbiter.isPresent() ? "" : "NOT") + " present. Both "
+          + "should be enabled if this method is called.");
     }
   }
 

Reply via email to