This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 01c433c6d [GOBBLIN-1859] Multi-active Unit Test for Multi-Participant 
state  (#3721)
01c433c6d is described below

commit 01c433c6dffcfd8bf699313542f2e386b87d4627
Author: umustafi <[email protected]>
AuthorDate: Thu Aug 10 15:04:53 2023 -0700

    [GOBBLIN-1859] Multi-active Unit Test for Multi-Participant state  (#3721)
    
    * new unit tests passing
    
    * clean up
    
    * change upsert to insert
    
    * catch sql exception & update test
    
    * Refactor to create better api for testing
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  | 182 +++++++++++++--------
 .../api/MysqlMultiActiveLeaseArbiterTest.java      |  95 ++++++++++-
 2 files changed, 202 insertions(+), 75 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 2cdcf71ce..964a29851 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -18,21 +18,19 @@
 package org.apache.gobblin.runtime.api;
 
 import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.Timestamp;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import com.zaxxer.hikari.HikariDataSource;
-
 import javax.sql.DataSource;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlDataSourceFactory;
@@ -89,6 +87,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   private final int linger;
   private String thisTableGetInfoStatement;
   private String thisTableSelectAfterInsertStatement;
+  private String thisTableAcquireLeaseIfMatchingAllStatement;
+  private String thisTableAcquireLeaseIfFinishedStatement;
 
   // TODO: define retention on this table
   private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %s ("
@@ -120,9 +120,9 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       + "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM 
%s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
   // Insert or update row to acquire lease if values have not changed since 
the previous read
   // Need to define three separate statements to handle cases where row does 
not exist or has null values to check
-  protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, 
"
-      + "flow_name, flow_execution_id, flow_action, event_timestamp, 
lease_acquisition_timestamp) "
-      + "VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
+  protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT 
INTO %s (flow_group, flow_name, "
+      + "flow_execution_id, flow_action, event_timestamp, 
lease_acquisition_timestamp) VALUES(?, ?, ?, ?, "
+      + "CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
   protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
       + "SET event_timestamp=CURRENT_TIMESTAMP, 
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
       + WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND 
lease_acquisition_timestamp is NULL";
@@ -154,6 +154,10 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         this.constantsTableName);
     this.thisTableSelectAfterInsertStatement = 
String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName,
         this.constantsTableName);
+    this.thisTableAcquireLeaseIfMatchingAllStatement =
+        
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
+    this.thisTableAcquireLeaseIfFinishedStatement =
+        
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
     this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
     String createArbiterStatement = String.format(
         CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
@@ -186,38 +190,14 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   @Override
   public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    // Check table for an existing entry for this flow action and event time
-    Optional<GetEventInfoResult> getResult = 
withPreparedStatement(thisTableGetInfoStatement,
-        getInfoStatement -> {
-          int i = 0;
-          getInfoStatement.setString(++i, flowAction.getFlowGroup());
-          getInfoStatement.setString(++i, flowAction.getFlowName());
-          getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
-          getInfoStatement.setString(++i, 
flowAction.getFlowActionType().toString());
-          ResultSet resultSet = getInfoStatement.executeQuery();
-          try {
-            if (!resultSet.next()) {
-              return Optional.<GetEventInfoResult>absent();
-            }
-            return Optional.of(createGetInfoResult(resultSet));
-          } finally {
-            if (resultSet !=  null) {
-              resultSet.close();
-            }
-          }
-        }, true);
+    // Query lease arbiter table about this flow action
+    Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);
 
     try {
       if (!getResult.isPresent()) {
         log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no 
existing row for this flow action, then go"
                 + " ahead and insert", flowAction, eventTimeMillis);
-        String formattedAcquireLeaseNewRowStatement =
-            String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, 
this.leaseArbiterTableName);
-        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseNewRowStatement,
-            insertStatement -> {
-              completeInsertPreparedStatement(insertStatement, flowAction);
-              return insertStatement.executeUpdate();
-            }, true);
+        int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.absent());
       }
 
