This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c865b6a8e [GOBBLIN-1942] Create MySQL util class for re-usable methods 
and setup MysqlDagActio… (#3812)
c865b6a8e is described below

commit c865b6a8e4628f9602671151a1a33c8a27dacdef
Author: umustafi <[email protected]>
AuthorDate: Wed Nov 1 12:15:24 2023 -0700

    [GOBBLIN-1942] Create MySQL util class for re-usable methods and setup 
MysqlDagActio… (#3812)
    
    * Create MySQL util class for re-usable methods and setup 
MysqlDagActionStore retention
    
    * Add a java doc
    
    * Address review comments
    
    * Close scheduled executors on shutdown & clarify naming and comments
    
    * Remove extra period making config key invalid
    
    * implement Closeable
    
    * Use try with resources
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../gobblin/configuration/ConfigurationKeys.java   |   4 +
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  |  84 ++++------------
 .../dag_action_store/MysqlDagActionStore.java      | 107 ++++++++++----------
 .../runtime/spec_store/MysqlBaseSpecStore.java     |   1 +
 .../apache/gobblin/util/DBStatementExecutor.java   | 111 +++++++++++++++++++++
 5 files changed, 188 insertions(+), 119 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a50ba8c75..5fe8f001a 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -95,6 +95,10 @@ public class ConfigurationKeys {
   public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = 
"skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+  // Mysql Dag Action Store configuration
+  public static final String MYSQL_DAG_ACTION_STORE_PREFIX = 
"MysqlDagActionStore.";
+  public static final String 
MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY = 
MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
+  public static final long 
DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 
60; // (3 days in seconds)
   // Scheduler lease determination store configuration
   public static final String MYSQL_LEASE_ARBITER_PREFIX = 
"MysqlMultiActiveLeaseArbiter";
   public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 338e908a2..05449767c 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.runtime.api;
 
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
-import com.zaxxer.hikari.HikariDataSource;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -30,7 +29,6 @@ import java.sql.Timestamp;
 import java.util.Calendar;
 import java.util.Optional;
 import java.util.TimeZone;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import javax.sql.DataSource;
 import lombok.Data;
@@ -40,8 +38,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
-
-import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;
+import org.apache.gobblin.util.DBStatementExecutor;
 
 
 /**
@@ -80,13 +77,9 @@ import static 
org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;
  */
 @Slf4j
 public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
-  /** `j.u.Function` variant for an operation that may @throw IOException or 
SQLException: preserves method signature checked exceptions */
-  @FunctionalInterface
-  protected interface CheckedFunction<T, R> {
-    R apply(T t) throws IOException, SQLException;
-  }
 
   protected final DataSource dataSource;
+  private final DBStatementExecutor dbStatementExecutor;
   private final String leaseArbiterTableName;
   private final String constantsTableName;
   private final int epsilonMillis;
@@ -121,7 +114,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   // Deletes rows older than retention time period regardless of lease status 
as they should all be invalid or completed
   // since retention >> linger
   private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = 
"DELETE FROM %s WHERE event_timestamp < "
-      + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)";
+      + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)";
   private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE 
IF NOT EXISTS %s "
       + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY 
(primary_key))";
   // Only insert epsilon and linger values from config if this table does not 
contain a pre-existing values already.
@@ -196,7 +189,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
     this.retentionPeriodMillis = ConfigUtils.getLong(config, 
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
         
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
-    this.thisTableRetentionStatement = 
String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, 
this.leaseArbiterTableName);
+    this.thisTableRetentionStatement = 
String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, 
this.leaseArbiterTableName,
+        retentionPeriodMillis);
     this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, 
this.leaseArbiterTableName,
         this.constantsTableName);
     this.thisTableGetInfoStatementForReminder = 
