[
https://issues.apache.org/jira/browse/GOBBLIN-1942?focusedWorklogId=887905&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-887905
]
ASF GitHub Bot logged work on GOBBLIN-1942:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Oct/23 17:35
Start Date: 30/Oct/23 17:35
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1376559023
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,10 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Mysql Dag Action Store configuration
+ public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
+ public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + ".retentionPeriodSec";
+ public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds)
Review Comment:
nit: I'd expect the plural, `SECS_KEY` or `SECONDS_KEY`, rather than
`SEC_KEY`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed
+ * intervals.
+ */
+public class MySQLStoreUtils {
+ private final DataSource dataSource;
+ private final Logger log;
+
+ public MySQLStoreUtils(DataSource dataSource, Logger log) {
+ this.dataSource = dataSource;
+ this.log = log;
+ }
+
+ /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ public interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ /** Abstracts recurring pattern around resource management and exception re-mapping. */
+ public <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
+ throws IOException {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement = connection.prepareStatement(sql)) {
+ T result = f.apply(statement);
+ if (shouldCommit) {
+ connection.commit();
+ }
+ statement.close();
+ return result;
+ } catch (SQLException e) {
+ log.warn("Received SQL exception that can result from invalid connection. Checking if validation query is set {} "
+ + "Exception is {}", ((HikariDataSource) dataSource).getConnectionTestQuery(), e);
+ throw new IOException(e);
+ }
+ }
+
+ public void runSqlCommandWithInterval(String sqlCommand, long interval, TimeUnit timeUnit) {
Review Comment:
`s/run/repeat/`?
strongly suggest javadoc to clarify semantics.
* what do you mean by a command--is it strictly DML?
* will the command first run immediately or only after interval completes?
* does this run 'forever' (until app exits)?
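To make the three questions concrete, here is a minimal sketch of the kind of javadoc being asked for. It is not the PR's implementation: the class name `RepeatingCommandSketch` and the method `repeatAtFixedRate` are hypothetical, and a plain `Runnable` stands in for executing the SQL string so the example needs no database. It assumes the intended semantics are "run once immediately, then at every interval, until shut down"; the PR may well intend something different.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; names and semantics are assumptions, not the PR's code. */
class RepeatingCommandSketch {

  /**
   * Repeats {@code command} at a fixed rate on a dedicated single-thread scheduler.
   *
   * <ul>
   *   <li>What is a command: here any {@link Runnable}; in the PR it would be a SQL
   *       statement (e.g. a retention DELETE) executed against the data source.</li>
   *   <li>First run: starts immediately (initial delay 0), then once per interval.</li>
   *   <li>Lifetime: keeps running until the returned executor is shut down. The
   *       scheduler thread is non-daemon by default, so the caller owns shutdown.</li>
   * </ul>
   *
   * @return the executor, so the caller can stop the repetition
   */
  static ScheduledExecutorService repeatAtFixedRate(Runnable command, long interval, TimeUnit timeUnit) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    executor.scheduleAtFixedRate(() -> {
      try {
        command.run();
      } catch (RuntimeException e) {
        // scheduleAtFixedRate suppresses all later runs once a run throws,
        // so failures are reported here instead of being propagated.
        System.err.println("Repeated command failed: " + e);
      }
    }, 0, interval, timeUnit);
    return executor;
  }
}
```

Returning the executor is one way to make the "runs forever?" answer explicit: the repetition lasts exactly as long as the caller keeps the executor alive.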
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -80,13 +77,9 @@
*/
@Slf4j
public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
- /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
- @FunctionalInterface
- protected interface CheckedFunction<T, R> {
- R apply(T t) throws IOException, SQLException;
- }
protected final DataSource dataSource;
+ private final MySQLStoreUtils mySQLStoreUtils;
Review Comment:
pretty unintuitive name for an instance (generally 'utils' suggests a
grab-bag of `static` functionality). solely based on the name, I immediately
wonder: what is this meant to be used for? if it's a cohesive class, it ought
to name itself accordingly.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -38,14 +39,19 @@
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExponentialBackoff;
+import org.apache.gobblin.util.MySQLStoreUtils;
+
@Slf4j
public class MysqlDagActionStore implements DagActionStore {
Review Comment:
is `withPreparedStatement` useful here too?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -218,52 +213,28 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
throw new IOException("Table creation failure for " + leaseArbiterTableName, e);
}
initializeConstantsTable();
- runRetentionOnArbitrationTable();
+
+ // Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
+ mySQLStoreUtils.runSqlCommandWithInterval(thisTableRetentionStatement, 4, TimeUnit.HOURS);
+
log.info("MysqlMultiActiveLeaseArbiter initialized");
}
// Initialize Constants table if needed and insert row into it if one does not exist
private void initializeConstantsTable() throws IOException {
String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
- withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true);
+ mySQLStoreUtils.withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(),
Review Comment:
seeing this use--which is quite reasonable IMO--makes me wonder whether this
ought to be a common base class. what are the args for vs. against that
approach? are you concerned about multiple inheritance, since some mysql store
classes need another different base class?
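For reference, the two shapes under discussion can be sketched side by side. Every name below is hypothetical and the "helper" only formats a string rather than touching a database; the point is purely structural. Java allows a class to extend only one superclass, so the base-class option is unavailable to any store that already has a required parent, while composition works in both cases.

```java
/** Hypothetical names throughout; illustrates the structural trade-off only. */
class BaseClassVsCompositionSketch {

  /** Stand-in for the shared helper role (what the PR calls MySQLStoreUtils). */
  static class StatementHelper {
    String describe(String sql) {
      return "would execute: " + sql;
    }
  }

  /** Option A: shared logic lives in a common base class and is inherited. */
  abstract static class MysqlBackedStore {
    protected String describe(String sql) {
      return "would execute: " + sql;
    }
  }

  static class InheritingStore extends MysqlBackedStore {
    String init() {
      return describe("SELECT 1"); // no helper field needed
    }
  }

  /** Some unrelated parent that a store might already be required to extend. */
  static class OtherRequiredBase {}

  /** Option B: composition; still possible when the single superclass slot is taken. */
  static class ComposingStore extends OtherRequiredBase {
    private final StatementHelper helper = new StatementHelper();

    String init() {
      return helper.describe("SELECT 1");
    }
  }
}
```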
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -58,6 +64,8 @@ public class MysqlDagActionStore implements DagActionStore {
+ "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
+ "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, "
+ "PRIMARY KEY (flow_group,flow_name,flow_execution_id, dag_action))";
+ // Deletes rows older than retention time period (in seconds) to prevent this table from growing unbounded.
+ private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? SECOND)";
Review Comment:
the other class had:
```
DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)
```
dunno which is preferable... but can we unify?
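One possible way to unify, sketched under assumptions: the two forms differ in unit and precision (`INTERVAL ? SECOND` takes seconds, while `INTERVAL %s * 1000 MICROSECOND` takes milliseconds and pairs with `CURRENT_TIMESTAMP(3)`), so a shared builder could accept any `TimeUnit` and always emit the millisecond form. The class `RetentionStatementSketch`, its method, and the table name in the test are hypothetical; the column name `modified_time` comes from the diff above.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a single retention-statement builder shared by both stores. */
class RetentionStatementSketch {

  // Millisecond-precision form, matching the expression quoted from the other class.
  static final String RETENTION_TEMPLATE =
      "DELETE FROM %s WHERE %s < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %d * 1000 MICROSECOND)";

  /**
   * Builds the DELETE statement for rows whose timestamp column is older than the
   * given retention period. The period is converted to milliseconds, so callers
   * configured in seconds and callers configured in millis share one template.
   */
  static String retentionStatement(String table, String timestampColumn, long retention, TimeUnit unit) {
    // Locale.ROOT keeps the number formatted with plain ASCII digits.
    return String.format(Locale.ROOT, RETENTION_TEMPLATE, table, timestampColumn, unit.toMillis(retention));
  }
}
```

Note that this sketch inlines the period instead of binding it as a `?` parameter; keeping a bind parameter would work equally well as long as both classes agree on the unit.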
##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed
+ * intervals.
+ */
+public class MySQLStoreUtils {
+ private final DataSource dataSource;
Review Comment:
javadoc ought to delineate who is responsible for resource mgmt of the
`DataSource`--this class or the caller/user
Issue Time Tracking
-------------------
Worklog Id: (was: 887905)
Time Spent: 20m (was: 10m)
> Create MySQL util class for re-usable methods & enable MysqlDagActionStore
> retention
> ------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1942
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1942
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Defines a new class {{MySQLStoreUtils}} used for common functionality between
> MySQL based implementations of stores. It includes a new method to run a SQL
> command in a {{ScheduledThreadPoolExecutor}} using {{interval T}} which is
> used for retention on the {{MysqlDagActionStore}} and
> {{MysqlMultiActiveLeaseArbiter}}.