This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ca48bcd0a [GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter
with Single Participant (#3715)
ca48bcd0a is described below
commit ca48bcd0aee7bb4ecd7277b25944bff9f4fc378f
Author: umustafi <[email protected]>
AuthorDate: Tue Jul 18 14:03:35 2023 -0700
[GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single
Participant (#3715)
* Unit tests and corresponding fixes for 6 lease acquisition cases
* remove logs
* address review comments with small changes
* clean up naming, logs, java doc
* close resultSets
* check if test passes on github
* fix unit test
* insert or update constants into table correctly
* rename prop
* address comments
---------
Co-authored-by: Urmi Mustafi <[email protected]>
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +-
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 326 ++++++++++++++-------
.../api/MysqlMultiActiveLeaseArbiterTest.java | 146 +++++++++
.../modules/core/GobblinServiceGuiceModule.java | 2 +-
.../modules/orchestration/FlowTriggerHandler.java | 75 +++--
5 files changed, 413 insertions(+), 138 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 2d60fd5c8..50bd2a03d 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -99,7 +99,7 @@ public class ConfigurationKeys {
public static final String MYSQL_LEASE_ARBITER_PREFIX =
"MysqlMultiActiveLeaseArbiter";
public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "gobblin_multi_active_scheduler_constants_store";
- public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY =
"eventToRevisitTimestampMillis";
public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY =
"triggerEventTimestampMillis";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 8a40c71b2..9318d8b56 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime.api;
+import com.google.common.base.Optional;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -29,6 +30,7 @@ import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import javax.sql.DataSource;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -54,6 +56,11 @@ import org.apache.gobblin.util.ConfigUtils;
* than epsilon and encapsulate executor communication latency
including retry attempts
*
* The `event_timestamp` is the time of the flow_action event request.
+ * --- Note ---
+ * We only use the participant's local event_timestamp internally to identify
the particular flow_action event, but
+ * after interacting with the database utilize the CURRENT_TIMESTAMP of the
database to insert or keep
+ * track of our event. This is to avoid any discrepancies due to clock drift
between participants as well as
+ * variation in local time and database time for future comparisons.
* ---Event consolidation---
* Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
* (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
@@ -80,41 +87,48 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
private final String constantsTableName;
private final int epsilon;
private final int linger;
+ private String thisTableGetInfoStatement;
+ private String thisTableSelectAfterInsertStatement;
// TODO: define retention on this table
- private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " +
"flow_execution_id varchar("
+ ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL,
flow_action varchar(100) NOT NULL, "
+ "event_timestamp TIMESTAMP, "
- + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP,
"
+ "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
- + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO
%s (epsilon, linger) VALUES (?,?)";
+ + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY
(primary_key))";
+ // Only insert epsilon and linger values from config if this table does not
contain a pre-existing values already.
+ private static final String UPSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO
%s (primary_key, epsilon, linger) "
+ + "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon),
linger=VALUES(linger)";
protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ " AND flow_action=?";
protected static final String WHERE_CLAUSE_TO_MATCH_ROW =
WHERE_CLAUSE_TO_MATCH_KEY
+ " AND event_timestamp=? AND lease_acquisition_timestamp=?";
- protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
- + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
- // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
- // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
current time in db is within epsilon of
+ // event_timestamp), leaseValidityStatus (1 if lease has not expired, 2 if
expired, 3 if column is NULL or no longer
+ // leasing)
protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
- + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
- + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
- + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
- + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+ + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP) / 1000
<= epsilon as is_within_epsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 1 "
+ + "WHEN CURRENT_TIMESTAMP >= DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 2 "
+ + "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM
%s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Insert or update row to acquire lease if values have not changed since
the previous read
// Need to define three separate statements to handle cases where row does
not exist or has null values to check
- protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
- + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?)";
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group,
"
+ + "flow_name, flow_execution_id, flow_action, event_timestamp,
lease_acquisition_timestamp) "
+ + "VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
- + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
- + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
+ + "SET event_timestamp=CURRENT_TIMESTAMP,
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ + WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND
lease_acquisition_timestamp is NULL";
protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
- + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
- + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
+ + "SET event_timestamp=CURRENT_TIMESTAMP,
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ + WHERE_CLAUSE_TO_MATCH_ROW;
// Complete lease acquisition if values have not changed since lease was
acquired
protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
+ "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
@@ -136,81 +150,107 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+ this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT,
this.leaseArbiterTableName,
+ this.constantsTableName);
+ this.thisTableSelectAfterInsertStatement =
String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName,
+ this.constantsTableName);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ String createArbiterStatement = String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
try (Connection connection = dataSource.getConnection();
- PreparedStatement createStatement =
connection.prepareStatement(String.format(
- CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ PreparedStatement createStatement =
connection.prepareStatement(createArbiterStatement)) {
createStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
}
- withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
- createStatement -> {
+ initializeConstantsTable();
+
+ log.info("MysqlMultiActiveLeaseArbiter initialized");
+ }
+
+ // Initialize Constants table if needed and insert row into it if one does
not exist
+ private void initializeConstantsTable() throws IOException {
+ String createConstantsStatement =
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+ withPreparedStatement(createConstantsStatement, createStatement ->
createStatement.executeUpdate(), true);
+
+ String insertConstantsStatement =
String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+ withPreparedStatement(insertConstantsStatement, insertStatement -> {
int i = 0;
- createStatement.setInt(++i, epsilon);
- createStatement.setInt(++i, linger);
- return createStatement.executeUpdate();}, true);
+ insertStatement.setInt(++i, epsilon);
+ insertStatement.setInt(++i, linger);
+ return insertStatement.executeUpdate();
+ }, true);
}
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
// Check table for an existing entry for this flow action and event time
- ResultSet resultSet = withPreparedStatement(
- String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName),
+ Optional<GetEventInfoResult> getResult =
withPreparedStatement(thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
- getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
getInfoStatement.setString(++i,
flowAction.getFlowActionType().toString());
- return getInfoStatement.executeQuery();
+ ResultSet resultSet = getInfoStatement.executeQuery();
+ try {
+ if (!resultSet.next()) {
+ return Optional.absent();
+ }
+ return Optional.of(createGetInfoResult(resultSet));
+ } finally {
+ if (resultSet != null) {
+ resultSet.close();
+ }
+ }
}, true);
- String formattedSelectAfterInsertStatement =
- String.format(SELECT_AFTER_INSERT_STATEMENT,
this.leaseArbiterTableName, this.constantsTableName);
try {
- // CASE 1: If no existing row for this flow action, then go ahead and
insert
- if (!resultSet.next()) {
+ if (!getResult.isPresent()) {
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no
existing row for this flow action, then go"
+ + " ahead and insert", flowAction, eventTimeMillis);
String formattedAcquireLeaseNewRowStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseNewRowStatement + "; " +
formattedSelectAfterInsertStatement,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
- completeInsertPreparedStatement(insertStatement, flowAction,
eventTimeMillis);
- return insertStatement.executeQuery();
+ completeInsertPreparedStatement(insertStatement, flowAction);
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.absent());
}
// Extract values from result set
- Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
- Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
- boolean isWithinEpsilon = resultSet.getBoolean("isWithinEpsilon");
- int leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
- int dbLinger = resultSet.getInt("linger");
+ Timestamp dbEventTimestamp = getResult.get().getDbEventTimestamp();
+ Timestamp dbLeaseAcquisitionTimestamp =
getResult.get().getDbLeaseAcquisitionTimestamp();
+ boolean isWithinEpsilon = getResult.get().isWithinEpsilon();
+ int leaseValidityStatus = getResult.get().getLeaseValidityStatus();
+ // Used to calculate minimum amount of time until a participant should
check whether a lease expired
+ int dbLinger = getResult.get().getDbLinger();
+ Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
+
+ log.info("Multi-active arbiter replacing local trigger event timestamp
[{}, triggerEventTimestamp: {}] with "
+ + "database eventTimestamp {}", flowAction, eventTimeMillis,
dbCurrentTimestamp.getTime());
- // CASE 2: If our event timestamp is older than the last event in db,
then skip this trigger
- if (eventTimeMillis < dbEventTimestamp.getTime()) {
- return new NoLongerLeasingStatus();
- }
// Lease is valid
if (leaseValidityStatus == 1) {
- // CASE 3: Same event, lease is valid
if (isWithinEpsilon) {
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 2:
Same event, lease is valid", flowAction,
+ dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(flowAction,
dbEventTimestamp.getTime(),
- dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
System.currentTimeMillis());
+ dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
- // CASE 4: Distinct event, lease is valid
- // Utilize db timestamp for wait time, but be reminded of own event
timestamp
- return new LeasedToAnotherStatus(flowAction, eventTimeMillis,
- dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
System.currentTimeMillis());
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 3:
Distinct event, lease is valid", flowAction,
+ dbCurrentTimestamp.getTime());
+ // Utilize db lease acquisition timestamp for wait time
+ return new LeasedToAnotherStatus(flowAction,
dbCurrentTimestamp.getTime(),
+ dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
- // CASE 5: Lease is out of date (regardless of whether same or distinct
event)
else if (leaseValidityStatus == 2) {
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 4:
Lease is out of date (regardless of whether "
+ + "same or distinct event)", flowAction,
dbCurrentTimestamp.getTime());
if (isWithinEpsilon) {
log.warn("Lease should not be out of date for the same trigger event
since epsilon << linger for flowAction"
+ " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp
{}, linger {}", flowAction,
@@ -219,84 +259,143 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfMatchingAllStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} // No longer leasing this event
- // CASE 6: Same event, no longer leasing event in db: terminate
if (isWithinEpsilon) {
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5:
Same event, no longer leasing event in db: "
+ + "terminate", flowAction, dbCurrentTimestamp.getTime());
return new NoLongerLeasingStatus();
}
- // CASE 7: Distinct event, no longer leasing event in db
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6:
Distinct event, no longer leasing event in "
+ + "db", flowAction, dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
String formattedAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfFinishedStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
false, dbEventTimestamp, null);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws
IOException {
+ try {
+ // Extract values from result set
+ Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
+ boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+ int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+ int dbLinger = resultSet.getInt("linger");
+ Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+ return new GetEventInfoResult(dbEventTimestamp,
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+ dbLinger, dbCurrentTimestamp);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+
+ protected SelectInfoResult createSelectInfoResult(ResultSet resultSet)
throws IOException {
+ try {
+ if (!resultSet.next()) {
+ throw new IOException("Expected num rows and
lease_acquisition_timestamp returned from query but received nothing, so "
+ + "providing empty result to lease evaluation code");
+ }
+ long eventTimeMillis = resultSet.getTimestamp(1).getTime();
+ long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
+ int dbLinger = resultSet.getInt(3);
+ return new SelectInfoResult(eventTimeMillis,
leaseAcquisitionTimeMillis, dbLinger);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+
/**
- * Attempt lease by insert or update following a read based on the condition
the state of the table has not changed
- * since the read. Parse the result to return the corresponding status based
on successful insert/update or not.
- * @param resultSet
- * @param eventTimeMillis
- * @return LeaseAttemptStatus
+ * Parse result of attempted insert/update to obtain a lease for a
+ * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} event by
selecting values corresponding to that
+ * event from the table to return the corresponding status based on
successful insert/update or not.
* @throws SQLException
* @throws IOException
*/
- protected LeaseAttemptStatus
handleResultFromAttemptedLeaseObtainment(ResultSet resultSet,
- DagActionStore.DagAction flowAction, long eventTimeMillis)
+ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int
numRowsUpdated,
+ DagActionStore.DagAction flowAction, Optional<Timestamp>
dbCurrentTimestamp)
throws SQLException, IOException {
- if (!resultSet.next()) {
- throw new IOException("Expected num rows and lease_acquisition_timestamp
returned from query but received nothing");
- }
- int numRowsUpdated = resultSet.getInt(1);
- long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
- int dbLinger = resultSet.getInt(3);
+ // Fetch values in row after attempted insert
+ SelectInfoResult selectInfoResult =
withPreparedStatement(thisTableSelectAfterInsertStatement,
+ selectStatement -> {
+ completeWhereClauseMatchingKeyPreparedStatement(selectStatement,
flowAction);
+ ResultSet resultSet = selectStatement.executeQuery();
+ try {
+ return createSelectInfoResult(resultSet);
+ } finally {
+ if (resultSet != null) {
+ resultSet.close();
+ }
+ }
+ }, true);
if (numRowsUpdated == 1) {
- return new LeaseObtainedStatus(flowAction, eventTimeMillis,
leaseAcquisitionTimeMillis);
+ log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!",
flowAction,
+ selectInfoResult.eventTimeMillis);
+ return new LeaseObtainedStatus(flowAction,
selectInfoResult.eventTimeMillis,
+ selectInfoResult.getLeaseAcquisitionTimeMillis());
}
// Another participant acquired lease in between
- return new LeasedToAnotherStatus(flowAction, eventTimeMillis,
- leaseAcquisitionTimeMillis + dbLinger - System.currentTimeMillis());
+ return new LeasedToAnotherStatus(flowAction,
selectInfoResult.getEventTimeMillis(),
+ selectInfoResult.getLeaseAcquisitionTimeMillis() +
selectInfoResult.getDbLinger()
+ - (dbCurrentTimestamp.isPresent() ?
dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}
/**
* Complete the INSERT statement for a new flow action lease where the flow
action is not present in the table
* @param statement
* @param flowAction
- * @param eventTimeMillis
* @throws SQLException
*/
- protected void completeInsertPreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction,
- long eventTimeMillis) throws SQLException {
+ protected void completeInsertPreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction)
+ throws SQLException {
int i = 0;
// Values to set in new row
statement.setString(++i, flowAction.getFlowGroup());
statement.setString(++i, flowAction.getFlowName());
statement.setString(++i, flowAction.getFlowExecutionId());
statement.setString(++i, flowAction.getFlowActionType().toString());
- statement.setTimestamp(++i, new Timestamp(eventTimeMillis));
- // Values to check if existing row matches previous read
- statement.setString(++i, flowAction.getFlowGroup());
- statement.setString(++i, flowAction.getFlowName());
- statement.setString(++i, flowAction.getFlowExecutionId());
- statement.setString(++i, flowAction.getFlowActionType().toString());
- // Values to select for return
+ }
+
+ /**
+ * Complete the WHERE clause to match a flow action in a select statement
+ * @param statement
+ * @param flowAction
+ * @throws SQLException
+ */
+ protected void
completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction)
+ throws SQLException {
+ int i = 0;
statement.setString(++i, flowAction.getFlowGroup());
statement.setString(++i, flowAction.getFlowName());
statement.setString(++i, flowAction.getFlowExecutionId());
@@ -308,7 +407,6 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* updated.
* @param statement
* @param flowAction
- * @param eventTimeMillis
* @param needEventTimeCheck true if need to compare
`originalEventTimestamp` with db event_timestamp
* @param needLeaseAcquisitionTimeCheck true if need to compare
`originalLeaseAcquisitionTimestamp` with db one
* @param originalEventTimestamp value to compare to db one, null if not
needed
@@ -316,11 +414,9 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* @throws SQLException
*/
protected void completeUpdatePreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction,
- long eventTimeMillis, boolean needEventTimeCheck, boolean
needLeaseAcquisitionTimeCheck,
+ boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck,
Timestamp originalEventTimestamp, Timestamp
originalLeaseAcquisitionTimestamp) throws SQLException {
int i = 0;
- // Value to update
- statement.setTimestamp(++i, new Timestamp(eventTimeMillis));
// Values to check if existing row matches previous read
statement.setString(++i, flowAction.getFlowGroup());
statement.setString(++i, flowAction.getFlowName());
@@ -333,11 +429,6 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (needLeaseAcquisitionTimeCheck) {
statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp);
}
- // Values to select for return
- statement.setString(++i, flowAction.getFlowGroup());
- statement.setString(++i, flowAction.getFlowName());
- statement.setString(++i, flowAction.getFlowExecutionId());
- statement.setString(++i, flowAction.getFlowActionType().toString());
}
@Override
@@ -375,17 +466,44 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
/** Abstracts recurring pattern around resource management and exception
re-mapping. */
- protected <T> T withPreparedStatement(String sql,
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) throws
IOException {
+ protected <T> T withPreparedStatement(String sql,
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
+ throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
T result = f.apply(statement);
if (shouldCommit) {
connection.commit();
}
+ statement.close();
return result;
} catch (SQLException e) {
- log.warn("Received SQL exception that can result from invalid
connection. Checking if validation query is set {} Exception is {}",
((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
+ log.warn("Received SQL exception that can result from invalid
connection. Checking if validation query is set {} "
+ + "Exception is {}", ((HikariDataSource)
this.dataSource).getConnectionTestQuery(), e);
throw new IOException(e);
}
}
+
+
+ /**
+ * DTO for arbiter's current lease state for a FlowActionEvent
+ */
+ @Data
+ static class GetEventInfoResult {
+ private final Timestamp dbEventTimestamp;
+ private final Timestamp dbLeaseAcquisitionTimestamp;
+ private final boolean withinEpsilon;
+ private final int leaseValidityStatus;
+ private final int dbLinger;
+ private final Timestamp dbCurrentTimestamp;
+ }
+
+ /**
+ DTO for result of SELECT query used to determine status of lease
acquisition attempt
+ */
+ @Data
+ static class SelectInfoResult {
+ private final long eventTimeMillis;
+ private final long leaseAcquisitionTimeMillis;
+ private final int dbLinger;
+ }
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
new file mode 100644
index 000000000..3ede1ce83
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class MysqlMultiActiveLeaseArbiterTest {
+ private static final int EPSILON = 30000;
+ private static final int LINGER = 80000;
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "mysql_multi_active_lease_arbiter_store";
+ private static final String flowGroup = "testFlowGroup";
+ private static final String flowName = "testFlowName";
+ private static final String flowExecutionId = "12345677";
+ private static DagActionStore.DagAction launchDagAction =
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH);
+
+ private static final long eventTimeMillis = System.currentTimeMillis();
+ private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
+
+ // The setup functionality verifies that the initialization of the tables is
done correctly and verifies any SQL
+ // syntax errors.
+ @BeforeClass
+ public void setUp() throws Exception {
+ ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+ Config config = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, EPSILON)
+ .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, LINGER)
+ .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+
+ this.mysqlMultiActiveLeaseArbiter = new
MysqlMultiActiveLeaseArbiter(config);
+ }
+
+ /*
+ Tests all cases of trying to acquire a lease (CASES 1-6 detailed below)
for a flow action event with one
+ participant involved.
+ */
+ // TODO: refactor this to break it into separate test cases as much is
possible
+ @Test
+ public void testAcquireLeaseSingleParticipant() throws Exception {
+ // Tests CASE 1 of acquire lease for a flow action event not present in DB
+ MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ Assert.assertTrue(firstLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+ MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
+ (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
+ Assert.assertTrue(firstObtainedStatus.getEventTimestamp() <=
+ firstObtainedStatus.getLeaseAcquisitionTimestamp());
+ Assert.assertTrue(firstObtainedStatus.getFlowAction().equals(
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH)));
+
+ // Verify that different DagAction types for the same flow can have leases
at the same time
+ DagActionStore.DagAction killDagAction = new
+ DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL);
+ MultiActiveLeaseArbiter.LeaseAttemptStatus killStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction,
eventTimeMillis);
+ Assert.assertTrue(killStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+ MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
+ (MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
+ Assert.assertTrue(
+ killObtainedStatus.getLeaseAcquisitionTimestamp() >=
killObtainedStatus.getEventTimestamp());
+
+ // Tests CASE 2 of acquire lease for a flow action event that already has
a valid lease for the same event in db
+ // Very little time should have passed if this test directly follows the
one above so this call will be considered
+ // the same as the previous event
+ MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ Assert.assertTrue(secondLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
+ MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
+ (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
+ Assert.assertTrue(secondLeasedToAnotherStatus.getEventTimeMillis() ==
firstObtainedStatus.getEventTimestamp());
+
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis()
>= LINGER);
+
+ // Tests CASE 3 of trying to acquire a lease for a distinct flow action
event, while the previous event's lease is
+ // valid
+ // Allow enough time to pass for this trigger to be considered distinct,
but not enough time so the lease expires
+ Thread.sleep(EPSILON * 3/2);
+ MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ Assert.assertTrue(thirdLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
+ MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
+ (MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
+ Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() >
firstObtainedStatus.getEventTimestamp());
+
Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() <
LINGER);
+
+ // Tests CASE 4 of lease out of date
+ Thread.sleep(LINGER);
+ MultiActiveLeaseArbiter.LeaseAttemptStatus fourthLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ Assert.assertTrue(fourthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+ MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
+ (MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
+ Assert.assertTrue(fourthObtainedStatus.getEventTimestamp() >
eventTimeMillis + LINGER);
+ Assert.assertTrue(fourthObtainedStatus.getEventTimestamp()
+ <= fourthObtainedStatus.getLeaseAcquisitionTimestamp());
+
+ // Tests CASE 5 of no longer leasing the same event in DB
+ // done immediately after previous lease obtainment so should be marked as
the same event
+
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
+ Assert.assertTrue(System.currentTimeMillis() -
fourthObtainedStatus.getEventTimestamp() < EPSILON);
+ MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ Assert.assertTrue(fifthLaunchStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
+
+ // Tests CASE 6 of no longer leasing a distinct event in DB
+ // Wait so this event is considered distinct and a new lease will be
acquired
+ Thread.sleep(EPSILON * 3/2);
+ MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ Assert.assertTrue(sixthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
+ MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
+ (MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
+ Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
+ <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index c0f140a9f..1423e2d8c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -168,7 +168,7 @@ public class GobblinServiceGuiceModule implements Module {
OptionalBinder.newOptionalBinder(binder, MultiActiveLeaseArbiter.class);
OptionalBinder.newOptionalBinder(binder, FlowTriggerHandler.class);
if (serviceConfig.isMultiActiveSchedulerEnabled()) {
- binder.bind(MysqlMultiActiveLeaseArbiter.class);
+
binder.bind(MultiActiveLeaseArbiter.class).to(MysqlMultiActiveLeaseArbiter.class);
binder.bind(FlowTriggerHandler.class);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 42ab0af96..ec63276c4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import com.google.common.base.Optional;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -62,15 +63,15 @@ import org.apache.gobblin.util.ConfigUtils;
public class FlowTriggerHandler {
private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
- protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter;
protected SchedulerService schedulerService;
- protected DagActionStore dagActionStore;
+ protected Optional<DagActionStore> dagActionStore;
private MetricContext metricContext;
private ContextAwareMeter numFlowsSubmitted;
@Inject
- public FlowTriggerHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
- SchedulerService schedulerService, DagActionStore dagActionStore) {
+ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter>
leaseDeterminationStore,
+ SchedulerService schedulerService, Optional<DagActionStore>
dagActionStore) {
this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
this.multiActiveLeaseArbiter = leaseDeterminationStore;
@@ -91,41 +92,51 @@ public class FlowTriggerHandler {
*/
public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
- multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
- // TODO: add a log event or metric for each of these cases
- if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
- MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
- if (persistFlowAction(leaseObtainedStatus)) {
+ if (multiActiveLeaseArbiter.isPresent()) {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+ if (persistFlowAction(leaseObtainedStatus)) {
+ return;
+ }
+ // If persisting the flow action failed, then we set another trigger
for this event to occur immediately to
+ // re-attempt handling the event
+ scheduleReminderForEvent(jobProps,
+ new MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction,
leaseObtainedStatus.getEventTimestamp(), 0L),
+ eventTimeMillis);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
+ eventTimeMillis);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
return;
}
- // If persisting the flow action failed, then we set another trigger for
this event to occur immediately to
- // re-attempt handling the event
- scheduleReminderForEvent(jobProps, new
MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction,
- leaseObtainedStatus.getEventTimestamp(), 0L), eventTimeMillis);
- return;
- } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
- scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
- eventTimeMillis);
- return;
- } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
- return;
+ throw new RuntimeException(String.format("Received type of
leaseAttemptStatus: %s not handled by this method",
+ leaseAttemptStatus.getClass().getName()));
+ } else {
+ throw new RuntimeException(String.format("Multi-active scheduler is not
enabled so trigger event should not be "
+ + "handled with this method."));
}
- throw new RuntimeException(String.format("Received type of
leaseAttemptStatus: %s not handled by this method",
- leaseAttemptStatus.getClass().getName()));
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus leaseStatus) {
- try {
- DagActionStore.DagAction flowAction = leaseStatus.getFlowAction();
- this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numFlowsSubmitted.mark();
- return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if (this.dagActionStore.isPresent() &&
this.multiActiveLeaseArbiter.isPresent()) {
+ try {
+ DagActionStore.DagAction flowAction = leaseStatus.getFlowAction();
+ this.dagActionStore.get().addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(), flowAction.getFlowExecutionId(),
flowAction.getFlowActionType());
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numFlowsSubmitted.mark();
+ return
this.multiActiveLeaseArbiter.get().recordLeaseSuccess(leaseStatus);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new RuntimeException("DagActionStore is " +
(this.dagActionStore.isPresent() ? "" : "NOT") + " present. "
+ + "Multi-Active scheduler is " +
(this.multiActiveLeaseArbiter.isPresent() ? "" : "NOT") + " present. Both "
+ + "should be enabled if this method is called.");
}
}