String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
@@ -208,6 +202,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     this.thisTableAcquireLeaseIfFinishedStatement =
         
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
     this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
     String createArbiterStatement = String.format(
         CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
     try (Connection connection = dataSource.getConnection();
@@ -218,17 +213,21 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
     }
     initializeConstantsTable();
-    runRetentionOnArbitrationTable();
+
+    // Periodically deletes all rows in the table with event_timestamp older 
than the retention period defined by config.
+    
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 4, TimeUnit.HOURS);
+
     log.info("MysqlMultiActiveLeaseArbiter initialized");
   }
 
   // Initialize Constants table if needed and insert row into it if one does 
not exist
   private void initializeConstantsTable() throws IOException {
     String createConstantsStatement = 
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
-    withPreparedStatement(createConstantsStatement, createStatement -> 
createStatement.executeUpdate(), true);
+    dbStatementExecutor.withPreparedStatement(createConstantsStatement, 
createStatement -> createStatement.executeUpdate(),
+        true);
 
     String insertConstantsStatement = 
String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
-    withPreparedStatement(insertConstantsStatement, insertStatement -> {
+    dbStatementExecutor.withPreparedStatement(insertConstantsStatement, 
insertStatement -> {
       int i = 0;
       insertStatement.setInt(++i, epsilonMillis);
       insertStatement.setInt(++i, lingerMillis);
@@ -236,34 +235,6 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     }, true);
   }
 
-  /**
-   * Periodically deletes all rows in the table with event_timestamp older 
than the retention period defined by config.
-   * // TODO: create a utility to run a SQL commend in a STPE using interval T
-   */
-  private void runRetentionOnArbitrationTable() {
-    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-    Runnable retentionTask = () -> {
-      try {
-        withPreparedStatement(thisTableRetentionStatement,
-            retentionStatement -> {
-              retentionStatement.setLong(1, retentionPeriodMillis);
-              int numRowsDeleted = retentionStatement.executeUpdate();
-              if (numRowsDeleted != 0) {
-                log.info("Multi-active lease arbiter retention thread deleted 
{} rows from the lease arbiter table",
-                    numRowsDeleted);
-              }
-              return numRowsDeleted;
-            }, true);
-      } catch (IOException e) {
-        log.error("Failing to run retention on lease arbiter table. Unbounded 
growth can lead to database slowness and "
-            + "affect our system performance. Examine exception: ", e);
-      }
-    };
-
-    // Run retention thread every 4 hours (6 times a day)
-    executor.scheduleAtFixedRate(retentionTask, 0, 4, TimeUnit.HOURS);
-  }
-
   @Override
   public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis,
       boolean isReminderEvent) throws IOException {
@@ -370,7 +341,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    */
   protected Optional<GetEventInfoResult> 
getExistingEventInfo(DagActionStore.DagAction flowAction,
       boolean isReminderEvent, long eventTimeMillis) throws IOException {
-    return withPreparedStatement(isReminderEvent ? 
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
+    return dbStatementExecutor.withPreparedStatement(isReminderEvent ? 
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
         getInfoStatement -> {
           int i = 0;
           if (isReminderEvent) {
@@ -425,7 +396,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) 
throws IOException {
     String formattedAcquireLeaseNewRowStatement =
         String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, 
this.leaseArbiterTableName);
-    return withPreparedStatement(formattedAcquireLeaseNewRowStatement,
+    return 
dbStatementExecutor.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
         insertStatement -> {
           completeInsertPreparedStatement(insertStatement, flowAction);
           try {
@@ -447,7 +418,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, 
DagActionStore.DagAction flowAction,
       boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp 
dbEventTimestamp,
       Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
-    return withPreparedStatement(acquireLeaseStatement,
+    return dbStatementExecutor.withPreparedStatement(acquireLeaseStatement,
         insertStatement -> {
           completeUpdatePreparedStatement(insertStatement, flowAction, 
needEventTimeCheck, needLeaseAcquisition,
               dbEventTimestamp, dbLeaseAcquisitionTimestamp);
@@ -460,7 +431,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * was successful or not.
    */
   protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) 
throws IOException {
-    return withPreparedStatement(thisTableSelectAfterInsertStatement,
+    return 
dbStatementExecutor.withPreparedStatement(thisTableSelectAfterInsertStatement,
         selectStatement -> {
           completeWhereClauseMatchingKeyPreparedStatement(selectStatement, 
flowAction);
           ResultSet resultSet = selectStatement.executeQuery();
@@ -596,7 +567,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     String flowGroup = flowAction.getFlowGroup();
     String flowName = flowAction.getFlowName();
     DagActionStore.FlowActionType flowActionType = 
flowAction.getFlowActionType();
-    return 
withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, 
leaseArbiterTableName),
+    return 
dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT,
 leaseArbiterTableName),
         updateStatement -> {
           int i = 0;
           updateStatement.setString(++i, flowGroup);
@@ -621,25 +592,6 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         }, true);
   }
 
-  /** Abstracts recurring pattern around resource management and exception 
re-mapping. */
-  protected <T> T withPreparedStatement(String sql, 
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
-      throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(sql)) {
-      T result = f.apply(statement);
-      if (shouldCommit) {
-        connection.commit();
-      }
-      statement.close();
-      return result;
-    } catch (SQLException e) {
-      log.warn("Received SQL exception that can result from invalid 
connection. Checking if validation query is set {} "
-          + "Exception is {}", ((HikariDataSource) 
this.dataSource).getConnectionTestQuery(), e);
-      throw new IOException(e);
-    }
-  }
-
-
   /**
    * DTO for arbiter's current lease state for a FlowActionEvent
   */
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index 4f639e04a..894d0a300 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
 
+import java.util.concurrent.TimeUnit;
 import javax.sql.DataSource;
 
 import lombok.extern.slf4j.Slf4j;
@@ -38,6 +39,8 @@ import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExponentialBackoff;
+import org.apache.gobblin.util.DBStatementExecutor;
+
 
 @Slf4j
 public class MysqlDagActionStore implements DagActionStore {
@@ -45,7 +48,10 @@ public class MysqlDagActionStore implements DagActionStore {
   public static final String CONFIG_PREFIX = "MysqlDagActionStore";
 
   protected final DataSource dataSource;
+  private final DBStatementExecutor dbStatementExecutor;
   private final String tableName;
+  private final long retentionPeriodSeconds;
+  private String thisTableRetentionStatement;
   private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM 
%s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? AND 
dag_action = ?)";
 
   protected static final String INSERT_STATEMENT = "INSERT INTO %s 
(flow_group, flow_name, flow_execution_id, dag_action) "
@@ -58,6 +64,8 @@ public class MysqlDagActionStore implements DagActionStore {
       + "flow_execution_id varchar(" + 
ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
       + "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT 
CURRENT_TIMESTAMP  on update CURRENT_TIMESTAMP NOT NULL, "
       + "PRIMARY KEY (flow_group,flow_name,flow_execution_id, dag_action))";
+  // Deletes rows older than retention time period (in seconds) to prevent 
this table from growing unbounded.
+  private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE 
modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %s SECOND)";
 
   private final int getDagActionMaxRetries;
 
@@ -71,7 +79,8 @@ public class MysqlDagActionStore implements DagActionStore {
     this.tableName = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
         ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
     this.getDagActionMaxRetries = ConfigUtils.getInt(config, 
ConfigurationKeys.MYSQL_GET_MAX_RETRIES, 
ConfigurationKeys.DEFAULT_MYSQL_GET_MAX_RETRIES);
-
+    this.retentionPeriodSeconds = ConfigUtils.getLong(config, 
ConfigurationKeys.MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY,
+        
ConfigurationKeys.DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY);
     this.dataSource = MysqlDataSourceFactory.get(config,
         SharedResourcesBrokerFactory.getImplicitBroker());
     try (Connection connection = dataSource.getConnection();
@@ -81,116 +90,108 @@ public class MysqlDagActionStore implements 
DagActionStore {
     } catch (SQLException e) {
       throw new IOException("Failure creation table " + tableName, e);
     }
+    this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+    this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, 
this.tableName, retentionPeriodSeconds);
+    // Periodically deletes all rows in the table whose modified_time is older 
than the retention period defined by config.
+    
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 6, TimeUnit.HOURS);
   }
 
   @Override
   public boolean exists(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType) throws IOException, 
SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement existStatement = 
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT, 
tableName), existStatement -> {
       int i = 0;
       existStatement.setString(++i, flowGroup);
       existStatement.setString(++i, flowName);
       existStatement.setString(++i, flowExecutionId);
       existStatement.setString(++i, flowActionType.toString());
-      rs = existStatement.executeQuery();
-      rs.next();
-      return rs.getBoolean(1);
-    } catch (SQLException e) {
-      throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
-          new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
+      ResultSet rs = null;
+      try {
+        rs = existStatement.executeQuery();
+        rs.next();
+        return rs.getBoolean(1);
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
+        }
       }
-    }
+    }, true);
   }
 
   @Override
   public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType)
       throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement insertStatement = 
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+    dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
+    try {
       int i = 0;
       insertStatement.setString(++i, flowGroup);
       insertStatement.setString(++i, flowName);
       insertStatement.setString(++i, flowExecutionId);
       insertStatement.setString(++i, flowActionType.toString());
-      insertStatement.executeUpdate();
-      connection.commit();
+      return insertStatement.executeUpdate();
     } catch (SQLException e) {
       throw new IOException(String.format("Failure adding action for 
DagAction: %s in table %s",
           new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    }
+    }}, true);
   }
 
   @Override
   public boolean deleteDagAction(DagAction dagAction) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement deleteStatement = 
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, 
tableName), deleteStatement -> {
+    try {
       int i = 0;
       deleteStatement.setString(++i, dagAction.getFlowGroup());
       deleteStatement.setString(++i, dagAction.getFlowName());
       deleteStatement.setString(++i, dagAction.getFlowExecutionId());
       deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
       int result = deleteStatement.executeUpdate();
-      connection.commit();
       return result != 0;
     } catch (SQLException e) {
       throw new IOException(String.format("Failure deleting action for 
DagAction: %s in table %s", dagAction,
           tableName), e);
-    }
+    }}, true);
   }
 
   // TODO: later change this to getDagActions relating to a particular flow 
