Will-Lo commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1379106697


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws 
IOException {
     } catch (SQLException e) {
       throw new IOException("Failure creation table " + tableName, e);
     }
+    this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+    this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, 
this.tableName, retentionPeriodSeconds);
+    // Periodically deletes all rows in the table last_modified before the 
retention period defined by config.
+    
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 6, TimeUnit.HOURS);
   }
 
   @Override
   public boolean exists(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType) throws IOException, 
SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement existStatement = 
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT, 
tableName), existStatement -> {
       int i = 0;
       existStatement.setString(++i, flowGroup);
       existStatement.setString(++i, flowName);
       existStatement.setString(++i, flowExecutionId);
       existStatement.setString(++i, flowActionType.toString());
-      rs = existStatement.executeQuery();
-      rs.next();
-      return rs.getBoolean(1);
-    } catch (SQLException e) {
-      throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
-          new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
+      ResultSet rs = null;
+      try {
+        rs = existStatement.executeQuery();
+        rs.next();
+        return rs.getBoolean(1);
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
+        }
       }
-    }
+    }, true);
   }
 
   @Override
   public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType)
       throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement insertStatement = 
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+    dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
+    try {
       int i = 0;
       insertStatement.setString(++i, flowGroup);
       insertStatement.setString(++i, flowName);
       insertStatement.setString(++i, flowExecutionId);
       insertStatement.setString(++i, flowActionType.toString());
-      insertStatement.executeUpdate();
-      connection.commit();
+      return insertStatement.executeUpdate();
     } catch (SQLException e) {
       throw new IOException(String.format("Failure adding action for 
DagAction: %s in table %s",
           new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    }
+    }}, true);
   }
 
   @Override
   public boolean deleteDagAction(DagAction dagAction) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement deleteStatement = 
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, 
tableName), deleteStatement -> {
+    try {
       int i = 0;
       deleteStatement.setString(++i, dagAction.getFlowGroup());
       deleteStatement.setString(++i, dagAction.getFlowName());
       deleteStatement.setString(++i, dagAction.getFlowExecutionId());
       deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
       int result = deleteStatement.executeUpdate();
-      connection.commit();
       return result != 0;
     } catch (SQLException e) {
       throw new IOException(String.format("Failure deleting action for 
DagAction: %s in table %s", dagAction,
           tableName), e);
-    }
+    }}, true);
   }
 
   // TODO: later change this to getDagActions relating to a particular flow 
execution if it makes sense
   private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff 
exponentialBackoff)
       throws IOException, SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getStatement = 
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
-      int i = 0;
-      getStatement.setString(++i, flowGroup);
-      getStatement.setString(++i, flowName);
-      getStatement.setString(++i, flowExecutionId);
-      getStatement.setString(++i, flowActionType.toString());
-      rs = getStatement.executeQuery();
-      if (rs.next()) {
-        return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
-      } else {
-        if (exponentialBackoff.awaitNextRetryIfAvailable()) {
-          return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+    return 
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, 
tableName), getStatement -> {
+      ResultSet rs = null;
+      try {
+        int i = 0;
+        getStatement.setString(++i, flowGroup);
+        getStatement.setString(++i, flowName);
+        getStatement.setString(++i, flowExecutionId);
+        getStatement.setString(++i, flowActionType.toString());
+        rs = getStatement.executeQuery();
+        if (rs.next()) {
+          return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
         } else {
-          log.warn(String.format("Can not find dag action: %s with flowGroup: 
%s, flowName: %s, flowExecutionId: %s",
-              flowActionType, flowGroup, flowName, flowExecutionId));
-          return null;
+          if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+            return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+          } else {
+            log.warn(String.format("Can not find dag action: %s with 
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+                flowActionType, flowGroup, flowName, flowExecutionId));
+            return null;
+          }
+        }
+      } catch (SQLException | InterruptedException e) {
+        throw new IOException(String.format("Failure get %s from table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
         }
       }
-    } catch (SQLException | InterruptedException e) {
-      throw new IOException(String.format("Failure get %s from table %s", new 
DagAction(flowGroup, flowName, flowExecutionId,
-          flowActionType), tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
-      }
-    }
+    }, true);
   }
 
   @Override
   public Collection<DagAction> getDagActions() throws IOException {
-    HashSet<DagAction> result = new HashSet<>();
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getAllStatement = 
connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName));
-        ResultSet rs = getAllStatement.executeQuery()) {
-      while (rs.next()) {
-        result.add(
-            new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), 
FlowActionType.valueOf(rs.getString(4))));
-      }
-      if (rs != null) {
-        rs.close();
+    return 
dbStatementExecutor.withPreparedStatement(String.format(GET_ALL_STATEMENT, 
tableName), getAllStatement -> {
+      ResultSet rs = null;
+      try {
+        HashSet<DagAction> result = new HashSet<>();
+        rs = getAllStatement.executeQuery();
+        while (rs.next()) {
+          result.add(new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4))));
+        }
+        return result;
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure get dag actions from 
table %s ", tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
+        }

