umustafi commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1376900715


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws 
IOException {
     } catch (SQLException e) {
       throw new IOException("Failure creation table " + tableName, e);
     }
+    this.mySQLStoreUtils = new MySQLStoreUtils(this.dataSource, log);
+    this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, 
this.tableName, retentionPeriodSeconds);
+    // Periodically deletes all rows in the table last_modified before the 
retention period defined by config.
+    
mySQLStoreUtils.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 6, TimeUnit.HOURS);
   }
 
   @Override
   public boolean exists(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType) throws IOException, 
SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement existStatement = 
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+    return 
mySQLStoreUtils.withPreparedStatement(String.format(EXISTS_STATEMENT, 
tableName), existStatement -> {
       int i = 0;
       existStatement.setString(++i, flowGroup);
       existStatement.setString(++i, flowName);
       existStatement.setString(++i, flowExecutionId);
       existStatement.setString(++i, flowActionType.toString());
-      rs = existStatement.executeQuery();
-      rs.next();
-      return rs.getBoolean(1);
-    } catch (SQLException e) {
-      throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
-          new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
+      ResultSet rs = null;
+      try {
+        rs = existStatement.executeQuery();
+        rs.next();
+        return rs.getBoolean(1);
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
+        }
       }
-    }
+    }, true);
   }
 
   @Override
   public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType)
       throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement insertStatement = 
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+    mySQLStoreUtils.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
+    try {
       int i = 0;
       insertStatement.setString(++i, flowGroup);
       insertStatement.setString(++i, flowName);
       insertStatement.setString(++i, flowExecutionId);
       insertStatement.setString(++i, flowActionType.toString());
-      insertStatement.executeUpdate();
-      connection.commit();
+      return insertStatement.executeUpdate();
     } catch (SQLException e) {
       throw new IOException(String.format("Failure adding action for 
DagAction: %s in table %s",
           new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    }
+    }}, true);
   }
 
   @Override
   public boolean deleteDagAction(DagAction dagAction) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement deleteStatement = 
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+    return 
mySQLStoreUtils.withPreparedStatement(String.format(DELETE_STATEMENT, 
tableName), deleteStatement -> {
+    try {
       int i = 0;
       deleteStatement.setString(++i, dagAction.getFlowGroup());
       deleteStatement.setString(++i, dagAction.getFlowName());
       deleteStatement.setString(++i, dagAction.getFlowExecutionId());
       deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
       int result = deleteStatement.executeUpdate();
-      connection.commit();
       return result != 0;
     } catch (SQLException e) {
       throw new IOException(String.format("Failure deleting action for 
DagAction: %s in table %s", dagAction,
           tableName), e);
-    }
+    }}, true);
   }
 
   // TODO: later change this to getDagActions relating to a particular flow 
execution if it makes sense
   private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff 
exponentialBackoff)
       throws IOException, SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getStatement = 
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
-      int i = 0;
-      getStatement.setString(++i, flowGroup);
-      getStatement.setString(++i, flowName);
-      getStatement.setString(++i, flowExecutionId);
-      getStatement.setString(++i, flowActionType.toString());
-      rs = getStatement.executeQuery();
-      if (rs.next()) {
-        return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
-      } else {
-        if (exponentialBackoff.awaitNextRetryIfAvailable()) {
-          return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+    return mySQLStoreUtils.withPreparedStatement(String.format(GET_STATEMENT, 
tableName), getStatement -> {
+      ResultSet rs = null;
+      try {
+        int i = 0;
+        getStatement.setString(++i, flowGroup);
+        getStatement.setString(++i, flowName);
+        getStatement.setString(++i, flowExecutionId);
+        getStatement.setString(++i, flowActionType.toString());
+        rs = getStatement.executeQuery();
+        if (rs.next()) {
+          return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
         } else {
-          log.warn(String.format("Can not find dag action: %s with flowGroup: 
%s, flowName: %s, flowExecutionId: %s",
-              flowActionType, flowGroup, flowName, flowExecutionId));
-          return null;
+          if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+            return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+          } else {
+            log.warn(String.format("Can not find dag action: %s with 
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+                flowActionType, flowGroup, flowName, flowExecutionId));
+            return null;
+          }
+        }
+      } catch (SQLException | InterruptedException e) {
+        throw new IOException(String.format("Failure get %s from table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
         }
       }
-    } catch (SQLException | InterruptedException e) {
-      throw new IOException(String.format("Failure get %s from table %s", new 
DagAction(flowGroup, flowName, flowExecutionId,
-          flowActionType), tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
-      }
-    }
+    }, true);
   }
 
   @Override
   public Collection<DagAction> getDagActions() throws IOException {
-    HashSet<DagAction> result = new HashSet<>();
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getAllStatement = 
connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName));
-        ResultSet rs = getAllStatement.executeQuery()) {
-      while (rs.next()) {
-        result.add(
-            new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), 
FlowActionType.valueOf(rs.getString(4))));
-      }
-      if (rs != null) {
-        rs.close();
+    return 
mySQLStoreUtils.withPreparedStatement(String.format(GET_ALL_STATEMENT, 
tableName), getAllStatement -> {
+      ResultSet rs = null;
+      try {
+        HashSet<DagAction> result = new HashSet<>();
+        rs = getAllStatement.executeQuery();
+        while (rs.next()) {
+          result.add(new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4))));
+        }
+        return result;
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure get dag actions from 
table %s ", tableName), e);

Review Comment:
   I didn't write the original error messages but I do believe it's to provide 
more context in the error message about consequence of the failed 
insert/update/delete etc...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to