[ 
https://issues.apache.org/jira/browse/GOBBLIN-1942?focusedWorklogId=888339&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-888339
 ]

ASF GitHub Bot logged work on GOBBLIN-1942:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Nov/23 17:56
            Start Date: 01/Nov/23 17:56
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1379130049


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws 
IOException {
     } catch (SQLException e) {
       throw new IOException("Failure creation table " + tableName, e);
     }
+    this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+    this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, 
this.tableName, retentionPeriodSeconds);
+    // Periodically deletes all rows in the table last_modified before the 
retention period defined by config.
+    
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
 6, TimeUnit.HOURS);
   }
 
   @Override
   public boolean exists(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType) throws IOException, 
SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement existStatement = 
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT, 
tableName), existStatement -> {
       int i = 0;
       existStatement.setString(++i, flowGroup);
       existStatement.setString(++i, flowName);
       existStatement.setString(++i, flowExecutionId);
       existStatement.setString(++i, flowActionType.toString());
-      rs = existStatement.executeQuery();
-      rs.next();
-      return rs.getBoolean(1);
-    } catch (SQLException e) {
-      throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
-          new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    } finally {
-      if (rs != null) {
-        rs.close();
+      ResultSet rs = null;
+      try {
+        rs = existStatement.executeQuery();
+        rs.next();
+        return rs.getBoolean(1);
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
+            new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e);
+      } finally {
+        if (rs != null) {
+          rs.close();
+        }
       }
-    }
+    }, true);
   }
 
   @Override
   public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType)
       throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement insertStatement = 
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+    dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
+    try {
       int i = 0;
       insertStatement.setString(++i, flowGroup);
       insertStatement.setString(++i, flowName);
       insertStatement.setString(++i, flowExecutionId);
       insertStatement.setString(++i, flowActionType.toString());
-      insertStatement.executeUpdate();
-      connection.commit();
+      return insertStatement.executeUpdate();
     } catch (SQLException e) {
       throw new IOException(String.format("Failure adding action for 
DagAction: %s in table %s",
           new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
-    }
+    }}, true);
   }
 
   @Override
   public boolean deleteDagAction(DagAction dagAction) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement deleteStatement = 
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, 
tableName), deleteStatement -> {
+    try {
       int i = 0;
       deleteStatement.setString(++i, dagAction.getFlowGroup());
       deleteStatement.setString(++i, dagAction.getFlowName());
       deleteStatement.setString(++i, dagAction.getFlowExecutionId());
       deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
       int result = deleteStatement.executeUpdate();
-      connection.commit();
       return result != 0;
     } catch (SQLException e) {
       throw new IOException(String.format("Failure deleting action for 
DagAction: %s in table %s", dagAction,
           tableName), e);
-    }
+    }}, true);
   }
 
   // TODO: later change this to getDagActions relating to a particular flow 
execution if it makes sense
   private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff 
exponentialBackoff)
       throws IOException, SQLException {
-    ResultSet rs = null;
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement getStatement = 
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
-      int i = 0;
-      getStatement.setString(++i, flowGroup);
-      getStatement.setString(++i, flowName);
-      getStatement.setString(++i, flowExecutionId);
-      getStatement.setString(++i, flowActionType.toString());
-      rs = getStatement.executeQuery();
-      if (rs.next()) {
-        return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
-      } else {
-        if (exponentialBackoff.awaitNextRetryIfAvailable()) {
-          return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+    return 
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, 
tableName), getStatement -> {
+      ResultSet rs = null;
+      try {
+        int i = 0;
+        getStatement.setString(++i, flowGroup);
+        getStatement.setString(++i, flowName);
+        getStatement.setString(++i, flowExecutionId);
+        getStatement.setString(++i, flowActionType.toString());
+        rs = getStatement.executeQuery();
+        if (rs.next()) {
+          return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
         } else {
-          log.warn(String.format("Can not find dag action: %s with flowGroup: 
%s, flowName: %s, flowExecutionId: %s",
-              flowActionType, flowGroup, flowName, flowExecutionId));
-          return null;
+          if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+            return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
+          } else {
+            log.warn(String.format("Can not find dag action: %s with 
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+                flowActionType, flowGroup, flowName, flowExecutionId));

Review Comment:
   This function is currently unused, and there's a TODO comment about 
potentially refactoring it. Until we decide how to update it, I am only changing 
the formatting in this PR, collapsing one of the nested `if`s into an `else if`. 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 888339)
    Time Spent: 3h 40m  (was: 3.5h)

> Create MySQL util class for re-usable methods & enable MysqlDagActionStore 
> retention
> ------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1942
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1942
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Defines a new class {{MySQLStoreUtils}} for functionality shared between 
> MySQL-based implementations of stores. It includes a new method to run a SQL 
> command in a {{ScheduledThreadPoolExecutor}} using {{interval T}}, which is 
> used for retention on the {{MysqlDagActionStore}} and 
> {{MysqlMultiActiveLeaseArbiter}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to