This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 01c433c6d [GOBBLIN-1859] Multi-active Unit Test for Multi-Participant
state (#3721)
01c433c6d is described below
commit 01c433c6dffcfd8bf699313542f2e386b87d4627
Author: umustafi <[email protected]>
AuthorDate: Thu Aug 10 15:04:53 2023 -0700
[GOBBLIN-1859] Multi-active Unit Test for Multi-Participant state (#3721)
* new unit tests passing
* clean up
* change upsert to insert
* catch sql exception & update test
* Refactor to create better api for testing
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 182 +++++++++++++--------
.../api/MysqlMultiActiveLeaseArbiterTest.java | 95 ++++++++++-
2 files changed, 202 insertions(+), 75 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 2cdcf71ce..964a29851 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -18,21 +18,19 @@
package org.apache.gobblin.runtime.api;
import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Timestamp;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import com.zaxxer.hikari.HikariDataSource;
-
import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
@@ -89,6 +87,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
private final int linger;
private String thisTableGetInfoStatement;
private String thisTableSelectAfterInsertStatement;
+ private String thisTableAcquireLeaseIfMatchingAllStatement;
+ private String thisTableAcquireLeaseIfFinishedStatement;
// TODO: define retention on this table
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %s ("
@@ -120,9 +120,9 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
+ "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM
%s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Insert or update row to acquire lease if values have not changed since
the previous read
// Need to define three separate statements to handle cases where row does
not exist or has null values to check
- protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group,
"
- + "flow_name, flow_execution_id, flow_action, event_timestamp,
lease_acquisition_timestamp) "
- + "VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
+ protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT
INTO %s (flow_group, flow_name, "
+ + "flow_execution_id, flow_action, event_timestamp,
lease_acquisition_timestamp) VALUES(?, ?, ?, ?, "
+ + "CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ "SET event_timestamp=CURRENT_TIMESTAMP,
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND
lease_acquisition_timestamp is NULL";
@@ -154,6 +154,10 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
this.constantsTableName);
this.thisTableSelectAfterInsertStatement =
String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
+ this.thisTableAcquireLeaseIfMatchingAllStatement =
+
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName);
+ this.thisTableAcquireLeaseIfFinishedStatement =
+
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
String createArbiterStatement = String.format(
CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
@@ -186,38 +190,14 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- // Check table for an existing entry for this flow action and event time
- Optional<GetEventInfoResult> getResult =
withPreparedStatement(thisTableGetInfoStatement,
- getInfoStatement -> {
- int i = 0;
- getInfoStatement.setString(++i, flowAction.getFlowGroup());
- getInfoStatement.setString(++i, flowAction.getFlowName());
- getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
- getInfoStatement.setString(++i,
flowAction.getFlowActionType().toString());
- ResultSet resultSet = getInfoStatement.executeQuery();
- try {
- if (!resultSet.next()) {
- return Optional.<GetEventInfoResult>absent();
- }
- return Optional.of(createGetInfoResult(resultSet));
- } finally {
- if (resultSet != null) {
- resultSet.close();
- }
- }
- }, true);
+ // Query lease arbiter table about this flow action
+ Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);
try {
if (!getResult.isPresent()) {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no
existing row for this flow action, then go"
+ " ahead and insert", flowAction, eventTimeMillis);
- String formattedAcquireLeaseNewRowStatement =
- String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT,
this.leaseArbiterTableName);
- int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseNewRowStatement,
- insertStatement -> {
- completeInsertPreparedStatement(insertStatement, flowAction);
- return insertStatement.executeUpdate();
- }, true);
+ int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.absent());
}
@@ -257,14 +237,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
}
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
- String formattedAcquireLeaseIfMatchingAllStatement =
-
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName);
- int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
- insertStatement -> {
- completeUpdatePreparedStatement(insertStatement, flowAction,
true,
- true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return insertStatement.executeUpdate();
- }, true);
+ int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
flowAction,
+ true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} // No longer leasing this event
if (isWithinEpsilon) {
@@ -275,20 +249,39 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6:
Distinct event, no longer leasing event in "
+ "db", flowAction, dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
- String formattedAcquireLeaseIfFinishedStatement =
-
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
- int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
- insertStatement -> {
- completeUpdatePreparedStatement(insertStatement, flowAction,
true,
- false, dbEventTimestamp, null);
- return insertStatement.executeUpdate();
- }, true);
+ int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
+ true, false, dbEventTimestamp, null);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+ /**
+ * Checks leaseArbiterTable for an existing entry matching this flow action's key
(flow group, flow name, flow execution id, and flow action type)
+ */
+ protected Optional<GetEventInfoResult>
getExistingEventInfo(DagActionStore.DagAction flowAction) throws IOException {
+ return withPreparedStatement(thisTableGetInfoStatement,
+ getInfoStatement -> {
+ int i = 0;
+ getInfoStatement.setString(++i, flowAction.getFlowGroup());
+ getInfoStatement.setString(++i, flowAction.getFlowName());
+ getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
+ getInfoStatement.setString(++i,
flowAction.getFlowActionType().toString());
+ ResultSet resultSet = getInfoStatement.executeQuery();
+ try {
+ if (!resultSet.next()) {
+ return Optional.absent();
+ }
+ return Optional.of(createGetInfoResult(resultSet));
+ } finally {
+ if (resultSet != null) {
+ resultSet.close();
+ }
+ }
+ }, true);
+ }
+
protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws
IOException {
try {
// Extract values from result set
@@ -313,7 +306,63 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
}
- protected SelectInfoResult createSelectInfoResult(ResultSet resultSet)
throws IOException {
+ /**
+ * Called by participant to try to acquire lease for a flow action that does
not have an attempt in progress or in the
+ * near past.
+ * @return int corresponding to number of rows updated by INSERT statement
to acquire lease
+ */
+ protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction)
throws IOException {
+ String formattedAcquireLeaseNewRowStatement =
+ String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT,
this.leaseArbiterTableName);
+ return withPreparedStatement(formattedAcquireLeaseNewRowStatement,
+ insertStatement -> {
+ completeInsertPreparedStatement(insertStatement, flowAction);
+ try {
+ return insertStatement.executeUpdate();
+ } catch (SQLIntegrityConstraintViolationException e) {
+ if (!e.getMessage().contains("Duplicate entry")) {
+ throw e;
+ }
+ return 0;
+ }
+ }, true);
+ }
+
+ /**
+ * Called by participant to try to acquire lease for a flow action that has
an existing, completed, or expired lease
+ * attempt for the flow action in the table.
+ * @return int corresponding to number of rows updated by the supplied acquireLeaseStatement
to acquire lease
+ */
+ protected int attemptLeaseIfExistingRow(String acquireLeaseStatement,
DagActionStore.DagAction flowAction,
+ boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp
dbEventTimestamp,
+ Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
+ return withPreparedStatement(acquireLeaseStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
needEventTimeCheck,
+ needLeaseAcquisition, dbEventTimestamp,
dbLeaseAcquisitionTimestamp);
+ return insertStatement.executeUpdate();
+ }, true);
+ }
+
+ /**
+ * Checks leaseArbiter table for a row corresponding to this flow action to
determine if the lease acquisition attempt
+ * was successful or not.
+ */
+ protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction)
throws IOException {
+ return withPreparedStatement(thisTableSelectAfterInsertStatement,
+ selectStatement -> {
+ completeWhereClauseMatchingKeyPreparedStatement(selectStatement,
flowAction);
+ ResultSet resultSet = selectStatement.executeQuery();
+ try {
+ return createSelectInfoResult(resultSet);
+ } finally {
+ if (resultSet != null) {
+ resultSet.close();
+ }
+ }
+ }, true);
+ }
+ protected static SelectInfoResult createSelectInfoResult(ResultSet
resultSet) throws IOException {
try {
if (!resultSet.next()) {
throw new IOException("Expected num rows and
lease_acquisition_timestamp returned from query but received nothing, so "
@@ -347,24 +396,15 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
DagActionStore.DagAction flowAction, Optional<Timestamp>
dbCurrentTimestamp)
throws SQLException, IOException {
// Fetch values in row after attempted insert
- SelectInfoResult selectInfoResult =
withPreparedStatement(thisTableSelectAfterInsertStatement,
- selectStatement -> {
- completeWhereClauseMatchingKeyPreparedStatement(selectStatement,
flowAction);
- ResultSet resultSet = selectStatement.executeQuery();
- try {
- return createSelectInfoResult(resultSet);
- } finally {
- if (resultSet != null) {
- resultSet.close();
- }
- }
- }, true);
+ SelectInfoResult selectInfoResult = getRowInfo(flowAction);
if (numRowsUpdated == 1) {
log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!",
flowAction,
selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(flowAction,
selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis());
}
+ log.debug("Another participant acquired lease in between for [{},
eventTimestamp: {}] - num rows updated: ",
+ flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction,
selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis() +
selectInfoResult.getDbLinger()
@@ -377,8 +417,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* @param flowAction
* @throws SQLException
*/
- protected void completeInsertPreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction)
- throws SQLException {
+ protected static void completeInsertPreparedStatement(PreparedStatement
statement,
+ DagActionStore.DagAction flowAction) throws SQLException {
int i = 0;
// Values to set in new row
statement.setString(++i, flowAction.getFlowGroup());
@@ -393,8 +433,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* @param flowAction
* @throws SQLException
*/
- protected void
completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction)
- throws SQLException {
+ protected static void
completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement,
+ DagActionStore.DagAction flowAction) throws SQLException {
int i = 0;
statement.setString(++i, flowAction.getFlowGroup());
statement.setString(++i, flowAction.getFlowName());
@@ -413,8 +453,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* @param originalLeaseAcquisitionTimestamp value to compare to db one, null
if not needed
* @throws SQLException
*/
- protected void completeUpdatePreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction,
- boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck,
+ protected static void completeUpdatePreparedStatement(PreparedStatement
statement,
+ DagActionStore.DagAction flowAction, boolean needEventTimeCheck, boolean
needLeaseAcquisitionTimeCheck,
Timestamp originalEventTimestamp, Timestamp
originalLeaseAcquisitionTimestamp) throws SQLException {
int i = 0;
// Values to check if existing row matches previous read
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 3ede1ce83..1eb872586 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.runtime.api;
import com.typesafe.config.Config;
+import java.io.IOException;
+import java.sql.Timestamp;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -27,6 +29,8 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter.*;
+
@Slf4j
public class MysqlMultiActiveLeaseArbiterTest {
private static final int EPSILON = 30000;
@@ -37,11 +41,17 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String flowGroup = "testFlowGroup";
private static final String flowName = "testFlowName";
private static final String flowExecutionId = "12345677";
+ // The following are considered unique because they correspond to different
flow action types
private static DagActionStore.DagAction launchDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH);
-
+ private static DagActionStore.DagAction resumeDagAction =
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.RESUME);
private static final long eventTimeMillis = System.currentTimeMillis();
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
+ private String formattedAcquireLeaseIfMatchingAllStatement =
+
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
TABLE);
+ private String formattedAcquireLeaseIfFinishedStatement =
+ String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
TABLE);
// The setup functionality verifies that the initialization of the tables is
done correctly and verifies any SQL
// syntax errors.
@@ -50,12 +60,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
- .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, EPSILON)
- .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, LINGER)
+ .addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
EPSILON)
+ .addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
LINGER)
.addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
.addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
- .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+
.addPrimitive(ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
TABLE)
.build();
this.mysqlMultiActiveLeaseArbiter = new
MysqlMultiActiveLeaseArbiter(config);
@@ -143,4 +153,81 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}
+
+ /*
+ Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no
row matches the primary key in the table.
+ If such a row does exist, the method should disregard the resulting SQL
error and return 0 rows updated, indicating
+ the lease was not acquired.
+ Note: this isolates and tests CASE 1 in which another participant could
have acquired the lease between the time
+ the read was done and subsequent write was carried out
+ */
+ @Test (dependsOnMethods = "testAcquireLeaseSingleParticipant")
+ public void testAcquireLeaseIfNewRow() throws IOException {
+ // Inserting the first time should update 1 row
+
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
1);
+ // Inserting the second time should not update any rows
+
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
0);
+ }
+
+ /*
+ Tests CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT to ensure
insertion is not completed if another
+ participant updated the table between the prior read and attempted
insertion.
+ Note: this isolates and tests CASE 4 in which a flow action event has an
out of date lease, so a participant
+ attempts a new one provided the table's eventTimestamp and
leaseAcquisitionTimestamp values are unchanged.
+ */
+ @Test (dependsOnMethods = "testAcquireLeaseIfNewRow")
+ public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement()
throws IOException {
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+ this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+
+ // The following insert will fail since the eventTimestamp does not match
+ int numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true,
true,
+ new Timestamp(99999), new
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
+ Assert.assertEquals(numRowsUpdated, 0);
+
+ // The following insert will fail since the leaseAcquisitionTimestamp does
not match
+ numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true,
true,
+ new Timestamp(selectInfoResult.getEventTimeMillis()), new
Timestamp(99999));
+ Assert.assertEquals(numRowsUpdated, 0);
+
+ // This insert should work since the values match all the columns
+ numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true,
true,
+ new Timestamp(selectInfoResult.getEventTimeMillis()),
+ new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
+ Assert.assertEquals(numRowsUpdated, 1);
+ }
+
+ /*
+ Tests CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT to ensure
the insertion will only succeed if another
+ participant has not updated the eventTimestamp state since the prior read.
+ Note: This isolates and tests CASE 6 during which current participant saw a
distinct flow action event had completed
+ its prior lease, encouraging the current participant to acquire a lease for
its event.
+ */
+ @Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
+ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() throws
IOException, InterruptedException {
+ // Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+ this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ boolean markedSuccess =
this.mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+ resumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis()));
+ Assert.assertTrue(markedSuccess);
+
+ // Sleep enough time for event to be considered distinct
+ Thread.sleep(LINGER);
+
+ // The following insert will fail since eventTimestamp does not match the
expected
+ int numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
+ new Timestamp(99999), null);
+ Assert.assertEquals(numRowsUpdated, 0);
+
+ // This insert does match since we utilize the right eventTimestamp
+ numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
+ new Timestamp(selectInfoResult.getEventTimeMillis()), null);
+ Assert.assertEquals(numRowsUpdated, 1);
+ }
}