This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7ae7c0bf1 [GOBBLIN-1923] Add retention for lease arbiter table (#3792)
7ae7c0bf1 is described below
commit 7ae7c0bf149061ad7a762f21355091c9dc1aae64
Author: umustafi <[email protected]>
AuthorDate: Fri Oct 6 13:22:46 2023 -0700
[GOBBLIN-1923] Add retention for lease arbiter table (#3792)
* Add retention for lease arbiter table
* Replace blocking thread with scheduled thread pool executor
* Make Calendar instance thread-safe
* Rename variables, make values more clear
* Update timestamp related cols
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/configuration/ConfigurationKeys.java | 4 +-
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 92 ++++++++++++++++------
.../dag_action_store/MysqlDagActionStore.java | 1 -
3 files changed, 70 insertions(+), 27 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 81608655a..6d36d9ff3 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -101,6 +101,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "gobblin_multi_active_scheduler_constants_store";
public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
+ public static final String
SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis";
+ public static final long
DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 3 * 24 *
60 * 60 * 1000; // (3 days in ms)
// Refers to the event we originally tried to acquire a lease which achieved
`consensus` among participants through
// the database
public static final String
SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY =
"preservedConsensusEventTimeMillis";
@@ -116,7 +118,7 @@ public class ConfigurationKeys {
public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
- public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000;
+ public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000;
// Job executor thread pool size
public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY =
"jobexecutor.threadpool.size";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index a7c035185..4b4b9c837 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -30,6 +30,8 @@ import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Optional;
import java.util.TimeZone;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -85,15 +87,16 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
protected final DataSource dataSource;
private final String leaseArbiterTableName;
private final String constantsTableName;
- private final int epsilon;
- private final int linger;
+ private final int epsilonMillis;
+ private final int lingerMillis;
+ private final long retentionPeriodMillis;
+ private String thisTableRetentionStatement;
private String thisTableGetInfoStatement;
private String thisTableGetInfoStatementForReminder;
private String thisTableSelectAfterInsertStatement;
private String thisTableAcquireLeaseIfMatchingAllStatement;
private String thisTableAcquireLeaseIfFinishedStatement;
- // TODO: define retention on this table
/*
Notes:
- Set `event_timestamp` default value to turn off timestamp auto-updates
for row modifications which alters this col
@@ -110,9 +113,13 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "
flow_action varchar(100) NOT NULL, "
- + "event_timestamp TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), "
- + "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT NULL, "
+ + "event_timestamp TIMESTAMP(3) NOT NULL, "
+ + "lease_acquisition_timestamp TIMESTAMP(3) NULL, "
+ "PRIMARY KEY (flow_group,flow_name,flow_action))";
+ // Deletes rows older than retention time period regardless of lease status
as they should all be invalid or completed
+ // since retention >> linger
+ private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT =
"DELETE FROM %s WHERE event_timestamp < "
+ + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY
(primary_key))";
// Only insert epsilon and linger values from config if this table does not
contain pre-existing values already.
@@ -137,7 +144,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
+ "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 2 "
+ "ELSE 3 END as lease_validity_status, linger, "
- + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+ + "CONVERT_TZ(CURRENT_TIMESTAMP(3), @@session.time_zone, '+00:00') as
utc_current_timestamp FROM %s, %s "
+ + WHERE_CLAUSE_TO_MATCH_KEY;
// Same as query above, except that isWithinEpsilon is True if the reminder
event timestamp (provided by caller) is
// OLDER than or equal to the db event_timestamp and within epsilon away
from it.
protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER =
"SELECT "
@@ -147,7 +155,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
+ "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 2 "
+ "ELSE 3 END as lease_validity_status, linger, "
- + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+ + "CONVERT_TZ(CURRENT_TIMESTAMP(3), @@session.time_zone, '+00:00') as
utc_current_timestamp FROM %s, %s "
+ + WHERE_CLAUSE_TO_MATCH_KEY;
// Insert or update row to acquire lease if values have not changed since
the previous read
// Need to define three separate statements to handle cases where row does
not exist or has null values to check
protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT
INTO %s (flow_group, flow_name, "
@@ -162,8 +171,9 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
+ WHERE_CLAUSE_TO_MATCH_ROW;
// Complete lease acquisition if values have not changed since lease was
acquired
protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
- + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
- protected static final Calendar UTC_CAL =
Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ + "event_timestamp=event_timestamp, lease_acquisition_timestamp = NULL "
+ WHERE_CLAUSE_TO_MATCH_ROW;
+ private static final ThreadLocal<Calendar> UTC_CAL =
+ ThreadLocal.withInitial(() ->
Calendar.getInstance(TimeZone.getTimeZone("UTC")));
@Inject
public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
@@ -178,10 +188,13 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
this.constantsTableName = ConfigUtils.getString(config,
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
- this.epsilon = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
+ this.epsilonMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
- this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
+ this.lingerMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+ this.retentionPeriodMillis = ConfigUtils.getLong(config,
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
+
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
+ this.thisTableRetentionStatement =
String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT,
this.leaseArbiterTableName);
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT,
this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableGetInfoStatementForReminder =
String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
@@ -203,7 +216,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
}
initializeConstantsTable();
-
+ runRetentionOnArbitrationTable();
log.info("MysqlMultiActiveLeaseArbiter initialized");
}
@@ -215,12 +228,41 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
String insertConstantsStatement =
String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
withPreparedStatement(insertConstantsStatement, insertStatement -> {
int i = 0;
- insertStatement.setInt(++i, epsilon);
- insertStatement.setInt(++i, linger);
+ insertStatement.setInt(++i, epsilonMillis);
+ insertStatement.setInt(++i, lingerMillis);
return insertStatement.executeUpdate();
}, true);
}
+ /**
+  * Schedules a recurring task that deletes every row of the lease arbiter table whose event_timestamp is older
+  * than the retention period defined by config ({@code retentionPeriodMillis}).
+  *
+  * The first run starts 10 seconds after this method is called and the task then repeats every 4 hours. The
+  * 10 second head start was previously obtained by sleeping inside the task; it is now expressed as the
+  * schedule's initial delay so the worker thread is never blocked and no InterruptedException has to be handled.
+  *
+  * Failures are logged and never rethrown: an exception escaping a task submitted through
+  * {@code scheduleAtFixedRate} suppresses all subsequent executions, which would silently stop retention.
+  *
+  * NOTE(review): the executor is local to this method and is never shut down here - confirm whether its worker
+  * thread should be a daemon thread or be stopped as part of service shutdown.
+  * // TODO: create a utility to run a SQL command in a STPE using interval T
+  */
+ private void runRetentionOnArbitrationTable() {
+   // Same timing as before: first deletion ~10 seconds after start-up, then once every 4 hours (6 times a day)
+   final long initialDelayMillis = TimeUnit.SECONDS.toMillis(10);
+   final long periodMillis = TimeUnit.HOURS.toMillis(4);
+
+   ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+   Runnable retentionTask = () -> {
+     try {
+       withPreparedStatement(thisTableRetentionStatement,
+           retentionStatement -> {
+             // The single placeholder is the retention period in milliseconds
+             retentionStatement.setLong(1, retentionPeriodMillis);
+             int numRowsDeleted = retentionStatement.executeUpdate();
+             if (numRowsDeleted != 0) {
+               log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table",
+                   numRowsDeleted);
+             }
+             return numRowsDeleted;
+           }, true);
+     } catch (IOException | RuntimeException e) {
+       // Catch unchecked exceptions as well so that one bad run does not cancel every future run
+       log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
+           + "affect our system performance. Examine exception: ", e);
+     }
+   };
+
+   executor.scheduleAtFixedRate(retentionTask, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
+ }
+
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis,
boolean isReminderEvent) throws IOException {
@@ -340,12 +382,12 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws
IOException {
try {
// Extract values from result set
- Timestamp dbEventTimestamp =
resultSet.getTimestamp("utc_event_timestamp", UTC_CAL);
- Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL);
+ Timestamp dbEventTimestamp =
resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get());
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get());
boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
int leaseValidityStatus = resultSet.getInt("lease_validity_status");
int dbLinger = resultSet.getInt("linger");
- Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("utc_current_timestamp", UTC_CAL);
+ Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("utc_current_timestamp", UTC_CAL.get());
return new GetEventInfoResult(dbEventTimestamp,
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
dbLinger, dbCurrentTimestamp);
} catch (SQLException e) {
@@ -423,14 +465,14 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
throw new IOException("Expected resultSet containing row information
for the lease that was attempted but "
+ "received nothing.");
}
- if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL) == null) {
+ if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()) ==
null) {
throw new IOException("event_timestamp should never be null (it is
always set to current timestamp)");
}
- long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp",
UTC_CAL).getTime();
+ long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp",
UTC_CAL.get()).getTime();
// Lease acquisition timestamp is null if another participant has
completed the lease
Optional<Long> leaseAcquisitionTimeMillis =
- resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL)
== null ? Optional.empty() :
-
Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp",
UTC_CAL).getTime());
+ resultSet.getTimestamp("utc_lease_acquisition_timestamp",
UTC_CAL.get()) == null ? Optional.empty() :
+
Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp",
UTC_CAL.get()).getTime());
int dbLinger = resultSet.getInt("linger");
return new SelectInfoResult(eventTimeMillis,
leaseAcquisitionTimeMillis, dbLinger);
} catch (SQLException e) {
@@ -526,10 +568,10 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
statement.setString(++i, flowAction.getFlowActionType().toString());
// Values that may be needed depending on the insert statement
if (needEventTimeCheck) {
- statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL);
+ statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL.get());
}
if (needLeaseAcquisitionTimeCheck) {
- statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL);
+ statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp,
UTC_CAL.get());
}
}
@@ -546,8 +588,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowActionType.toString());
- updateStatement.setTimestamp(++i, new
Timestamp(status.getEventTimestamp()), UTC_CAL);
- updateStatement.setTimestamp(++i, new
Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL);
+ updateStatement.setTimestamp(++i, new
Timestamp(status.getEventTimestamp()), UTC_CAL.get());
+ updateStatement.setTimestamp(++i, new
Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get());
int numRowsUpdated = updateStatement.executeUpdate();
if (numRowsUpdated == 0) {
log.info("Multi-active lease arbiter lease attempt: [{},
eventTimestamp: {}] - FAILED to complete because "
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index ab5faee8c..4f639e04a 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -173,7 +173,6 @@ public class MysqlDagActionStore implements DagActionStore {
rs.close();
}
}
-
}
@Override