Review Comment:
   Can `rs` be declared in a try-with-resources statement if it needs to be closed?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws 
IOException {
     } catch (SQLException e) {
       throw new IOException("Failure creation table " + tableName, e);
     }
+    this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+    this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, 
this.tableName, retentionPeriodSeconds);
+    // Periodically deletes all rows in the table last_modified before the 
retention period defined by config.
+    
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 6, TimeUnit.HOURS);
   }
 
   @Override
   public boolean exists(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType) throws IOException, 
SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement existStatement = 
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT, 
tableName), existStatement -> {
       int i = 0;
       existStatement.setString(++i, flowGroup);
       existStatement.setString(++i, flowName);
       existStatement.setString(++i, flowExecutionId);
       existStatement.setString(++i, flowActionType.toString());
-      rs = existStatement.executeQuery();
-      rs.next();
-      return rs.getBoolean(1);
-    } catch (SQLException e) {
-      throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
-          new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
+      ResultSet rs = null;
+      try {
+        rs = existStatement.executeQuery();
+        rs.next();
+        return rs.getBoolean(1);
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
+        }
       }
-    }
+    }, true);
   }
 
   @Override
   public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType)
       throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement insertStatement = 
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+    dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
+    try {
       int i = 0;
       insertStatement.setString(++i, flowGroup);
       insertStatement.setString(++i, flowName);
       insertStatement.setString(++i, flowExecutionId);
       insertStatement.setString(++i, flowActionType.toString());
-      insertStatement.executeUpdate();
-      connection.commit();
+      return insertStatement.executeUpdate();
     } catch (SQLException e) {
       throw new IOException(String.format("Failure adding action for 
DagAction: %s in table %s",
           new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    }
+    }}, true);
   }
 
   @Override
   public boolean deleteDagAction(DagAction dagAction) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement deleteStatement = 
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, 
tableName), deleteStatement -> {
+    try {
       int i = 0;
       deleteStatement.setString(++i, dagAction.getFlowGroup());
       deleteStatement.setString(++i, dagAction.getFlowName());
       deleteStatement.setString(++i, dagAction.getFlowExecutionId());
       deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
       int result = deleteStatement.executeUpdate();
-      connection.commit();
       return result != 0;
     } catch (SQLException e) {
       throw new IOException(String.format("Failure deleting action for 
DagAction: %s in table %s", dagAction,
           tableName), e);
-    }
+    }}, true);
   }
 
   // TODO: later change this to getDagActions relating to a particular flow 
execution if it makes sense
   private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff 
