[ https://issues.apache.org/jira/browse/GOBBLIN-1942?focusedWorklogId=887905&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-887905 ]

ASF GitHub Bot logged work on GOBBLIN-1942:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Oct/23 17:35
            Start Date: 30/Oct/23 17:35
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1376559023


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,10 @@ public class ConfigurationKeys {
   public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+  // Mysql Dag Action Store configuration
+  public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
+  public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + ".retentionPeriodSec";
+  public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds)

Review Comment:
   nit: I'd expect the plural, `SECS_KEY` or `SECONDS_KEY`, rather than `SEC_KEY`
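A minimal sketch of how these constants might read with the plural naming suggested above. The constant names are hypothetical (the default also drops the `_KEY` suffix, since it holds a value rather than a key), and `java.util.Properties` stands in for the typesafe `Config` that Gobblin actually reads. One further detail in the quoted hunk: the prefix already ends with `.` and the suffix starts with `.`, so the composed key would be `MysqlDagActionStore..retentionPeriodSec`; the sketch drops the suffix's leading dot.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Hypothetical constants illustrating the plural SECONDS naming; not the merged Gobblin code. */
final class DagActionStoreRetentionKeys {
  static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";

  // The prefix already ends with '.', so the suffix carries no leading dot.
  static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY =
      MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSec";

  // Holds a value rather than a key, hence no _KEY suffix: 3 days, expressed in seconds.
  static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS =
      TimeUnit.DAYS.toSeconds(3);

  private DagActionStoreRetentionKeys() {}

  /** Returns the configured retention in seconds, or the default when the key is absent. */
  static long retentionPeriodSeconds(Properties props) {
    String raw = props.getProperty(MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY);
    return raw == null
        ? DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS
        : Long.parseLong(raw.trim());
  }
}
```

Using `TimeUnit.DAYS.toSeconds(3)` instead of `3 * 24 * 60 * 60` states the unit conversion explicitly, which also makes the `SECONDS` suffix self-evident at the definition site.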



##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can 
be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source 
object and executing SQL queries at fixed
+ * intervals. 
+ */
+public class MySQLStoreUtils {
+  private final DataSource dataSource;
+  private final Logger log;
+
+  public MySQLStoreUtils(DataSource dataSource, Logger log) {
+    this.dataSource = dataSource;
+    this.log = log;
+  }
+
+  /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
+  @FunctionalInterface
+  public interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException, SQLException;
+  }
+
+  /** Abstracts recurring pattern around resource management and exception re-mapping. */
+  public <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
+      throws IOException {
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement statement = connection.prepareStatement(sql)) {
+      T result = f.apply(statement);
+      if (shouldCommit) {
+        connection.commit();
+      }
+      statement.close();
+      return result;
+    } catch (SQLException e) {
+      log.warn("Received SQL exception that can result from invalid connection. Checking if validation query is set {} "
+          + "Exception is {}", ((HikariDataSource) dataSource).getConnectionTestQuery(), e);
+      throw new IOException(e);
+    }
+  }
+
+  public void runSqlCommandWithInterval(String sqlCommand, long interval, TimeUnit timeUnit) {

Review Comment:
   `s/run/repeat/`?
   
   strongly suggest javadoc to clarify semantics.
     * what do you mean by a command--is it strictly DML? 
     * will the command first run immediately or only after interval completes?
     * does this run 'forever' (until app exits)?
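One way to answer those questions is to encode them in the method's contract. The sketch below is hypothetical: it takes the statement as a `SqlUpdate` callback (an assumed stand-in for `statement -> statement.executeUpdate()`, i.e. DML only) so that it runs without a database. It runs once immediately, then repeats at a fixed rate until `close()` is called. It catches failures inside the task because, per the `ScheduledExecutorService.scheduleAtFixedRate` contract, an exception escaping one run suppresses all later runs.

```java
import java.sql.SQLException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical callback that executes one DML statement and returns the affected row count. */
@FunctionalInterface
interface SqlUpdate {
  int run() throws SQLException;
}

/**
 * Repeats a single DML statement (for example a retention DELETE) on a fixed schedule.
 *
 * <ul>
 *   <li>The first run starts immediately; later runs start every {@code interval} {@code unit}.</li>
 *   <li>Runs continue until {@link #close()}; the worker is a daemon thread, so it does not keep
 *       the JVM alive on its own.</li>
 *   <li>A failed run is logged and counted; it does not cancel later runs.</li>
 * </ul>
 */
final class RepeatingSqlCommand implements AutoCloseable {
  private static final Logger LOG = Logger.getLogger(RepeatingSqlCommand.class.getName());

  private final ScheduledThreadPoolExecutor executor;
  private final AtomicInteger failures = new AtomicInteger();

  RepeatingSqlCommand(SqlUpdate command, long interval, TimeUnit unit) {
    this.executor = new ScheduledThreadPoolExecutor(1, runnable -> {
      Thread thread = new Thread(runnable, "repeating-sql-command");
      thread.setDaemon(true);
      return thread;
    });
    this.executor.scheduleAtFixedRate(() -> {
      try {
        int rows = command.run();
        LOG.fine(() -> "SQL command affected " + rows + " rows");
      } catch (SQLException | RuntimeException e) {
        // Swallow and log: an exception escaping this task would suppress every later run.
        failures.incrementAndGet();
        LOG.log(Level.WARNING, "Repeated SQL command failed; retrying at the next interval", e);
      }
    }, 0, interval, unit);
  }

  int failureCount() {
    return failures.get();
  }

  /** Cancels future runs and attempts to interrupt a run in progress. */
  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```

Whether the first run should wait one full interval instead is a design choice; for retention, running once at startup clears any backlog right away.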



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -80,13 +77,9 @@
  */
 @Slf4j
 public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
-  /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
-  @FunctionalInterface
-  protected interface CheckedFunction<T, R> {
-    R apply(T t) throws IOException, SQLException;
-  }
 
   protected final DataSource dataSource;
+  private final MySQLStoreUtils mySQLStoreUtils;

Review Comment:
   pretty unintuitive name for an instance (generally 'utils' suggests a grab-bag of `static` functionality).  solely based on the name, I immediately wonder: what is this meant to be used for?  if it's a cohesive class, it ought to name itself accordingly.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -38,14 +39,19 @@
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExponentialBackoff;
+import org.apache.gobblin.util.MySQLStoreUtils;
+
 
 @Slf4j
 public class MysqlDagActionStore implements DagActionStore {

Review Comment:
   is `withPreparedStatement` useful here too?
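If it is, a delete in the store could shrink to the following. This is a hypothetical sketch, not the PR's code: the class, the `DELETE` statement, and the `deleteDagAction` signature are assumptions modelled on the table columns quoted later in this review. The helper mirrors `withPreparedStatement` from the diff, minus the explicit `statement.close()` (try-with-resources already closes it) and minus the `HikariDataSource` cast in the log line (which would throw `ClassCastException` for any other `DataSource`).

```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

/** Same shape as the CheckedFunction in the diff. */
@FunctionalInterface
interface CheckedFunction<T, R> {
  R apply(T t) throws IOException, SQLException;
}

/** Hypothetical store showing a delete routed through a shared withPreparedStatement. */
final class DagActionStoreSketch {
  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE flow_group = ? AND flow_name = ? "
      + "AND flow_execution_id = ? AND dag_action = ?";

  private final DataSource dataSource;
  private final String tableName;

  DagActionStoreSketch(DataSource dataSource, String tableName) {
    this.dataSource = dataSource;
    this.tableName = tableName;
  }

  /** Borrows a connection, applies f to a fresh statement, optionally commits, then closes both. */
  <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
      throws IOException {
    try (Connection connection = dataSource.getConnection();
        PreparedStatement statement = connection.prepareStatement(sql)) {
      T result = f.apply(statement);
      if (shouldCommit) {
        connection.commit();
      }
      return result;
    } catch (SQLException e) {
      throw new IOException("Failed to execute: " + sql, e);
    }
  }

  /** Returns true if a matching row was deleted. */
  boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId, String dagAction)
      throws IOException {
    return withPreparedStatement(String.format(DELETE_STATEMENT, tableName), statement -> {
      statement.setString(1, flowGroup);
      statement.setString(2, flowName);
      statement.setString(3, flowExecutionId);
      statement.setString(4, dagAction);
      return statement.executeUpdate() > 0;
    }, true);
  }
}
```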



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -218,52 +213,28 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
       throw new IOException("Table creation failure for " + leaseArbiterTableName, e);
     }
     initializeConstantsTable();
-    runRetentionOnArbitrationTable();
+
+    // Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
+    mySQLStoreUtils.runSqlCommandWithInterval(thisTableRetentionStatement, 4, TimeUnit.HOURS);
+
     log.info("MysqlMultiActiveLeaseArbiter initialized");
   }
 
   // Initialize Constants table if needed and insert row into it if one does not exist
   private void initializeConstantsTable() throws IOException {
     String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
-    withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true);
+    mySQLStoreUtils.withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(),

Review Comment:
   seeing this use--which is quite reasonable IMO--makes me wonder whether this ought to be a common base class.  what are the args for vs. against that approach?  are you concerned about multiple inheritance, since some mysql store classes need another different base class?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -58,6 +64,8 @@ public class MysqlDagActionStore implements DagActionStore {
       + "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
       + "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP  on update CURRENT_TIMESTAMP NOT NULL, "
       + "PRIMARY KEY (flow_group,flow_name,flow_execution_id, dag_action))";
+  // Deletes rows older than retention time period (in seconds) to prevent this table from growing unbounded.
+  private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? SECOND)";

Review Comment:
   the other class had:
   ```
   DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)
   ```
   dunno which is preferable... but can we unify?
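One possible unification, sketched under assumptions: both stores build their retention `DELETE` from a single template and bind the retention in milliseconds. The `%s * 1000 MICROSECOND` form in the other class appears to take milliseconds (×1000 to microseconds) against a millisecond-precision `CURRENT_TIMESTAMP(3)`; the sketch keeps that form but binds the value as a `?` parameter, as the new `RETENTION_STATEMENT` does. The class, method, and table names are hypothetical; the column names (`modified_time`, `event_timestamp`) come from the two tables in this PR.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical single source for the retention DELETE used by both MySQL stores. */
final class RetentionStatements {
  // "Now" at millisecond precision, minus a bound millisecond count (x1000 converts ms to MICROSECOND).
  private static final String DELETE_OLDER_THAN_TEMPLATE =
      "DELETE FROM %s WHERE %s < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)";

  private RetentionStatements() {}

  /** Formats the statement for one table; the retention itself is bound later as the single '?'. */
  static String deleteOlderThan(String tableName, String timestampColumn) {
    return String.format(DELETE_OLDER_THAN_TEMPLATE, tableName, timestampColumn);
  }

  /** Converts a retention period into the millisecond value to bind to the '?' placeholder. */
  static long retentionMillis(long retention, TimeUnit unit) {
    return unit.toMillis(retention);
  }
}
```

A caller would then run `statement.setLong(1, RetentionStatements.retentionMillis(retentionSecs, TimeUnit.SECONDS))` before `executeUpdate()`. For multi-day retention either precision is ample; the gain is having one place to change the expression.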



##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed
+ * intervals.
+public class MySQLStoreUtils {
+  private final DataSource dataSource;

Review Comment:
   javadoc ought to delineate whose responsibility for resource mgmt of `DataSource`--this class or caller/user
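A sketch of the kind of javadoc being asked for, assuming the caller owns the `DataSource` (as in the diff, where it is passed into the constructor) while the helper owns only the scheduler it creates. `MySqlStoreHelper` is a hypothetical name, and the statement and scheduling methods are omitted to keep the focus on ownership.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.sql.DataSource;

/**
 * Shared helper for MySQL-backed stores (hypothetical name).
 *
 * <p><b>Resource ownership.</b> The {@link DataSource} is created and owned by the caller. This
 * class only borrows connections from it, and each borrowed connection must be closed after use
 * (for example via try-with-resources). It never closes the {@code DataSource}; the caller closes
 * it (e.g. a {@code HikariDataSource}) when the store shuts down, after closing this helper.
 *
 * <p>The scheduler used for periodic commands is created and owned by this class and is stopped
 * by {@link #close()}.
 */
final class MySqlStoreHelper implements AutoCloseable {
  private final DataSource dataSource; // borrowed from the caller: never closed here
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); // owned

  MySqlStoreHelper(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  DataSource dataSource() {
    return dataSource;
  }

  boolean isClosed() {
    return scheduler.isShutdown();
  }

  /** Stops the owned scheduler; the caller's DataSource is left open. */
  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```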





Issue Time Tracking
-------------------

    Worklog Id:     (was: 887905)
    Time Spent: 20m  (was: 10m)

> Create MySQL util class for re-usable methods & enable MysqlDagActionStore retention
> ------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1942
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1942
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Defines a new class {{MySQLStoreUtils}} used for common functionality between
> MySQL-based implementations of stores. It includes a new method that runs a SQL
> command in a {{ScheduledThreadPoolExecutor}} at a fixed interval {{T}}, which is
> used for retention on the {{MysqlDagActionStore}} and
> {{MysqlMultiActiveLeaseArbiter}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
