umustafi commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1379130049
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws
IOException {
} catch (SQLException e) {
throw new IOException("Failure creation table " + tableName, e);
}
+ this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+ this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT,
this.tableName, retentionPeriodSeconds);
+ // Periodically deletes all rows in the table last_modified before the
retention period defined by config.
+
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
6, TimeUnit.HOURS);
}
@Override
public boolean exists(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType) throws IOException,
SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement existStatement =
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT,
tableName), existStatement -> {
int i = 0;
existStatement.setString(++i, flowGroup);
existStatement.setString(++i, flowName);
existStatement.setString(++i, flowExecutionId);
existStatement.setString(++i, flowActionType.toString());
- rs = existStatement.executeQuery();
- rs.next();
- return rs.getBoolean(1);
- } catch (SQLException e) {
- throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
- new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
+ ResultSet rs = null;
+ try {
+ rs = existStatement.executeQuery();
+ rs.next();
+ return rs.getBoolean(1);
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
}
- }
+ }, true);
}
@Override
public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType)
throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement insertStatement =
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+ dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
+ try {
int i = 0;
insertStatement.setString(++i, flowGroup);
insertStatement.setString(++i, flowName);
insertStatement.setString(++i, flowExecutionId);
insertStatement.setString(++i, flowActionType.toString());
- insertStatement.executeUpdate();
- connection.commit();
+ return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding action for
DagAction: %s in table %s",
new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- }
+ }}, true);
}
@Override
public boolean deleteDagAction(DagAction dagAction) throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement deleteStatement =
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT,
tableName), deleteStatement -> {
+ try {
int i = 0;
deleteStatement.setString(++i, dagAction.getFlowGroup());
deleteStatement.setString(++i, dagAction.getFlowName());
deleteStatement.setString(++i, dagAction.getFlowExecutionId());
deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
int result = deleteStatement.executeUpdate();
- connection.commit();
return result != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting action for
DagAction: %s in table %s", dagAction,
tableName), e);
- }
+ }}, true);
}
// TODO: later change this to getDagActions relating to a particular flow
execution if it makes sense
private DagAction getDagActionWithRetry(String flowGroup, String flowName,
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff
exponentialBackoff)
throws IOException, SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement getStatement =
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
- int i = 0;
- getStatement.setString(++i, flowGroup);
- getStatement.setString(++i, flowName);
- getStatement.setString(++i, flowExecutionId);
- getStatement.setString(++i, flowActionType.toString());
- rs = getStatement.executeQuery();
- if (rs.next()) {
- return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
- } else {
- if (exponentialBackoff.awaitNextRetryIfAvailable()) {
- return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ return
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT,
tableName), getStatement -> {
+ ResultSet rs = null;
+ try {
+ int i = 0;
+ getStatement.setString(++i, flowGroup);
+ getStatement.setString(++i, flowName);
+ getStatement.setString(++i, flowExecutionId);
+ getStatement.setString(++i, flowActionType.toString());
+ rs = getStatement.executeQuery();
+ if (rs.next()) {
+ return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
} else {
- log.warn(String.format("Can not find dag action: %s with flowGroup:
%s, flowName: %s, flowExecutionId: %s",
- flowActionType, flowGroup, flowName, flowExecutionId));
- return null;
+ if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+ return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ } else {
+ log.warn(String.format("Can not find dag action: %s with
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+ flowActionType, flowGroup, flowName, flowExecutionId));
Review Comment:
This function is currently unused, and there's a TODO comment about
potentially refactoring it. Until we decide how to update it, I am only
changing its formatting in this PR: collapsing one of the nested `if`
statements into an `else if`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]