phet commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1376559023
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,10 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Mysql Dag Action Store configuration
+ public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
+ public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + ".retentionPeriodSec";
+ public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds)
Review Comment:
nit: I'd expect the plural, `SECS_KEY` or `SECONDS_KEY`, rather than `SEC_KEY`
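A minimal sketch of the rename, assuming the `SECONDS` spelling is adopted; the constant names and the `retentionPeriodSeconds` key suffix are hypothetical. Because the prefix already ends in `.`, the quoted `MYSQL_DAG_ACTION_STORE_PREFIX + ".retentionPeriodSec"` produces the key `MysqlDagActionStore..retentionPeriodSec` with a doubled dot, so the sketch drops the leading dot from the suffix. It also drops `_KEY` from the default, since that constant holds a value rather than a config key.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical constant names following the plural-units suggestion. */
final class DagActionStoreConfigKeys {
  static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";

  // The prefix already ends with '.', so the suffix must not start with one.
  static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY =
      MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";

  // A default value, not a key; TimeUnit makes the unit explicit (3 days = 259200 seconds).
  static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS =
      TimeUnit.DAYS.toSeconds(3);

  private DagActionStoreConfigKeys() {}
}
```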
##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed
+ * intervals.
+ */
+public class MySQLStoreUtils {
+ private final DataSource dataSource;
+ private final Logger log;
+
+ public MySQLStoreUtils(DataSource dataSource, Logger log) {
+ this.dataSource = dataSource;
+ this.log = log;
+ }
+
+ /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ public interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ /** Abstracts recurring pattern around resource management and exception re-mapping. */
+ public <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
+ throws IOException {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement = connection.prepareStatement(sql)) {
+ T result = f.apply(statement);
+ if (shouldCommit) {
+ connection.commit();
+ }
+ statement.close();
+ return result;
+ } catch (SQLException e) {
+ log.warn("Received SQL exception that can result from invalid connection. Checking if validation query is set {} "
+ + "Exception is {}", ((HikariDataSource) dataSource).getConnectionTestQuery(), e);
+ throw new IOException(e);
+ }
+ }
+
+ public void runSqlCommandWithInterval(String sqlCommand, long interval, TimeUnit timeUnit) {
Review Comment:
`s/run/repeat/`?
strongly suggest adding javadoc to clarify the semantics:
* what do you mean by a command--is it strictly DML?
* will the command first run immediately or only after interval completes?
* does this run 'forever' (until app exits)?
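One way to pin those semantics down, sketched under assumptions: the name follows the `s/run/repeat/` suggestion, the command is assumed to be DML executed with `executeUpdate`, the first run happens immediately, later runs use fixed-delay scheduling, and runs continue on a daemon thread until `close()` or JVM exit. `SqlCommandRepeater`, `repeatWithInterval`, and the `onError` callback are hypothetical; the scheduling is factored out so it can be exercised without a database.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.sql.DataSource;

/** Hypothetical helper: documents when and how long a SQL command repeats. */
final class SqlCommandRepeater implements AutoCloseable {
  // Daemon thread, so an unclosed repeater does not by itself keep the JVM running.
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "sql-command-repeater");
        thread.setDaemon(true);
        return thread;
      });

  /**
   * Repeatedly executes {@code sqlCommand}, assumed to be a DML statement such as DELETE,
   * via {@link PreparedStatement#executeUpdate()}.
   * <ul>
   *   <li>The first execution starts immediately (initial delay 0).</li>
   *   <li>Each later execution starts {@code interval} after the previous one finishes.</li>
   *   <li>Executions continue until {@link #close()} is called or the JVM exits.</li>
   *   <li>A {@link SQLException} is passed to {@code onError}; later runs still happen.</li>
   * </ul>
   */
  ScheduledFuture<?> repeatSqlCommandWithInterval(DataSource dataSource, String sqlCommand,
      long interval, TimeUnit timeUnit, Consumer<SQLException> onError) {
    return repeatWithInterval(() -> {
      try (Connection connection = dataSource.getConnection();
           PreparedStatement statement = connection.prepareStatement(sqlCommand)) {
        statement.executeUpdate();
        if (!connection.getAutoCommit()) {
          connection.commit(); // only needed when the pool hands out non-auto-commit connections
        }
      } catch (SQLException e) {
        onError.accept(e); // swallowing here keeps the schedule alive
      }
    }, interval, timeUnit);
  }

  /** Same schedule for any task: run now, then again {@code interval} after each run ends. */
  ScheduledFuture<?> repeatWithInterval(Runnable task, long interval, TimeUnit timeUnit) {
    return executor.scheduleWithFixedDelay(task, 0, interval, timeUnit);
  }

  /** Stops future executions; the caller still owns and closes the DataSource. */
  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```

One subtlety worth documenting either way: with `scheduleWithFixedDelay`, an exception that escapes the task suppresses all later runs, which is why the sketch catches `SQLException` inside the task.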
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -80,13 +77,9 @@
*/
@Slf4j
public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
- /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
- @FunctionalInterface
- protected interface CheckedFunction<T, R> {
- R apply(T t) throws IOException, SQLException;
- }
protected final DataSource dataSource;
+ private final MySQLStoreUtils mySQLStoreUtils;
Review Comment:
pretty unintuitive name for an instance (generally 'utils' suggests a grab-bag of `static` functionality). solely based on the name, I immediately wonder: what is this meant to be used for? if it's a cohesive class, it ought to name itself accordingly.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -38,14 +39,19 @@
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExponentialBackoff;
+import org.apache.gobblin.util.MySQLStoreUtils;
+
@Slf4j
public class MysqlDagActionStore implements DagActionStore {
Review Comment:
is `withPreparedStatement` useful here too?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -218,52 +213,28 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
throw new IOException("Table creation failure for " + leaseArbiterTableName, e);
}
initializeConstantsTable();
- runRetentionOnArbitrationTable();
+
+ // Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
+ mySQLStoreUtils.runSqlCommandWithInterval(thisTableRetentionStatement, 4, TimeUnit.HOURS);
+
log.info("MysqlMultiActiveLeaseArbiter initialized");
}
// Initialize Constants table if needed and insert row into it if one does not exist
private void initializeConstantsTable() throws IOException {
String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
- withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true);
+ mySQLStoreUtils.withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(),
Review Comment:
seeing this use (which is quite reasonable IMO) makes me wonder whether this ought to be a common base class. what are the args for vs. against that approach? are you concerned about multiple inheritance, since some mysql store classes need a different base class?
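If multiple inheritance is the concern, one option, sketched here under assumptions, is an interface with a `default` method: a store can implement it while still extending whatever base class it needs. `MysqlStatementSupport`, `SqlFunction`, `getDataSource()`, `SomeOtherBaseClass`, and `ExampleDagActionStore` are hypothetical names. The body mirrors the quoted `withPreparedStatement` without the `HikariDataSource` cast, and it only calls `commit()` when auto-commit is off, since JDBC allows drivers to reject `commit()` in auto-commit mode.

```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

/** Hypothetical mixin: stores implement this instead of extending a shared base class. */
interface MysqlStatementSupport {

  /** Like java.util.function.Function, but may throw the checked exceptions JDBC code raises. */
  @FunctionalInterface
  interface SqlFunction<T, R> {
    R apply(T t) throws IOException, SQLException;
  }

  /** Supplied by the implementing store, which owns the DataSource's lifecycle. */
  DataSource getDataSource();

  /** Opens a connection, prepares {@code sql}, applies {@code f}, and optionally commits. */
  default <T> T withPreparedStatement(String sql, SqlFunction<PreparedStatement, T> f,
      boolean shouldCommit) throws IOException {
    try (Connection connection = getDataSource().getConnection();
         PreparedStatement statement = connection.prepareStatement(sql)) {
      T result = f.apply(statement);
      if (shouldCommit && !connection.getAutoCommit()) {
        connection.commit();
      }
      return result;
    } catch (SQLException e) {
      throw new IOException("Failed to execute: " + sql, e);
    }
  }
}

/** Stands in for whatever superclass a real store already extends. */
abstract class SomeOtherBaseClass {}

final class ExampleDagActionStore extends SomeOtherBaseClass implements MysqlStatementSupport {
  private final DataSource dataSource;

  ExampleDagActionStore(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  @Override
  public DataSource getDataSource() {
    return dataSource;
  }

  /** Hypothetical operation showing the helper in use. */
  int deleteAll(String tableName) throws IOException {
    return withPreparedStatement("DELETE FROM " + tableName, statement -> statement.executeUpdate(), true);
  }
}
```

The trade-off: interface methods are implicitly public, so the helper becomes part of each store's API, and the interface cannot hold the `DataSource` itself; an abstract base class avoids both but uses up the single superclass slot.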
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -58,6 +64,8 @@ public class MysqlDagActionStore implements DagActionStore {
+ "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
+ "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, "
+ "PRIMARY KEY (flow_group,flow_name,flow_execution_id, dag_action))";
+ // Deletes rows older than retention time period (in seconds) to prevent this table from growing unbounded.
+ private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? SECOND)";
Review Comment:
the other class had:
```
DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)
```
dunno which is preferable... but can we unify?
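A minimal sketch of one unified form, assuming both tables are pruned by a single timestamp column and the retention is bound as whole seconds through a `?` placeholder; `RetentionStatements`, `deleteOlderThan`, `retentionSeconds`, and the `dag_action_store` table name in the test are hypothetical. The fractional precision of `CURRENT_TIMESTAMP(3)` mainly matters for columns declared with fractional seconds, such as `TIMESTAMP(3)`.

```java
import java.time.Duration;

/** Hypothetical shared builder for retention DELETE statements. */
final class RetentionStatements {
  private RetentionStatements() {}

  /**
   * Returns a DELETE removing rows whose {@code timestampColumn} is older than a retention
   * period bound as the single {@code ?} parameter, in whole seconds. Table and column names
   * cannot be JDBC parameters, so they are formatted in and must come from trusted config.
   */
  static String deleteOlderThan(String tableName, String timestampColumn) {
    return String.format(
        "DELETE FROM %s WHERE %s < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? SECOND)",
        tableName, timestampColumn);
  }

  /** Converts a configured retention into the value bound to the {@code ?} placeholder. */
  static long retentionSeconds(Duration retention) {
    if (retention.isNegative() || retention.isZero()) {
      throw new IllegalArgumentException("retention must be positive: " + retention);
    }
    return retention.getSeconds();
  }
}
```

Callers would then bind the value with `statement.setLong(1, RetentionStatements.retentionSeconds(retention))`; binding, rather than formatting the value in with `%s`, also keeps it out of the SQL text.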
##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,70 @@
+ private final DataSource dataSource;
Review Comment:
javadoc ought to delineate who is responsible for resource mgmt of `DataSource`: this class or the caller/user
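A sketch of class-level javadoc that settles ownership one way, assuming the caller keeps ownership: the helper borrows the `DataSource`, never closes it, and `close()` releases only what the helper itself created (here, a scheduler). `MysqlStatementRunner` is a hypothetical name, and its statement-execution methods are omitted.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.sql.DataSource;

/**
 * Runs SQL against a {@link DataSource} that it borrows rather than owns.
 *
 * <p>Resource ownership: the caller creates the {@code DataSource}, must keep it open until
 * after this object is closed, and remains responsible for closing it. {@link #close()} shuts
 * down only the scheduler this class created. Statement methods (omitted from this sketch)
 * are expected to close each {@code Connection} they borrow.
 */
final class MysqlStatementRunner implements AutoCloseable {
  private final DataSource dataSource; // borrowed: never closed here
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  MysqlStatementRunner(DataSource dataSource) {
    this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
  }

  DataSource dataSource() {
    return dataSource;
  }

  boolean isClosed() {
    return scheduler.isShutdown();
  }

  @Override
  public void close() {
    scheduler.shutdownNow(); // deliberately leaves dataSource open for its owner
  }
}
```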
--
This is an automated message from the Apache Git Service.