phet commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1376559023


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,10 @@ public class ConfigurationKeys {
   public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+  // Mysql Dag Action Store configuration
+  public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
+  public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + ".retentionPeriodSec";
+  public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds)

Review Comment:
   nit: I'd expect the plural, `SECS_KEY` or `SECONDS_KEY`, rather than `SEC_KEY`
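For illustration only, the constants with the plural naming might read as below. The holder class `MysqlDagActionStoreKeysSketch` is hypothetical (in the PR these live in `ConfigurationKeys`). Dropping `_KEY` from the default is an assumption, since that constant holds a value rather than a config key. The suffix also omits the leading `.` because `MYSQL_DAG_ACTION_STORE_PREFIX` already ends in one. As written, the diff would yield `MysqlDagActionStore..retentionPeriodSec`.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical holder class; in the PR these constants live in ConfigurationKeys. */
final class MysqlDagActionStoreKeysSketch {
  private MysqlDagActionStoreKeysSketch() {}

  // Mysql Dag Action Store configuration
  public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
  // The prefix already ends with '.', so the suffix is appended without another one.
  public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY =
      MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSec";
  // A value rather than a key, hence no _KEY suffix (naming assumption): 3 days in seconds.
  public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS =
      TimeUnit.DAYS.toSeconds(3);
}
```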



##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed
+ * intervals.
+ */
+public class MySQLStoreUtils {
+  private final DataSource dataSource;
+  private final Logger log;
+
+  public MySQLStoreUtils(DataSource dataSource, Logger log) {
+    this.dataSource = dataSource;
+    this.log = log;
+  }
+
+  /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
+  @FunctionalInterface
+  public interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException, SQLException;
+  }
+
+  /** Abstracts recurring pattern around resource management and exception re-mapping. */
+  public <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
+      throws IOException {
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement statement = connection.prepareStatement(sql)) {
+      T result = f.apply(statement);
+      if (shouldCommit) {
+        connection.commit();
+      }
+      statement.close();
+      return result;
+    } catch (SQLException e) {
+      log.warn("Received SQL exception that can result from invalid connection. Checking if validation query is set {} "
+          + "Exception is {}", ((HikariDataSource) dataSource).getConnectionTestQuery(), e);
+      throw new IOException(e);
+    }
+  }
+
+  public void runSqlCommandWithInterval(String sqlCommand, long interval, TimeUnit timeUnit) {

Review Comment:
   `s/run/repeat/`?
   
   strongly suggest javadoc to clarify semantics.
     * what do you mean by a command--is it strictly DML?
     * will the command first run immediately or only after the interval completes?
     * does this run 'forever' (until the app exits)?
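A possible shape for the renamed method with that javadoc, as a sketch only. `RepeatingSqlCommandSketch` and its `SqlExecutor` hook are hypothetical stand-ins; a real version would delegate to `withPreparedStatement`. The documented answers are assumptions to confirm with the author, not a description of what the PR currently does: DML only, first run immediately, fixed delay between runs, and a stop on `close()` or JVM exit.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the renamed method; not the PR's implementation. */
class RepeatingSqlCommandSketch implements AutoCloseable {
  /** Hypothetical stand-in for executing one DML statement, e.g. via withPreparedStatement. */
  @FunctionalInterface
  interface SqlExecutor {
    void execute(String dmlStatement) throws Exception;
  }

  private final SqlExecutor executor;
  // Daemon thread, so a pending schedule alone does not keep the JVM from exiting.
  private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1, runnable -> {
    Thread thread = new Thread(runnable, "mysql-store-repeat-sql");
    thread.setDaemon(true);
    return thread;
  });

  RepeatingSqlCommandSketch(SqlExecutor executor) {
    this.executor = executor;
  }

  /**
   * Repeatedly executes a DML statement (e.g. a retention {@code DELETE}); not intended for DDL or queries.
   *
   * <p>The first execution starts immediately. Each later execution starts {@code interval} after the
   * previous one finished (fixed delay), so slow runs never overlap. Executions continue until
   * {@link #close()} is called or the JVM exits. A failed execution is logged and does not cancel later ones.
   */
  ScheduledFuture<?> repeatSqlCommandWithInterval(String dmlStatement, long interval, TimeUnit timeUnit) {
    return scheduler.scheduleWithFixedDelay(() -> {
      try {
        executor.execute(dmlStatement);
      } catch (Exception e) {
        // Log and continue: an exception escaping the task would suppress all future runs.
        System.err.println("Failed to execute '" + dmlStatement + "': " + e);
      }
    }, 0, interval, timeUnit);
  }

  /** Stops future executions and interrupts one in progress, if any. */
  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

The catch inside the task matters. With `scheduleWithFixedDelay`, a task that throws has all subsequent executions suppressed, so an uncaught `SQLException` would silently end retention.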



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -80,13 +77,9 @@
  */
 @Slf4j
 public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
-  /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
-  @FunctionalInterface
-  protected interface CheckedFunction<T, R> {
-    R apply(T t) throws IOException, SQLException;
-  }
 
   protected final DataSource dataSource;
+  private final MySQLStoreUtils mySQLStoreUtils;

Review Comment:
   pretty unintuitive name for an instance (generally 'utils' suggests a grab-bag of `static` functionality). solely based on the name, I immediately wonder: what is this meant to be used for? if it's a cohesive class, it ought to name itself accordingly.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -38,14 +39,19 @@
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExponentialBackoff;
+import org.apache.gobblin.util.MySQLStoreUtils;
+
 
 @Slf4j
 public class MysqlDagActionStore implements DagActionStore {

Review Comment:
   is `withPreparedStatement` useful here too?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -218,52 +213,28 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
       throw new IOException("Table creation failure for " + leaseArbiterTableName, e);
     }
     initializeConstantsTable();
-    runRetentionOnArbitrationTable();
+
+    // Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
+    mySQLStoreUtils.runSqlCommandWithInterval(thisTableRetentionStatement, 4, TimeUnit.HOURS);
+
     log.info("MysqlMultiActiveLeaseArbiter initialized");
   }
 
   // Initialize Constants table if needed and insert row into it if one does not exist
   private void initializeConstantsTable() throws IOException {
     String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
-    withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true);
+    mySQLStoreUtils.withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(),

Review Comment:
   seeing this use--which is quite reasonable IMO--makes me wonder whether this ought to be a common base class. what are the args for vs. against that approach? are you concerned about multiple inheritance, since some mysql store classes already need a different base class?
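On the base-class question: Java permits one superclass but any number of interfaces, so a shared abstract base only works for stores that extend nothing else. Below is a minimal sketch of a middle ground, assuming the shared code needs nothing but a `DataSource`: put `withPreparedStatement` on an interface as a default method. `MysqlStatementSupport`, `ExistingBase`, `RetentionAwareStore` and `deleteOlderThan` are hypothetical names. The sketch also drops the redundant `statement.close()`, because try-with-resources already closes it. It drops the unchecked `HikariDataSource` cast as well.

```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

/** Hypothetical mixin: any store exposing a DataSource gets withPreparedStatement. */
interface MysqlStatementSupport {
  @FunctionalInterface
  interface CheckedFunction<T, R> {
    R apply(T t) throws IOException, SQLException;
  }

  DataSource getDataSource();

  default <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
      throws IOException {
    // try-with-resources closes the statement, then the connection; no explicit close() needed.
    try (Connection connection = getDataSource().getConnection();
        PreparedStatement statement = connection.prepareStatement(sql)) {
      T result = f.apply(statement);
      if (shouldCommit) {
        connection.commit();
      }
      return result;
    } catch (SQLException e) {
      throw new IOException("Failed to execute: " + sql, e);
    }
  }
}

/** Hypothetical pre-existing superclass that some store must keep extending. */
abstract class ExistingBase {}

/** Hypothetical store: keeps its superclass and still reuses the shared JDBC pattern. */
class RetentionAwareStore extends ExistingBase implements MysqlStatementSupport {
  private final DataSource dataSource;

  RetentionAwareStore(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  @Override
  public DataSource getDataSource() {
    return dataSource;
  }

  int deleteOlderThan(String table, long retentionSeconds) throws IOException {
    String sql = String.format(
        "DELETE FROM %s WHERE modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? SECOND)", table);
    Integer deleted = withPreparedStatement(sql, statement -> {
      statement.setLong(1, retentionSeconds);
      return statement.executeUpdate();
    }, true);
    return deleted;
  }
}
```

The trade-off against a base class: interface methods are public, so `getDataSource()` leaks into each store's API. A default method also cannot own state such as the scheduler that `runSqlCommandWithInterval` needs, which a composed helper can.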



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -58,6 +64,8 @@ public class MysqlDagActionStore implements DagActionStore {
       + "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
       + "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP  on update CURRENT_TIMESTAMP NOT NULL, "
       + "PRIMARY KEY (flow_group,flow_name,flow_execution_id, dag_action))";
+  // Deletes rows older than retention time period (in seconds) to prevent this table from growing unbounded.
+  private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? SECOND)";

Review Comment:
   the other class had:
   ```
   DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)
   ```
   dunno which is preferable... but can we unify?
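Here is one possible unification, sketched under assumptions. It keeps the finer-grained `CURRENT_TIMESTAMP(3)` form. Instead of formatting the value into the SQL or multiplying in SQL, it binds the cutoff as a single microsecond count computed in Java. `RetentionStatements` and its methods are hypothetical. Table and column names are still formatted in because JDBC placeholders bind values, not identifiers. For this store the column would be `modified_time`, while the arbiter's comment refers to `event_timestamp`.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: one retention statement shared by the MySQL-backed stores. */
final class RetentionStatements {
  private RetentionStatements() {}

  // CURRENT_TIMESTAMP(3) keeps millisecond precision; the cutoff is bound as a microsecond count,
  // so callers can express retention in any TimeUnit.
  private static final String RETENTION_TEMPLATE =
      "DELETE FROM %s WHERE %s < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? MICROSECOND)";

  /** SQL deleting rows of {@code table} whose {@code timestampColumn} is older than the retention period. */
  static String retentionStatement(String table, String timestampColumn) {
    return String.format(RETENTION_TEMPLATE, table, timestampColumn);
  }

  /** Value to bind to the statement's single {@code ?} placeholder. */
  static long retentionBindValue(long retention, TimeUnit unit) {
    return unit.toMicros(retention);
  }
}
```

A caller would then bind the value with something like `statement.setLong(1, RetentionStatements.retentionBindValue(retentionPeriodSeconds, TimeUnit.SECONDS))`, where `retentionPeriodSeconds` is a hypothetical variable holding the configured value.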



##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed
+ * intervals.
+public class MySQLStoreUtils {
+  private final DataSource dataSource;

Review Comment:
   javadoc ought to delineate who is responsible for resource mgmt of `DataSource`--this class or the caller/user
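Below is a sketch of what that javadoc could say. It assumes the caller owns the `DataSource`, for example the store that built the Hikari pool, and that this class owns only its scheduler. `MySQLStoreUtilsOwnershipSketch` and `isClosed()` are hypothetical.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.sql.DataSource;

/**
 * Executes statements against a caller-supplied {@link DataSource}.
 *
 * <p>Resource ownership (assumed split): the caller creates, configures and closes the
 * {@code DataSource}; this class never closes it. Any connection this class obtains from it is
 * closed (returned to the pool) within the same call. This class owns only its scheduler,
 * which {@link #close()} shuts down.
 */
class MySQLStoreUtilsOwnershipSketch implements AutoCloseable {
  private final DataSource dataSource; // borrowed from the caller, not owned
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); // owned

  MySQLStoreUtilsOwnershipSketch(DataSource dataSource) {
    this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
  }

  /** Releases only what this class owns; the DataSource stays open for its other users. */
  @Override
  public void close() {
    scheduler.shutdown();
  }

  boolean isClosed() {
    return scheduler.isShutdown();
  }
}
```

Implementing `AutoCloseable` makes the split explicit: closing the helper stops its background work without pulling the pool out from under other users.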



-- 
This is an automated message from the Apache Git Service.