[ https://issues.apache.org/jira/browse/GOBBLIN-1942?focusedWorklogId=887940&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-887940 ]

ASF GitHub Bot logged work on GOBBLIN-1942:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Oct/23 20:40
            Start Date: 30/Oct/23 20:40
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1376767399


##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed
+ * intervals. The instantiator of the class should provide the data source used within this utility.
+ */
+public class MySQLStoreUtils {

Review Comment:
   Naming-wise, what's MySQL-specific about the impl? Wouldn't it work for any DB? (Is it merely for that fancy `HikariDataSource` logging trick?) If that's all, let's not codify MySQL in the name...
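To make the question concrete, here is a minimal sketch of the same helper written only against the standard `javax.sql.DataSource` and `java.sql` APIs, with no MySQL- or Hikari-specific types. The class name `DBStatementExecutor` is hypothetical here, and `shouldCommit` assumes connections are handed out with auto-commit disabled (the JDBC `Connection.commit()` contract permits an `SQLException` in auto-commit mode).

```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

/** Hypothetical DB-agnostic helper: nothing below depends on MySQL or Hikari. */
class DBStatementExecutor {
  /** Function variant whose body may throw the same checked exceptions as JDBC code. */
  @FunctionalInterface
  interface CheckedFunction<T, R> {
    R apply(T t) throws IOException, SQLException;
  }

  private final DataSource dataSource;

  DBStatementExecutor(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  /** Opens a connection and statement, runs {@code f}, optionally commits, then closes both. */
  <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
      throws IOException {
    try (Connection connection = dataSource.getConnection();
        PreparedStatement statement = connection.prepareStatement(sql)) {
      T result = f.apply(statement);
      if (shouldCommit) {
        connection.commit(); // assumes auto-commit is off for pooled connections
      }
      return result;
    } catch (SQLException e) {
      // Remap so callers only need to declare IOException.
      throw new IOException("Failed to execute statement: " + sql, e);
    }
  }
}
```

Since only `javax.sql.DataSource` appears in the signature, any JDBC-backed store could reuse it unchanged.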



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws IOException {
     } catch (SQLException e) {
       throw new IOException("Failure creation table " + tableName, e);
     }
+    this.mySQLStoreUtils = new MySQLStoreUtils(this.dataSource, log);
+    this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, this.tableName, retentionPeriodSeconds);
+    // Periodically deletes all rows in the table last_modified before the retention period defined by config.
+    mySQLStoreUtils.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement, 6, TimeUnit.HOURS);
   }
 
   @Override
   public boolean exists(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException, SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement existStatement = connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+    return mySQLStoreUtils.withPreparedStatement(String.format(EXISTS_STATEMENT, tableName), existStatement -> {
       int i = 0;
       existStatement.setString(++i, flowGroup);
       existStatement.setString(++i, flowName);
       existStatement.setString(++i, flowExecutionId);
       existStatement.setString(++i, flowActionType.toString());
-      rs = existStatement.executeQuery();
-      rs.next();
-      return rs.getBoolean(1);
-    } catch (SQLException e) {
-      throw new IOException(String.format("Failure checking existence of DagAction: %s in table %s",
-          new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
+      ResultSet rs = null;
+      try {
+        rs = existStatement.executeQuery();
+        rs.next();
+        return rs.getBoolean(1);
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure checking existence of DagAction: %s in table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
+        }
       }
-    }
+    }, true);
   }
 
   @Override
   public void addDagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType)
       throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement insertStatement = connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+    mySQLStoreUtils.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
+    try {
       int i = 0;
       insertStatement.setString(++i, flowGroup);
       insertStatement.setString(++i, flowName);
       insertStatement.setString(++i, flowExecutionId);
       insertStatement.setString(++i, flowActionType.toString());
-      insertStatement.executeUpdate();
-      connection.commit();
+      return insertStatement.executeUpdate();
     } catch (SQLException e) {
       throw new IOException(String.format("Failure adding action for DagAction: %s in table %s",
           new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e);
-    }
+    }}, true);
   }
 
   @Override
   public boolean deleteDagAction(DagAction dagAction) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement deleteStatement = connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+    return mySQLStoreUtils.withPreparedStatement(String.format(DELETE_STATEMENT, tableName), deleteStatement -> {
+    try {
       int i = 0;
       deleteStatement.setString(++i, dagAction.getFlowGroup());
       deleteStatement.setString(++i, dagAction.getFlowName());
       deleteStatement.setString(++i, dagAction.getFlowExecutionId());
       deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
       int result = deleteStatement.executeUpdate();
-      connection.commit();
       return result != 0;
     } catch (SQLException e) {
       throw new IOException(String.format("Failure deleting action for DagAction: %s in table %s", dagAction,
           tableName), e);
-    }
+    }}, true);
   }
 
   // TODO: later change this to getDagActions relating to a particular flow execution if it makes sense
   private DagAction getDagActionWithRetry(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff exponentialBackoff)
       throws IOException, SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getStatement = connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
-      int i = 0;
-      getStatement.setString(++i, flowGroup);
-      getStatement.setString(++i, flowName);
-      getStatement.setString(++i, flowExecutionId);
-      getStatement.setString(++i, flowActionType.toString());
-      rs = getStatement.executeQuery();
-      if (rs.next()) {
-        return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
-      } else {
-        if (exponentialBackoff.awaitNextRetryIfAvailable()) {
-          return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, flowActionType, exponentialBackoff);
+    return mySQLStoreUtils.withPreparedStatement(String.format(GET_STATEMENT, tableName), getStatement -> {
+      ResultSet rs = null;
+      try {
+        int i = 0;
+        getStatement.setString(++i, flowGroup);
+        getStatement.setString(++i, flowName);
+        getStatement.setString(++i, flowExecutionId);
+        getStatement.setString(++i, flowActionType.toString());
+        rs = getStatement.executeQuery();
+        if (rs.next()) {
+          return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
         } else {
-          log.warn(String.format("Can not find dag action: %s with flowGroup: %s, flowName: %s, flowExecutionId: %s",
-              flowActionType, flowGroup, flowName, flowExecutionId));
-          return null;
+          if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+            return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, flowActionType, exponentialBackoff);
+          } else {
+            log.warn(String.format("Can not find dag action: %s with flowGroup: %s, flowName: %s, flowExecutionId: %s",
+                flowActionType, flowGroup, flowName, flowExecutionId));
+            return null;
+          }
+        }
+      } catch (SQLException | InterruptedException e) {
+        throw new IOException(String.format("Failure get %s from table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
         }
       }
-    } catch (SQLException | InterruptedException e) {
-      throw new IOException(String.format("Failure get %s from table %s", new DagAction(flowGroup, flowName, flowExecutionId,
-          flowActionType), tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
-      }
-    }
+    }, true);
   }
 
   @Override
   public Collection<DagAction> getDagActions() throws IOException {
-    HashSet<DagAction> result = new HashSet<>();
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getAllStatement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName));
-        ResultSet rs = getAllStatement.executeQuery()) {
-      while (rs.next()) {
-        result.add(
-            new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4))));
-      }
-      if (rs != null) {
-        rs.close();
+    return mySQLStoreUtils.withPreparedStatement(String.format(GET_ALL_STATEMENT, tableName), getAllStatement -> {
+      ResultSet rs = null;
+      try {
+        HashSet<DagAction> result = new HashSet<>();
+        rs = getAllStatement.executeQuery();
+        while (rs.next()) {
+          result.add(new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4))));
+        }
+        return result;
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure get dag actions from table %s ", tableName), e);

Review Comment:
   `withPreparedStatement` will already map `SQLException` to `IOException`... do you do this here just for a specific error message?
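One possible shape for the lambda body, sketched under the assumption that the helper's generic remapping is acceptable: try-with-resources replaces the manual `finally`/`close()` pair, and `SQLException` simply propagates. `GetAllDagActionsBody` and `readAll` are hypothetical names, and a `List<String>` of the four selected columns stands in for `DagAction` so the sketch compiles on its own.

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical holder for a lambda body usable with withPreparedStatement. */
final class GetAllDagActionsBody {
  private GetAllDagActionsBody() {}

  /**
   * Reads every row; each {@code List<String>} stands in for a DagAction built from the four selected columns.
   * No catch block: SQLException propagates to the helper, which remaps it to IOException.
   */
  static Set<List<String>> readAll(PreparedStatement getAllStatement) throws SQLException {
    Set<List<String>> result = new HashSet<>();
    // try-with-resources closes the ResultSet on both normal and exceptional exit.
    try (ResultSet rs = getAllStatement.executeQuery()) {
      while (rs.next()) {
        result.add(Arrays.asList(rs.getString(1), rs.getString(2), rs.getString(3), rs.getString(4)));
      }
    }
    return result;
  }
}
```

Passed as `getAllStatement -> GetAllDagActionsBody.readAll(getAllStatement)`, any failure would surface as the helper's own `IOException`; a local catch would then only add the table-specific message.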



##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed
+ * intervals. 
+ */
+public class MySQLStoreUtils {
+  private final DataSource dataSource;

Review Comment:
   It would be clearer to say something like: "MUST maintain ownership of the {@link DataSource} and arrange for it to be closed, but only once this instance will no longer be used".
   
   ... and with that in mind, how would the repeating SQL mesh with that? Should this instance hold on to what it schedules, and provide a `.close()` method that would unschedule them all? If so, the advised shutdown protocol might be:
   ```
   storeUtils.close();
   dataSource.close();
   ```
   ?
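A minimal sketch of that idea, assuming a single-threaded `ScheduledThreadPoolExecutor` owned by the utility: each scheduled task's `ScheduledFuture` is kept, and `close()` cancels them, shuts the executor down, and waits for any in-flight run. `RepeatingStatementScheduler`, `repeatAtInterval`, the zero initial delay, and the 30-second wait are all hypothetical choices; the task is a plain `Runnable`, so the SQL execution itself is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical: owns its executor and every repeating task it schedules, so close() can undo them all. */
class RepeatingStatementScheduler implements AutoCloseable {
  private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
    Thread t = new Thread(r, "repeating-statement");
    t.setDaemon(true); // never keep the JVM alive on its own
    return t;
  });
  private final List<ScheduledFuture<?>> scheduled = new ArrayList<>();

  /** Runs {@code task} immediately and then every {@code interval}; the future is tracked for close(). */
  synchronized ScheduledFuture<?> repeatAtInterval(Runnable task, long interval, TimeUnit unit) {
    ScheduledFuture<?> future = executor.scheduleAtFixedRate(task, 0, interval, unit);
    scheduled.add(future);
    return future;
  }

  /** Unschedules everything, then waits (bounded) for a run that was already in progress. */
  @Override
  public synchronized void close() throws InterruptedException {
    for (ScheduledFuture<?> future : scheduled) {
      future.cancel(false); // do not interrupt a statement that is mid-execution
    }
    scheduled.clear();
    executor.shutdown();
    executor.awaitTermination(30, TimeUnit.SECONDS);
  }
}
```

With this shape the protocol above keeps its order: closing the scheduler first means no scheduled statement can try to borrow a connection from an already-closed `DataSource`.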



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -80,13 +77,9 @@
  */
 @Slf4j
 public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
-  /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
-  @FunctionalInterface
-  protected interface CheckedFunction<T, R> {
-    R apply(T t) throws IOException, SQLException;
-  }
 
   protected final DataSource dataSource;
+  private final MySQLStoreUtils mySQLStoreUtils;

Review Comment:
   how about `DBStatementExecutor`?



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,10 @@ public class ConfigurationKeys {
   public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+  // Mysql Dag Action Store configuration
+  public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
+  public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + ".retentionPeriodSec";
+  public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds)

Review Comment:
   you resolved this, but I still see `SEC_KEY`... do you plan to leave as-is?
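For reference, the prefix as written already ends in a dot, so `MYSQL_DAG_ACTION_STORE_PREFIX + ".retentionPeriodSec"` evaluates to `MysqlDagActionStore..retentionPeriodSec`. Below is a sketch of one possible naming, in which `_KEY` marks only the config key and the default carries a unit suffix. The default's new name, the holder class, and the `Map`-based lookup (standing in for the store's actual `Config` handling) are all hypothetical.

```java
import java.util.Map;

/** Hypothetical naming sketch; only the config key constant keeps the "_KEY" suffix. */
final class DagActionStoreConfigKeys {
  static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
  // The prefix already ends with '.', so the suffix starts without one.
  static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY =
      MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSec";
  // A default value, not a key: 3 days expressed in seconds.
  static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS = 3L * 24 * 60 * 60;

  private DagActionStoreConfigKeys() {}

  /** Stand-in for the store's real Config lookup: the configured value, else the default. */
  static long retentionPeriodSeconds(Map<String, String> props) {
    String configured = props.get(MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY);
    return configured == null
        ? DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS
        : Long.parseLong(configured.trim());
  }
}
```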



##########
gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * MySQL based implementations of stores require common functionality that can be stored in a utility class. The
+ * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed
+ * intervals. The instantiator of the class should provide the data source used within this utility.
+ */
+public class MySQLStoreUtils {
+  private final DataSource dataSource;
+  private final Logger log;
+
+  public MySQLStoreUtils(DataSource dataSource, Logger log) {
+    this.dataSource = dataSource;
+    this.log = log;
+  }
+
+  /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
+  @FunctionalInterface
+  public interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException, SQLException;
+  }
+
+  /** Abstracts recurring pattern around resource management and exception re-mapping. */
+  public <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)

Review Comment:
   What's your thinking on other classes with their own `withPreparedStatement`, such as `MysqlBaseSpecStore`? Do they deserve a TODO comment, since you'd recommend migrating them to this common impl?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 887940)
    Time Spent: 1h 40m  (was: 1.5h)

> Create MySQL util class for re-usable methods & enable MysqlDagActionStore retention
> ------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1942
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1942
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Defines a new class {{MySQLStoreUtils}} used for common functionality between 
> MySQL based implementations of stores. It includes a new method to run a SQL 
> command in a {{ScheduledThreadPoolExecutor}} using {{interval T}} which is 
> used for retention on the {{MysqlDagActionStore}} and 
> {{MysqlMultiActiveLeaseArbiter}}.
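The fixed-interval execution described above can be sketched as follows, assuming a JDBC `DataSource` and a single-threaded `ScheduledThreadPoolExecutor`. One detail worth handling explicitly: per the `ScheduledExecutorService.scheduleAtFixedRate` contract, if any execution throws, subsequent executions are suppressed, so the task catches its own failures and simply retries at the next interval. `IntervalSqlRunner` and `repeatSqlAtInterval` are hypothetical names, committing only when auto-commit is off is an assumption, and `java.util.logging` stands in for the SLF4J logger used in the PR.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.sql.DataSource;

/** Hypothetical fixed-interval SQL runner; java.util.logging stands in for SLF4J. */
class IntervalSqlRunner {
  private static final Logger LOG = Logger.getLogger(IntervalSqlRunner.class.getName());
  private final DataSource dataSource;
  private final ScheduledThreadPoolExecutor executor;

  IntervalSqlRunner(DataSource dataSource) {
    this.dataSource = dataSource;
    this.executor = new ScheduledThreadPoolExecutor(1, r -> {
      Thread t = new Thread(r, "interval-sql");
      t.setDaemon(true);
      return t;
    });
  }

  /** Executes {@code sql} now and then every {@code interval}, surviving individual failures. */
  ScheduledFuture<?> repeatSqlAtInterval(String sql, long interval, TimeUnit unit) {
    Runnable task = () -> {
      // Catch everything: one escaped exception would silently cancel all later runs.
      try (Connection connection = dataSource.getConnection();
          PreparedStatement statement = connection.prepareStatement(sql)) {
        int rows = statement.executeUpdate();
        if (!connection.getAutoCommit()) {
          connection.commit(); // pools configured with auto-commit off need an explicit commit
        }
        LOG.fine(() -> "Ran '" + sql + "', affected " + rows + " rows");
      } catch (SQLException | RuntimeException e) {
        LOG.log(Level.WARNING, "Periodic statement failed; retrying next interval: " + sql, e);
      }
    };
    return executor.scheduleAtFixedRate(task, 0, interval, unit);
  }

  void shutdown() {
    executor.shutdownNow();
  }
}
```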



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