execution if it makes sense
   private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff 
exponentialBackoff)
       throws IOException, SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getStatement = 
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, 
tableName), getStatement -> {
       int i = 0;
       getStatement.setString(++i, flowGroup);
       getStatement.setString(++i, flowName);
       getStatement.setString(++i, flowExecutionId);
       getStatement.setString(++i, flowActionType.toString());
-      rs = getStatement.executeQuery();
-      if (rs.next()) {
-        return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
-      } else {
-        if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+      try (ResultSet rs = getStatement.executeQuery()) {
+        if (rs.next()) {
+          return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
+        } else if (exponentialBackoff.awaitNextRetryIfAvailable()) {
           return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
         } else {
           log.warn(String.format("Can not find dag action: %s with flowGroup: 
%s, flowName: %s, flowExecutionId: %s",
               flowActionType, flowGroup, flowName, flowExecutionId));
           return null;
         }
+      } catch (SQLException | InterruptedException e) {
+        throw new IOException(String.format("Failure get %s from table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
       }
-    } catch (SQLException | InterruptedException e) {
-      throw new IOException(String.format("Failure get %s from table %s", new 
DagAction(flowGroup, flowName, flowExecutionId,
-          flowActionType), tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
-      }
-    }
+    }, true);
   }
 
   @Override
   public Collection<DagAction> getDagActions() throws IOException {
-    HashSet<DagAction> result = new HashSet<>();
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getAllStatement = 
connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName));
-        ResultSet rs = getAllStatement.executeQuery()) {
-      while (rs.next()) {
-        result.add(
-            new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), 
FlowActionType.valueOf(rs.getString(4))));
-      }
-      if (rs != null) {
-        rs.close();
+    return 
dbStatementExecutor.withPreparedStatement(String.format(GET_ALL_STATEMENT, 
tableName), getAllStatement -> {
+      HashSet<DagAction> result = new HashSet<>();
+      try (ResultSet rs = getAllStatement.executeQuery()) {
+        while (rs.next()) {
+          result.add(new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4))));
+        }
+        return result;
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure get dag actions from 
table %s ", tableName), e);
       }