exponentialBackoff)
       throws IOException, SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getStatement = 
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
-      int i = 0;
-      getStatement.setString(++i, flowGroup);
-      getStatement.setString(++i, flowName);
-      getStatement.setString(++i, flowExecutionId);
-      getStatement.setString(++i, flowActionType.toString());
-      rs = getStatement.executeQuery();
-      if (rs.next()) {
-        return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
-      } else {
-        if (exponentialBackoff.awaitNextRetryIfAvailable()) {
-          return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+    return 
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, 
tableName), getStatement -> {
+      ResultSet rs = null;
+      try {
+        int i = 0;
+        getStatement.setString(++i, flowGroup);
+        getStatement.setString(++i, flowName);
+        getStatement.setString(++i, flowExecutionId);
+        getStatement.setString(++i, flowActionType.toString());
+        rs = getStatement.executeQuery();
+        if (rs.next()) {
+          return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
         } else {
-          log.warn(String.format("Can not find dag action: %s with flowGroup: 
%s, flowName: %s, flowExecutionId: %s",
-              flowActionType, flowGroup, flowName, flowExecutionId));
-          return null;
+          if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+            return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+          } else {
+            log.warn(String.format("Can not find dag action: %s with 
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+                flowActionType, flowGroup, flowName, flowExecutionId));

Review Comment:
   Is it possible to encapsulate the exponential-backoff retry within the DB 
statement executor? That seems useful to reuse in other areas if needed.
   
   Also, if not, prefer a flat `if` / `else if` / `else` chain rather than 
nesting the `if`/`else` inside the outer `else`.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws 
IOException {
     } catch (SQLException e) {
       throw new IOException("Failure creation table " + tableName, e);
     }
+    this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+    this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, 
this.tableName, retentionPeriodSeconds);
+    // Periodically deletes all rows in the table last_modified before the 
retention period defined by config.
+    
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 6, TimeUnit.HOURS);
   }
 
   @Override
   public boolean exists(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType) throws IOException, 
SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement existStatement = 
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT, 
tableName), existStatement -> {
       int i = 0;
       existStatement.setString(++i, flowGroup);
       existStatement.setString(++i, flowName);
       existStatement.setString(++i, flowExecutionId);
       existStatement.setString(++i, flowActionType.toString());
-      rs = existStatement.executeQuery();
-      rs.next();
-      return rs.getBoolean(1);
-    } catch (SQLException e) {
-      throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
-          new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
+      ResultSet rs = null;
+      try {
+        rs = existStatement.executeQuery();
+        rs.next();
+        return rs.getBoolean(1);
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
+        }
       }
-    }
+    }, true);
   }
 
   @Override
   public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType)
       throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement insertStatement = 
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+    dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
+    try {
       int i = 0;
       insertStatement.setString(++i, flowGroup);
       insertStatement.setString(++i, flowName);
       insertStatement.setString(++i, flowExecutionId);
       insertStatement.setString(++i, flowActionType.toString());
-      insertStatement.executeUpdate();
-      connection.commit();
+      return insertStatement.executeUpdate();
     } catch (SQLException e) {
       throw new IOException(String.format("Failure adding action for 
DagAction: %s in table %s",
           new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    }
+    }}, true);
   }
 
   @Override
   public boolean deleteDagAction(DagAction dagAction) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement deleteStatement = 
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, 
tableName), deleteStatement -> {
+    try {
       int i = 0;
       deleteStatement.setString(++i, dagAction.getFlowGroup());
       deleteStatement.setString(++i, dagAction.getFlowName());
       deleteStatement.setString(++i, dagAction.getFlowExecutionId());
       deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
       int result = deleteStatement.executeUpdate();
-      connection.commit();
       return result != 0;
     } catch (SQLException e) {
       throw new IOException(String.format("Failure deleting action for 
DagAction: %s in table %s", dagAction,
           tableName), e);
-    }
+    }}, true);
   }
 
   // TODO: later change this to getDagActions relating to a particular flow 
execution if it makes sense
   private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff 
exponentialBackoff)
       throws IOException, SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getStatement = 
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
-      int i = 0;
-      getStatement.setString(++i, flowGroup);
-      getStatement.setString(++i, flowName);
-      getStatement.setString(++i, flowExecutionId);
-      getStatement.setString(++i, flowActionType.toString());
-      rs = getStatement.executeQuery();
-      if (rs.next()) {
-        return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
-      } else {
-        if (exponentialBackoff.awaitNextRetryIfAvailable()) {
-          return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+    return 
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, 
tableName), getStatement -> {
+      ResultSet rs = null;
+      try {
+        int i = 0;
+        getStatement.setString(++i, flowGroup);
+        getStatement.setString(++i, flowName);
+        getStatement.setString(++i, flowExecutionId);
+        getStatement.setString(++i, flowActionType.toString());
+        rs = getStatement.executeQuery();
+        if (rs.next()) {
+          return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
         } else {
-          log.warn(String.format("Can not find dag action: %s with flowGroup: 
%s, flowName: %s, flowExecutionId: %s",
-              flowActionType, flowGroup, flowName, flowExecutionId));
-          return null;
+          if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+            return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+          } else {
+            log.warn(String.format("Can not find dag action: %s with 
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+                flowActionType, flowGroup, flowName, flowExecutionId));
+            return null;
+          }
+        }
+      } catch (SQLException | InterruptedException e) {
+        throw new IOException(String.format("Failure get %s from table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();

Review Comment:
   Same comment here: can `rs` be managed with try-with-resources?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to