@@ -257,14 +237,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
               dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
         }
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
-        String formattedAcquireLeaseIfMatchingAllStatement =
-            
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
-        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
-            insertStatement -> {
-              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
-                  true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-              return insertStatement.executeUpdate();
-            }, true);
+        int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement, 
flowAction,
+            true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
         return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
       } // No longer leasing this event
         if (isWithinEpsilon) {
@@ -275,20 +249,39 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: 
Distinct event, no longer leasing event in "
             + "db", flowAction, dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
-        String formattedAcquireLeaseIfFinishedStatement =
-            
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
-        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
-            insertStatement -> {
-              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
-                  false, dbEventTimestamp, null);
-              return insertStatement.executeUpdate();
-            }, true);
+        int numRowsUpdated = 
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
+            true, false, dbEventTimestamp, null);
         return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
 
+  /**
+   * Checks leaseArbiterTable for an existing entry for this flow action and 
event time
+   */
+  protected Optional<GetEventInfoResult> 
getExistingEventInfo(DagActionStore.DagAction flowAction) throws IOException {
+    return withPreparedStatement(thisTableGetInfoStatement,
+        getInfoStatement -> {
+          int i = 0;
+          getInfoStatement.setString(++i, flowAction.getFlowGroup());
+          getInfoStatement.setString(++i, flowAction.getFlowName());
+          getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
+          getInfoStatement.setString(++i, 
flowAction.getFlowActionType().toString());
+          ResultSet resultSet = getInfoStatement.executeQuery();
+          try {
+            if (!resultSet.next()) {
+              return Optional.absent();
+            }
+            return Optional.of(createGetInfoResult(resultSet));
+          } finally {
+            if (resultSet !=  null) {
+              resultSet.close();
+            }
+          }
+        }, true);
+  }
+
   protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws 
IOException {
     try {
       // Extract values from result set
@@ -313,7 +306,63 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     }
   }
 