-      return result;
-    } catch (SQLException e) {
-      throw new IOException(String.format("Failure get dag actions from table 
%s ", tableName), e);
-    }
+    }, true);
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index 713c2b9d8..595ba89d7 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -352,6 +352,7 @@ public class MysqlBaseSpecStore extends 
InstrumentedSpecStore {
     return Optional.of(this.specStoreURI);
   }
 
+  // TODO: migrate this class to use common util {@link DBStatementExecutor}
   /** Abstracts recurring pattern around resource management and exception 
re-mapping. */
   protected <T> T withPreparedStatement(String sql, 
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) throws 
IOException {
     try (Connection connection = this.dataSource.getConnection();
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java
new file mode 100644
index 000000000..1f554779c
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * Many database stores require common functionality that can be stored in a 
utility class. The functionality
+ * includes executing prepared statements on a data source object and SQL 
queries at fixed intervals.
+ * The caller of the class MUST maintain ownership of the {@link DataSource} 
and close this instance when the
+ * {@link DataSource} is about to be closed as well. Both are to be done only 
once this instance will no longer be used.
+ */
+public class DBStatementExecutor implements Closeable {
+  private final DataSource dataSource;
+  private final Logger log;
+  private final ArrayList<ScheduledThreadPoolExecutor> scheduledExecutors;
+
+  public DBStatementExecutor(DataSource dataSource, Logger log) {
+    this.dataSource = dataSource;
+    this.log = log;
+    this.scheduledExecutors = new ArrayList<>();
+  }
+
+  /** `j.u.Function` variant for an operation that may @throw IOException or 
SQLException: preserves method signature checked exceptions */
+  @FunctionalInterface
+  public interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException, SQLException;
+  }
+
+  /** Abstracts recurring pattern around resource management and exception 
re-mapping. */
+  public <T> T withPreparedStatement(String sql, 
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
+      throws IOException {
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement statement = connection.prepareStatement(sql)) {
+      T result = f.apply(statement);
+      if (shouldCommit) {
+        connection.commit();
+      }
+      statement.close();
+      return result;
+    } catch (SQLException e) {
+      log.warn("Received SQL exception that can result from invalid 
connection. Checking if validation query is set {} "
+          + "Exception is {}", ((HikariDataSource) 
dataSource).getConnectionTestQuery(), e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Repeats execution of a SQL command at a fixed interval while the service 
is running. The first execution of the
+   * command is immediate.
+   * @param sqlCommand SQL string
+   * @param interval frequency with which command will run
+   * @param timeUnit unit of time for interval
+   */
+  public void repeatSqlCommandExecutionAtInterval(String sqlCommand, long 
interval, TimeUnit timeUnit) {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    Runnable task = () -> {
+      try {
+        withPreparedStatement(sqlCommand,
+            preparedStatement -> {
+              int numRowsAffected = preparedStatement.executeUpdate();
+              if (numRowsAffected != 0) {
+                log.info("{} rows affected by SQL command: {}", 
numRowsAffected, sqlCommand);
+              }
+              return numRowsAffected;
+            }, true);
+      } catch (IOException e) {
+        log.error("Failed to execute SQL command: {}", sqlCommand, e);
+      }
+    };
+    executor.scheduleAtFixedRate(task, 0, interval, timeUnit);
+    this.scheduledExecutors.add(executor);
+  }
+
+  /**
+   * Call before closing the data source object associated with this instance 
to also shut down any executors expecting
+   * to be run on the data source.
+   */
+  @Override
+  public void close() {
+    for (ScheduledThreadPoolExecutor executor : this.scheduledExecutors) {
+      executor.shutdownNow();
+    }
+  }
+}

Reply via email to