This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2a2edfcb4 [GOBBLIN-1921] Properly handle reminder events (#3790)
2a2edfcb4 is described below
commit 2a2edfcb40c656c74865f58390d0eb64fb835f1d
Author: umustafi <[email protected]>
AuthorDate: Tue Oct 3 11:50:55 2023 -0700
[GOBBLIN-1921] Properly handle reminder events (#3790)
* Add millisecond level precision to timestamp cols & proper timezone
conversion
- existing tests pass with minor modifications
* Handle reminder events properly
* Fix compilation errors & add isReminder flag
* Add unit tests
* Address review comments
* Add newline to address comment
* Include reminder/original tag in logging
* Clarify timezone issues in comment
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../runtime/api/MultiActiveLeaseArbiter.java | 9 +-
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 189 ++++++++++++++-------
.../api/MysqlMultiActiveLeaseArbiterTest.java | 123 +++++++++++---
.../modules/orchestration/FlowTriggerHandler.java | 10 +-
.../modules/orchestration/Orchestrator.java | 9 +-
.../scheduler/GobblinServiceJobScheduler.java | 4 +-
.../modules/orchestration/OrchestratorTest.java | 4 +-
8 files changed, 242 insertions(+), 107 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 198c8de35..81608655a 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -109,6 +109,7 @@ public class ConfigurationKeys {
// Event time of flow action to orchestrate using the multi-active lease
arbiter
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY =
"orchestratorTriggerEventTimeMillis";
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL =
"-1";
+ public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
// Note: linger should be on the order of seconds even though we measure in
millis
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
index ab9e03599..faacb0995 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
@@ -24,7 +24,7 @@ import lombok.Data;
/**
* This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
- * more active participants compete to take responsiblity for a particular
flow's event. The type of flow event in
+ * more active participants compete to take responsibility for a particular
flow's event. The type of flow event in
* question does not impact the algorithm other than to uniquely identify the
flow event. Each participant uses the
* interface to initiate an attempt at ownership over the flow event and
receives a response indicating the status of
* the attempt.
@@ -38,7 +38,8 @@ import lombok.Data;
* b) LeasedToAnotherStatus -> another will attempt to carry out the
required action before the lease expires
* c) NoLongerLeasingStatus -> flow event no longer needs to be acted
upon (terminal state)
* 3. If another participant has acquired the lease before this one could,
then the present participant must check back
- * in at the time of lease expiry to see if it needs to attempt the lease
again [status (b) above].
+ * in at the time of lease expiry to see if it needs to attempt the lease
again [status (b) above]. We refer to this
+ * check-in as a 'reminder event'.
* 4. Once the participant which acquired the lease completes its work on the
flow event, it calls recordLeaseSuccess
* to indicate to all other participants that the flow event no longer
needs to be acted upon [status (c) above]
*/
@@ -51,10 +52,12 @@ public interface MultiActiveLeaseArbiter {
* determine the next action.
* @param flowAction uniquely identifies the flow and the present action
upon it
* @param eventTimeMillis is the time this flow action was triggered
+ * @param isReminderEvent true if the flow action event we're checking on is
a reminder event
* @return LeaseAttemptStatus
* @throws IOException
*/
- LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long
eventTimeMillis) throws IOException;
+ LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long
eventTimeMillis, boolean isReminderEvent)
+ throws IOException;
/**
* This method is used to indicate the owner of the lease has successfully
completed required actions while holding
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 7d4bfb118..a7c035185 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -27,7 +27,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Timestamp;
+import java.util.Calendar;
import java.util.Optional;
+import java.util.TimeZone;
import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -46,10 +48,10 @@ import org.apache.gobblin.util.ConfigUtils;
* schema is as follows:
* [flow_group | flow_name | flow_action | event_timestamp |
lease_acquisition_timestamp]
* (----------------------primary key------------------------)
- * We also maintain another table in the database with two constants that
allow us to coordinate between participants and
- * ensure they are using the same values to base their coordination off of.
+ * We also maintain another table in the database with two constants that
allow us to coordinate between participants
+ * and ensure they are using the same values to base their coordination off of.
* [epsilon | linger]
- * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `epsilon` - time within which we consider two event timestamps to be overlapping
so that they can be consolidated
* `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
* than epsilon and encapsulate executor communication latency
including retry attempts
*
@@ -86,16 +88,30 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
private final int epsilon;
private final int linger;
private String thisTableGetInfoStatement;
+ private String thisTableGetInfoStatementForReminder;
private String thisTableSelectAfterInsertStatement;
private String thisTableAcquireLeaseIfMatchingAllStatement;
private String thisTableAcquireLeaseIfFinishedStatement;
// TODO: define retention on this table
+ /*
+ Notes:
+ - Set `event_timestamp` default value to turn off timestamp auto-updates
for row modifications, which alter this col
+ in an unexpected way upon completing the lease
+ - MySQL converts TIMESTAMP values from the current time zone to UTC for
storage, and back from UTC to the current
+ time zone for retrieval.
https://dev.mysql.com/doc/refman/8.0/en/datetime.html
+ - Thus, for reading any timestamps from MySQL we convert the timezone
from session (default) to UTC to always
+ use epoch-millis in UTC locally
+ - Similarly, for inserting/updating any timestamps we convert the
timezone from UTC to session (as it will be
+ interpreted automatically as session time zone) and explicitly set
all timestamp columns to avoid using the
+ default auto-update/initialization values
+ - We desire millisecond level precision and denote that with `(3)` for the
TIMESTAMP types
+ */
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "
flow_action varchar(100) NOT NULL, "
- + "event_timestamp TIMESTAMP, "
- + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP,
"
+ + "event_timestamp TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), "
+ + "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT NULL, "
+ "PRIMARY KEY (flow_group,flow_name,flow_action))";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY
(primary_key))";
@@ -104,32 +120,50 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
+ "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon),
linger=VALUES(linger)";
protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_action=?";
protected static final String WHERE_CLAUSE_TO_MATCH_ROW =
WHERE_CLAUSE_TO_MATCH_KEY
- + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
- protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
- + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
+ + " AND event_timestamp=CONVERT_TZ(?, '+00:00', @@session.time_zone)"
+ + " AND lease_acquisition_timestamp=CONVERT_TZ(?, '+00:00',
@@session.time_zone)";
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT "
+ + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as
utc_event_timestamp, "
+ + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone,
'+00:00') as utc_lease_acquisition_timestamp, "
+ + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
- // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
current time in db is within epsilon of
- // event_timestamp), leaseValidityStatus (1 if lease has not expired, 2 if
expired, 3 if column is NULL or no longer
- // leasing)
- protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
- + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP) / 1000
<= epsilon as is_within_epsilon, CASE "
- + "WHEN CURRENT_TIMESTAMP < DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 1 "
- + "WHEN CURRENT_TIMESTAMP >= DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 2 "
- + "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM
%s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
new event timestamp (current timestamp in
+ // db) is within epsilon of event_timestamp in the table),
leaseValidityStatus (1 if lease has not expired, 2 if
+ // expired, 3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT "
+ + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as
utc_event_timestamp, "
+ + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone,
'+00:00') as utc_lease_acquisition_timestamp, "
+ + "ABS(TIMESTAMPDIFF(microsecond, event_timestamp,
CURRENT_TIMESTAMP(3))) / 1000 <= epsilon as is_within_epsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 1 "
+ + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 2 "
+ + "ELSE 3 END as lease_validity_status, linger, "
+ + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+ // Same as query above, except that isWithinEpsilon is True if the reminder
event timestamp (provided by caller) is
+ // OLDER than or equal to the db event_timestamp and within epsilon away
from it.
+ protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER =
"SELECT "
+ + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as
utc_event_timestamp, "
+ + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone,
'+00:00') as utc_lease_acquisition_timestamp, "
+ + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) /
1000 <= epsilon as is_within_epsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 1 "
+ + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 2 "
+ + "ELSE 3 END as lease_validity_status, linger, "
+ + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
// Insert or update row to acquire lease if values have not changed since
the previous read
// Need to define three separate statements to handle cases where row does
not exist or has null values to check
protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT
INTO %s (flow_group, flow_name, "
- + "flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?,
?, ?, CURRENT_TIMESTAMP, "
- + "CURRENT_TIMESTAMP)";
+ + "flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?,
?, ?, CURRENT_TIMESTAMP(3), "
+ + "CURRENT_TIMESTAMP(3))";
protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
- + "SET event_timestamp=CURRENT_TIMESTAMP,
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
- + WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND
lease_acquisition_timestamp is NULL";
+ + "SET event_timestamp=CURRENT_TIMESTAMP(3),
lease_acquisition_timestamp=CURRENT_TIMESTAMP(3) "
+ + WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=CONVERT_TZ(?,
'+00:00', @@session.time_zone) AND "
+ + "lease_acquisition_timestamp is NULL";
protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
- + "SET event_timestamp=CURRENT_TIMESTAMP,
lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ + "SET event_timestamp=CURRENT_TIMESTAMP(3),
lease_acquisition_timestamp=CURRENT_TIMESTAMP(3) "
+ WHERE_CLAUSE_TO_MATCH_ROW;
// Complete lease acquisition if values have not changed since lease was
acquired
protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
+ "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+ protected static final Calendar UTC_CAL =
Calendar.getInstance(TimeZone.getTimeZone("UTC"));
@Inject
public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
@@ -150,6 +184,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT,
this.leaseArbiterTableName,
this.constantsTableName);
+ this.thisTableGetInfoStatementForReminder =
String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
+ this.leaseArbiterTableName, this.constantsTableName);
this.thisTableSelectAfterInsertStatement =
String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableAcquireLeaseIfMatchingAllStatement =
@@ -186,19 +222,19 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
@Override
- public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
- throws IOException {
- log.info("Multi-active scheduler about to handle trigger event: [{},
triggerEventTimestamp: {}]", flowAction,
- eventTimeMillis);
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis,
+ boolean isReminderEvent) throws IOException {
+ log.info("Multi-active scheduler about to handle trigger event: [{}, is:
{}, triggerEventTimestamp: {}]",
+ flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis);
// Query lease arbiter table about this flow action
- Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);
+ Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction,
isReminderEvent);
try {
if (!getResult.isPresent()) {
- log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no
existing row for this flow action, then go"
- + " ahead and insert", flowAction, eventTimeMillis);
+ log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE
1: no existing row for this flow action,"
+ + " then go ahead and insert", flowAction, isReminderEvent ?
"reminder" : "original", eventTimeMillis);
int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.empty());
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.empty(), isReminderEvent);
}
// Extract values from result set
@@ -210,28 +246,47 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
int dbLinger = getResult.get().getDbLinger();
Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
- log.info("Multi-active arbiter replacing local trigger event timestamp
[{}, triggerEventTimestamp: {}] with "
- + "database eventTimestamp {}", flowAction, eventTimeMillis,
dbCurrentTimestamp.getTime());
+ // For reminder event, we can stop early if the reminder eventTimeMillis
is older than the current event in the db
+ // because db laundering tells us that the currently worked on db event
is newer and will have its own reminders
+ if (isReminderEvent) {
+ if (eventTimeMillis < dbEventTimestamp.getTime()) {
+ log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - A new event trigger "
+ + "is being worked on, so this older reminder will be
dropped.", flowAction,
+ isReminderEvent ? "reminder" : "original", eventTimeMillis,
dbEventTimestamp);
+ return new NoLongerLeasingStatus();
+ }
+ if (eventTimeMillis > dbEventTimestamp.getTime()) {
+ log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Severe constraint "
+ + "violation encountered: a reminder event newer than db
event was found when db laundering should "
+ + "ensure monotonically increasing laundered event times.",
flowAction,
+ isReminderEvent ? "reminder" : "original", eventTimeMillis,
dbEventTimestamp.getTime());
+ }
+ }
+
+ log.info("Multi-active arbiter replacing local trigger event timestamp
[{}, is: {}, triggerEventTimestamp: {}] "
+ + "with database eventTimestamp {} (in epoch-millis)", flowAction,
isReminderEvent ? "reminder" : "original",
+ eventTimeMillis, dbCurrentTimestamp.getTime());
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
- log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 2:
Same event, lease is valid", flowAction,
- dbCurrentTimestamp.getTime());
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
+ flowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(flowAction,
dbEventTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
- log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 3:
Distinct event, lease is valid", flowAction,
- dbCurrentTimestamp.getTime());
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
3: Distinct event, lease is valid",
+ flowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
return new LeasedToAnotherStatus(flowAction,
dbCurrentTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
- }
+ } // Lease is invalid
else if (leaseValidityStatus == 2) {
- log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 4:
Lease is out of date (regardless of whether "
- + "same or distinct event)", flowAction,
dbCurrentTimestamp.getTime());
- if (isWithinEpsilon) {
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
4: Lease is out of date (regardless of "
+ + "whether same or distinct event)", flowAction, isReminderEvent ?
"reminder" : "original",
+ dbCurrentTimestamp.getTime());
+ if (isWithinEpsilon && !isReminderEvent) {
log.warn("Lease should not be out of date for the same trigger event
since epsilon << linger for flowAction"
+ " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp
{}, linger {}", flowAction,
dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
@@ -239,19 +294,19 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
flowAction,
true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp), isReminderEvent);
} // No longer leasing this event
if (isWithinEpsilon) {
- log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5:
Same event, no longer leasing event in db: "
- + "terminate", flowAction, dbCurrentTimestamp.getTime());
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 5: Same event, no longer leasing event"
+ + " in db", flowAction, isReminderEvent ? "reminder" :
"original", dbCurrentTimestamp.getTime());
return new NoLongerLeasingStatus();
}
- log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6:
Distinct event, no longer leasing event in "
- + "db", flowAction, dbCurrentTimestamp.getTime());
+ log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
6: Distinct event, no longer leasing "
+ + "event in db", flowAction, isReminderEvent ? "reminder" :
"original", dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
int numRowsUpdated =
attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
true, false, dbEventTimestamp, null);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp), isReminderEvent);
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -260,8 +315,9 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
/**
* Checks leaseArbiterTable for an existing entry for this flow action and
event time
*/
- protected Optional<GetEventInfoResult>
getExistingEventInfo(DagActionStore.DagAction flowAction) throws IOException {
- return withPreparedStatement(thisTableGetInfoStatement,
+ protected Optional<GetEventInfoResult>
getExistingEventInfo(DagActionStore.DagAction flowAction,
+ boolean isReminderEvent) throws IOException {
+ return withPreparedStatement(isReminderEvent ?
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
getInfoStatement.setString(++i, flowAction.getFlowGroup());
@@ -284,12 +340,12 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws
IOException {
try {
// Extract values from result set
- Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
- Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
+ Timestamp dbEventTimestamp =
resultSet.getTimestamp("utc_event_timestamp", UTC_CAL);
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL);
boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
int leaseValidityStatus = resultSet.getInt("lease_validity_status");
int dbLinger = resultSet.getInt("linger");
- Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+ Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("utc_current_timestamp", UTC_CAL);
return new GetEventInfoResult(dbEventTimestamp,
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
dbLinger, dbCurrentTimestamp);
} catch (SQLException e) {
@@ -337,8 +393,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
return withPreparedStatement(acquireLeaseStatement,
insertStatement -> {
- completeUpdatePreparedStatement(insertStatement, flowAction,
needEventTimeCheck,
- needLeaseAcquisition, dbEventTimestamp,
dbLeaseAcquisitionTimestamp);
+ completeUpdatePreparedStatement(insertStatement, flowAction,
needEventTimeCheck, needLeaseAcquisition,
+ dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return insertStatement.executeUpdate();
}, true);
}
@@ -367,14 +423,15 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
throw new IOException("Expected resultSet containing row information
for the lease that was attempted but "
+ "received nothing.");
}
- if (resultSet.getTimestamp(1) == null) {
+ if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL) == null) {
throw new IOException("event_timestamp should never be null (it is
always set to current timestamp)");
}
- long eventTimeMillis = resultSet.getTimestamp(1).getTime();
+ long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp",
UTC_CAL).getTime();
// Lease acquisition timestamp is null if another participant has
completed the lease
- Optional<Long> leaseAcquisitionTimeMillis = resultSet.getTimestamp(2)
== null ? Optional.empty() :
- Optional.of(resultSet.getTimestamp(2).getTime());
- int dbLinger = resultSet.getInt(3);
+ Optional<Long> leaseAcquisitionTimeMillis =
+ resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL)
== null ? Optional.empty() :
+
Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp",
UTC_CAL).getTime());
+ int dbLinger = resultSet.getInt("linger");
return new SelectInfoResult(eventTimeMillis,
leaseAcquisitionTimeMillis, dbLinger);
} catch (SQLException e) {
throw new IOException(e);
@@ -397,7 +454,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* @throws IOException
*/
protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int
numRowsUpdated,
- DagActionStore.DagAction flowAction, Optional<Timestamp>
dbCurrentTimestamp)
+ DagActionStore.DagAction flowAction, Optional<Timestamp>
dbCurrentTimestamp, boolean isReminderEvent)
throws SQLException, IOException {
// Fetch values in row after attempted insert
SelectInfoResult selectInfoResult = getRowInfo(flowAction);
@@ -406,13 +463,13 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
return new NoLongerLeasingStatus();
}
if (numRowsUpdated == 1) {
- log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!",
flowAction,
- selectInfoResult.eventTimeMillis);
+ log.debug("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", flowAction,
+ isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(flowAction,
selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
- log.debug("Another participant acquired lease in between for [{},
eventTimestamp: {}] - num rows updated: ",
- flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
+ log.debug("Another participant acquired lease in between for [{}, is: {},
eventTimestamp: {}] - num rows updated: ",
+ flowAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction,
selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get() +
selectInfoResult.getDbLinger()
@@ -469,10 +526,10 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
statement.setString(++i, flowAction.getFlowActionType().toString());
// Values that may be needed depending on the insert statement
if (needEventTimeCheck) {
- statement.setTimestamp(++i, originalEventTimestamp);
+ statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL);
}
if (needLeaseAcquisitionTimeCheck) {
- statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp);
+ statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL);
}
}
@@ -489,8 +546,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowActionType.toString());
- updateStatement.setTimestamp(++i, new
Timestamp(status.getEventTimestamp()));
- updateStatement.setTimestamp(++i, new
Timestamp(status.getLeaseAcquisitionTimestamp()));
+ updateStatement.setTimestamp(++i, new
Timestamp(status.getEventTimestamp()), UTC_CAL);
+ updateStatement.setTimestamp(++i, new
Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL);
int numRowsUpdated = updateStatement.executeUpdate();
if (numRowsUpdated == 0) {
log.info("Multi-active lease arbiter lease attempt: [{},
eventTimestamp: {}] - FAILED to complete because "
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 5a3b9da15..8f1fdf30c 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -35,8 +35,8 @@ import static
org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter.*;
@Slf4j
public class MysqlMultiActiveLeaseArbiterTest {
- private static final int EPSILON = 30000;
- private static final int LINGER = 80000;
+ private static final int EPSILON = 10000;
+ private static final int LINGER = 50000;
private static final String USER = "testUser";
private static final String PASSWORD = "testPassword";
private static final String TABLE = "mysql_multi_active_lease_arbiter_store";
@@ -49,6 +49,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static DagActionStore.DagAction resumeDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.RESUME);
private static final long eventTimeMillis = System.currentTimeMillis();
+ private static final Timestamp dummyTimestamp = new Timestamp(99999);
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
private String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
TABLE);
@@ -82,7 +83,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 1 of acquire lease for a flow action event not present in DB
MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(firstLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
@@ -95,7 +96,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
DagActionStore.DagAction killDagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL);
MultiActiveLeaseArbiter.LeaseAttemptStatus killStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction,
eventTimeMillis);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction,
eventTimeMillis, false);
Assert.assertTrue(killStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
@@ -106,19 +107,19 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Very little time should have passed if this test directly follows the
one above so this call will be considered
// the same as the previous event
MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(secondLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
- Assert.assertTrue(secondLeasedToAnotherStatus.getEventTimeMillis() ==
firstObtainedStatus.getEventTimestamp());
-
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis()
>= LINGER);
+ Assert.assertEquals(firstObtainedStatus.getEventTimestamp(),
secondLeasedToAnotherStatus.getEventTimeMillis());
+
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis()
> 0);
// Tests CASE 3 of trying to acquire a lease for a distinct flow action
event, while the previous event's lease is
// valid
// Allow enough time to pass for this trigger to be considered distinct,
but not enough time so the lease expires
Thread.sleep(EPSILON * 3/2);
MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(thirdLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
@@ -128,7 +129,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Tests CASE 4 of lease out of date
Thread.sleep(LINGER);
MultiActiveLeaseArbiter.LeaseAttemptStatus fourthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(fourthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
@@ -141,14 +142,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
Assert.assertTrue(System.currentTimeMillis() -
fourthObtainedStatus.getEventTimestamp() < EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(fifthLaunchStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be
acquired
Thread.sleep(EPSILON * 3/2);
MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(sixthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
@@ -166,9 +167,9 @@ public class MysqlMultiActiveLeaseArbiterTest {
@Test (dependsOnMethods = "testAcquireLeaseSingleParticipant")
public void testAcquireLeaseIfNewRow() throws IOException {
// Inserting the first time should update 1 row
-
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
1);
+
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
1);
// Inserting the second time should not update any rows
-
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
0);
+
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
0);
}
/*
@@ -180,22 +181,22 @@ public class MysqlMultiActiveLeaseArbiterTest {
@Test (dependsOnMethods = "testAcquireLeaseIfNewRow")
public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement()
throws IOException {
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
- this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
// The following insert will fail since the eventTimestamp does not match
- int numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ int numRowsUpdated =
mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true,
true,
- new Timestamp(99999), new
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+ dummyTimestamp, new
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertEquals(numRowsUpdated, 0);
// The following insert will fail since the leaseAcquisitionTimestamp does
not match
- numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true,
true,
- new Timestamp(selectInfoResult.getEventTimeMillis()), new
Timestamp(99999));
+ new Timestamp(selectInfoResult.getEventTimeMillis()), dummyTimestamp);
Assert.assertEquals(numRowsUpdated, 0);
// This insert should work since the values match all the columns
- numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true,
true,
new Timestamp(selectInfoResult.getEventTimeMillis()),
new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
@@ -213,26 +214,92 @@ public class MysqlMultiActiveLeaseArbiterTest {
throws IOException, InterruptedException, SQLException {
// Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
- this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
- boolean markedSuccess =
this.mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+ mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
resumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been
completed and acquisition timestamp val is NULL
- mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1,
resumeDagAction, Optional.empty());
-
- // Sleep enough time for event to be considered distinct
- Thread.sleep(LINGER);
+ mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1,
resumeDagAction,
+ Optional.empty(), false);
// The following insert will fail since eventTimestamp does not match the
expected
- int numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ int numRowsUpdated =
mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
- new Timestamp(99999), null);
+ dummyTimestamp, null);
Assert.assertEquals(numRowsUpdated, 0);
// This insert does match since we utilize the right eventTimestamp
- numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
+ numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
new Timestamp(selectInfoResult.getEventTimeMillis()), null);
Assert.assertEquals(numRowsUpdated, 1);
}
+
+ /*
+ Tests calling `tryAcquireLease` for a reminder event older than the event in
the database, which should immediately return a `NoLongerLeasing` status
+ */
+ @Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfFinishedLeasingStatement")
+ public void testOlderReminderEventAcquireLease() throws IOException {
+ // Read database to obtain existing db eventTimeMillis and use it to
construct an older event
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+ mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
+ LeaseAttemptStatus attemptStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
olderEventTimestamp, true);
+ Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
+ }
+
+ /*
+ Tests calling `tryAcquireLease` for a reminder event for which a valid lease
exists in the database. We don't expect
+ this case to occur because the reminderEvent should be triggered after the
lease expires, but ensure it's handled
+ correctly anyway.
+ */
+ @Test (dependsOnMethods = "testOlderReminderEventAcquireLease")
+ public void testReminderEventAcquireLeaseOnValidLease() throws IOException {
+ // Read database to obtain existing db eventTimeMillis and re-use it for
the reminder event time
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+ mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ LeaseAttemptStatus attemptStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true);
+ Assert.assertTrue(attemptStatus instanceof LeasedToAnotherStatus);
+ LeasedToAnotherStatus leasedToAnotherStatus = (LeasedToAnotherStatus)
attemptStatus;
+ Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(),
selectInfoResult.getEventTimeMillis());
+ }
+
+ /*
+ Tests calling `tryAcquireLease` for a reminder event whose lease has expired
in the database and should successfully
+ acquire a new lease
+ */
+ @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnValidLease")
+ public void testReminderEventAcquireLeaseOnInvalidLease() throws
IOException, InterruptedException {
+ // Read database to obtain existing db eventTimeMillis and wait enough
time for the lease to expire
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+ mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ Thread.sleep(LINGER);
+ LeaseAttemptStatus attemptStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true);
+ Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
+ LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
+ Assert.assertTrue(obtainedStatus.getEventTimestamp() >
selectInfoResult.getEventTimeMillis());
+ Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() >
selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue());
+ }
+
+ /*
+ Tests calling `tryAcquireLease` for a reminder event whose lease has
completed in the database and should return
+ `NoLongerLeasing` status
+ */
+ @Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease")
+ public void testReminderEventAcquireLeaseOnCompletedLease() throws
IOException {
+ // Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
+ MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
+ mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
+ boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+ resumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
+ Assert.assertTrue(markedSuccess);
+
+ // Now have a reminder event check-in on the completed lease
+ LeaseAttemptStatus attemptStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true);
+ Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 1849619f0..4f4bdf9fd 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -104,12 +104,14 @@ public class FlowTriggerHandler {
* @param jobProps
* @param flowAction
* @param eventTimeMillis
+ * @param isReminderEvent true for a reminder event, false for an original one
* @throws IOException
*/
- public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
- throws IOException {
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis,
+ boolean isReminderEvent) throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
- MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(
+ flowAction, eventTimeMillis, isReminderEvent);
if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
this.leaseObtainedCount.inc();
@@ -278,6 +280,8 @@ public class FlowTriggerHandler {
// excess flows to be triggered by the reminder functionality.
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
+ // Use this boolean to indicate whether this is a reminder event
+ prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
String.valueOf(false));
// Update job data map and reset it in jobDetail
jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
return jobDataMap;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index c0975aa97..7afd8bba4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -220,7 +220,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
- public void orchestrate(Spec spec, Properties jobProps, long
triggerTimestampMillis) throws Exception {
+ public void orchestrate(Spec spec, Properties jobProps, long
triggerTimestampMillis, boolean isReminderEvent)
+ throws Exception {
// Add below waiting because TopologyCatalog and FlowCatalog service can
be launched at the same time
this.topologyCatalog.get().getInitComplete().await();
@@ -264,9 +265,9 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
String flowExecutionId =
flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
DagActionStore.DagAction flowAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH);
- flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis);
- _log.info("Multi-active scheduler finished handling trigger event:
[{}, triggerEventTimestamp: {}]", flowAction,
- triggerTimestampMillis);
+ flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis, isReminderEvent);
+ _log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
+ flowAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
Dag<JobExecutionPlan> jobExecutionPlanDag =
jobExecutionPlanDagOptional.get();
if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index daa96af2f..45053a93d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -503,7 +503,9 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
String triggerTimestampMillis = jobProps.getProperty(
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL);
- this.orchestrator.orchestrate(flowSpec, jobProps,
Long.parseLong(triggerTimestampMillis));
+ boolean isReminderEvent =
+
Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
"false"));
+ this.orchestrator.orchestrate(flowSpec, jobProps,
Long.parseLong(triggerTimestampMillis), isReminderEvent);
} catch (Exception e) {
String exceptionPrefix = "Failed to run Spec: " +
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
log.warn(exceptionPrefix + " because", e);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 2d3f4b3f9..444f2dc8c 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -341,13 +341,13 @@ public class OrchestratorTest {
flowProps.put("gobblin.flow.destinationIdentifier", "destination");
flowProps.put("flow.allowConcurrentExecution", false);
FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "",
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(),
Optional.absent());
- this.orchestrator.orchestrate(adhocSpec, flowProps, 0);
+ this.orchestrator.orchestrate(adhocSpec, flowProps, 0, false);
String metricName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0",
"flow0", ServiceMetricNames.COMPILED);
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
flowProps.setProperty("job.schedule", "0/2 * * * * ?");
FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "",
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(),
Optional.absent());
- this.orchestrator.orchestrate(scheduledSpec, flowProps, 0);
+ this.orchestrator.orchestrate(scheduledSpec, flowProps, 0, false);
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
}
}
\ No newline at end of file