-  protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) 
throws IOException {
+  /**
+   * Called by participant to try to acquire lease for a flow action that does 
not have an attempt in progress or in
+   * near past for it.
+   * @return int corresponding to number of rows updated by INSERT statement 
to acquire lease
+   */
+  protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) 
throws IOException {
+    String formattedAcquireLeaseNewRowStatement =
+        String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, 
this.leaseArbiterTableName);
+    return withPreparedStatement(formattedAcquireLeaseNewRowStatement,
+        insertStatement -> {
+          completeInsertPreparedStatement(insertStatement, flowAction);
+          try {
+            return insertStatement.executeUpdate();
+          } catch (SQLIntegrityConstraintViolationException e) {
+            if (!e.getMessage().contains("Duplicate entry")) {
+              throw e;
+            }
+            return 0;
+          }
+        }, true);
+  }
+
+  /**
+   * Called by participant to try to acquire lease for a flow action that has 
an existing, completed, or expired lease
+   * attempt for the flow action in the table.
+   * @return int corresponding to number of rows updated by UPDATE statement 
to acquire lease
+   */
+  protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, 
DagActionStore.DagAction flowAction,
+      boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp 
dbEventTimestamp,
+      Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
+    return withPreparedStatement(acquireLeaseStatement,
+        insertStatement -> {
+          completeUpdatePreparedStatement(insertStatement, flowAction, 
needEventTimeCheck,
+              needLeaseAcquisition, dbEventTimestamp, 
dbLeaseAcquisitionTimestamp);
+          return insertStatement.executeUpdate();
+        }, true);
+  }
+
+  /**
+   * Checks leaseArbiter table for a row corresponding to this flow action to 
determine if the lease acquisition attempt
+   * was successful or not.
+   */
+  protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) 
throws IOException {
+    return withPreparedStatement(thisTableSelectAfterInsertStatement,
+        selectStatement -> {
+          completeWhereClauseMatchingKeyPreparedStatement(selectStatement, 
flowAction);
+          ResultSet resultSet = selectStatement.executeQuery();
+          try {
+            return createSelectInfoResult(resultSet);
+          } finally {
+            if (resultSet !=  null) {
+              resultSet.close();
+            }
+          }
+        }, true);
+  }
+  protected static SelectInfoResult createSelectInfoResult(ResultSet 
resultSet) throws IOException {
       try {
         if (!resultSet.next()) {
           throw new IOException("Expected num rows and 
lease_acquisition_timestamp returned from query but received nothing, so "
@@ -347,24 +396,15 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       DagActionStore.DagAction flowAction, Optional<Timestamp> 
dbCurrentTimestamp)
       throws SQLException, IOException {
     // Fetch values in row after attempted insert
-    SelectInfoResult selectInfoResult = 
withPreparedStatement(thisTableSelectAfterInsertStatement,
-        selectStatement -> {
-          completeWhereClauseMatchingKeyPreparedStatement(selectStatement, 
flowAction);
-          ResultSet resultSet = selectStatement.executeQuery();
-          try {
-            return createSelectInfoResult(resultSet);
-          } finally {
-            if (resultSet !=  null) {
-              resultSet.close();
-            }
-          }
-        }, true);
+    SelectInfoResult selectInfoResult = getRowInfo(flowAction);
     if (numRowsUpdated == 1) {
       log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!", 
flowAction,
           selectInfoResult.eventTimeMillis);
       return new LeaseObtainedStatus(flowAction, 
selectInfoResult.eventTimeMillis,
           selectInfoResult.getLeaseAcquisitionTimeMillis());
     }
+    log.debug("Another participant acquired lease in between for [{}, 
eventTimestamp: {}] - num rows updated: ",
+        flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
     // Another participant acquired lease in between
     return new LeasedToAnotherStatus(flowAction, 
selectInfoResult.getEventTimeMillis(),
         selectInfoResult.getLeaseAcquisitionTimeMillis() + 
selectInfoResult.getDbLinger()
@@ -377,8 +417,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * @param flowAction
    * @throws SQLException
    */
-  protected void completeInsertPreparedStatement(PreparedStatement statement, 
DagActionStore.DagAction flowAction)
-      throws SQLException {
+  protected static void completeInsertPreparedStatement(PreparedStatement 
statement,
+      DagActionStore.DagAction flowAction) throws SQLException {
     int i = 0;
     // Values to set in new row
     statement.setString(++i, flowAction.getFlowGroup());
@@ -393,8 +433,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * @param flowAction
    * @throws SQLException
    */
-  protected void 
completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement, 
DagActionStore.DagAction flowAction)
-    throws SQLException {
+  protected static void 
completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement,
+      DagActionStore.DagAction flowAction) throws SQLException {
     int i = 0;
     statement.setString(++i, flowAction.getFlowGroup());
     statement.setString(++i, flowAction.getFlowName());
@@ -413,8 +453,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * @param originalLeaseAcquisitionTimestamp value to compare to db one, null 
if not needed
    * @throws SQLException
    */
-  protected void completeUpdatePreparedStatement(PreparedStatement statement, 
DagActionStore.DagAction flowAction,
-      boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck,
+  protected static void completeUpdatePreparedStatement(PreparedStatement 
statement,
+      DagActionStore.DagAction flowAction, boolean needEventTimeCheck, boolean 
needLeaseAcquisitionTimeCheck,
       Timestamp originalEventTimestamp, Timestamp 
originalLeaseAcquisitionTimestamp) throws SQLException {
     int i = 0;
     // Values to check if existing row matches previous read
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 3ede1ce83..1eb872586 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.runtime.api;
 
 import com.typesafe.config.Config;
+import java.io.IOException;
+import java.sql.Timestamp;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -27,6 +29,8 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter.*;
+
 @Slf4j
 public class MysqlMultiActiveLeaseArbiterTest {
   private static final int EPSILON = 30000;
@@ -37,11 +41,17 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final String flowGroup = "testFlowGroup";
   private static final String flowName = "testFlowName";
   private static final String flowExecutionId = "12345677";
+  // The following are considered unique because they correspond to different 
flow action types
   private static DagActionStore.DagAction launchDagAction =
       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
-
+  private static DagActionStore.DagAction resumeDagAction =
+      new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.RESUME);
   private static final long eventTimeMillis = System.currentTimeMillis();
   private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
+  private String formattedAcquireLeaseIfMatchingAllStatement =
+      
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
TABLE);
+  private String formattedAcquireLeaseIfFinishedStatement =
+      String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
TABLE);
 
   // The setup functionality verifies that the initialization of the tables is 
done correctly and verifies any SQL
   // syntax errors.
@@ -50,12 +60,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
     ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
 
     Config config = ConfigBuilder.create()
-        .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, EPSILON)
-        .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, LINGER)
+        .addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, 
EPSILON)
+        .addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, 
LINGER)
         .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
         .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
         .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
-        .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        
.addPrimitive(ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
 TABLE)
         .build();
 
     this.mysqlMultiActiveLeaseArbiter = new 
MysqlMultiActiveLeaseArbiter(config);
@@ -143,4 +153,81 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
+
+  /*
+     Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no 
row matches the primary key in the table.
+     If such a row does exist, the method should disregard the resulting SQL 
error and return 0 rows updated, indicating
+     the lease was not acquired.
+     Note: this isolates and tests CASE 1 in which another participant could 
have acquired the lease between the time
+     the read was done and subsequent write was carried out
+  */
+  @Test (dependsOnMethods = "testAcquireLeaseSingleParticipant")
+  public void testAcquireLeaseIfNewRow() throws IOException {
+    // Inserting the first time should update 1 row
+    
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
 1);
+    // Inserting the second time should not update any rows
+    
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
 0);
+  }
+
+    /*
+    Tests CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT to ensure 
insertion is not completed if another
+    participant updated the table between the prior read and attempted 
insertion.
+    Note: this isolates and tests CASE 4 in which a flow action event has an 
out of date lease, so a participant
+    attempts a new one given that the table's eventTimestamp and 
leaseAcquisitionTimestamp values are unchanged.
+   */
+  @Test (dependsOnMethods = "testAcquireLeaseIfNewRow")
+  public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() 
throws IOException {
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+        this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+
+    // The following insert will fail since the eventTimestamp does not match
+    int numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+        formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, 
true,
+        new Timestamp(99999), new 
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
+    Assert.assertEquals(numRowsUpdated, 0);
+
+    // The following insert will fail since the leaseAcquisitionTimestamp does 
not match
+    numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+        formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, 
true,
+        new Timestamp(selectInfoResult.getEventTimeMillis()), new 
Timestamp(99999));
+    Assert.assertEquals(numRowsUpdated, 0);
+
+    // This insert should work since the values match all the columns
+    numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+        formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, 
true,
+        new Timestamp(selectInfoResult.getEventTimeMillis()),
+        new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
+    Assert.assertEquals(numRowsUpdated, 1);
+  }
+
+  /*
+  Tests CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT to ensure 
the insertion will only succeed if another
+  participant has not updated the eventTimestamp state since the prior read.
+  Note: This isolates and tests CASE 6 during which current participant saw a 
distinct flow action event had completed
+  its prior lease, encouraging the current participant to acquire a lease for 
its event.
+   */
+  @Test (dependsOnMethods = 
"testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
+  public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() throws 
IOException, InterruptedException {
+    // Mark the resume action lease from above as completed by fabricating a 
LeaseObtainedStatus
+    MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+        this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+    boolean markedSuccess = 
this.mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+        resumeDagAction, selectInfoResult.getEventTimeMillis(), 
selectInfoResult.getLeaseAcquisitionTimeMillis()));
+    Assert.assertTrue(markedSuccess);
+
+    // Sleep enough time for event to be considered distinct
+    Thread.sleep(LINGER);
+
+    // The following insert will fail since eventTimestamp does not match the 
expected
+    int numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+        formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
+        new Timestamp(99999), null);
+    Assert.assertEquals(numRowsUpdated, 0);
+
+    // This insert does match since we utilize the right eventTimestamp
+    numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+        formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
+        new Timestamp(selectInfoResult.getEventTimeMillis()), null);
+    Assert.assertEquals(numRowsUpdated, 1);
+  }
 }

Reply via email to