Will-Lo commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1379106697
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws
IOException {
} catch (SQLException e) {
throw new IOException("Failure creation table " + tableName, e);
}
+ this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+ this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT,
this.tableName, retentionPeriodSeconds);
+ // Periodically deletes all rows in the table last_modified before the
retention period defined by config.
+
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
6, TimeUnit.HOURS);
}
@Override
public boolean exists(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType) throws IOException,
SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement existStatement =
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT,
tableName), existStatement -> {
int i = 0;
existStatement.setString(++i, flowGroup);
existStatement.setString(++i, flowName);
existStatement.setString(++i, flowExecutionId);
existStatement.setString(++i, flowActionType.toString());
- rs = existStatement.executeQuery();
- rs.next();
- return rs.getBoolean(1);
- } catch (SQLException e) {
- throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
- new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
+ ResultSet rs = null;
+ try {
+ rs = existStatement.executeQuery();
+ rs.next();
+ return rs.getBoolean(1);
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
}
- }
+ }, true);
}
@Override
public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType)
throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement insertStatement =
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+ dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
+ try {
int i = 0;
insertStatement.setString(++i, flowGroup);
insertStatement.setString(++i, flowName);
insertStatement.setString(++i, flowExecutionId);
insertStatement.setString(++i, flowActionType.toString());
- insertStatement.executeUpdate();
- connection.commit();
+ return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding action for
DagAction: %s in table %s",
new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- }
+ }}, true);
}
@Override
public boolean deleteDagAction(DagAction dagAction) throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement deleteStatement =
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT,
tableName), deleteStatement -> {
+ try {
int i = 0;
deleteStatement.setString(++i, dagAction.getFlowGroup());
deleteStatement.setString(++i, dagAction.getFlowName());
deleteStatement.setString(++i, dagAction.getFlowExecutionId());
deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
int result = deleteStatement.executeUpdate();
- connection.commit();
return result != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting action for
DagAction: %s in table %s", dagAction,
tableName), e);
- }
+ }}, true);
}
// TODO: later change this to getDagActions relating to a particular flow
execution if it makes sense
private DagAction getDagActionWithRetry(String flowGroup, String flowName,
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff
exponentialBackoff)
throws IOException, SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement getStatement =
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
- int i = 0;
- getStatement.setString(++i, flowGroup);
- getStatement.setString(++i, flowName);
- getStatement.setString(++i, flowExecutionId);
- getStatement.setString(++i, flowActionType.toString());
- rs = getStatement.executeQuery();
- if (rs.next()) {
- return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
- } else {
- if (exponentialBackoff.awaitNextRetryIfAvailable()) {
- return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ return
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT,
tableName), getStatement -> {
+ ResultSet rs = null;
+ try {
+ int i = 0;
+ getStatement.setString(++i, flowGroup);
+ getStatement.setString(++i, flowName);
+ getStatement.setString(++i, flowExecutionId);
+ getStatement.setString(++i, flowActionType.toString());
+ rs = getStatement.executeQuery();
+ if (rs.next()) {
+ return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
} else {
- log.warn(String.format("Can not find dag action: %s with flowGroup:
%s, flowName: %s, flowExecutionId: %s",
- flowActionType, flowGroup, flowName, flowExecutionId));
- return null;
+ if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+ return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ } else {
+ log.warn(String.format("Can not find dag action: %s with
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+ flowActionType, flowGroup, flowName, flowExecutionId));
+ return null;
+ }
+ }
+ } catch (SQLException | InterruptedException e) {
+ throw new IOException(String.format("Failure get %s from table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
}
}
- } catch (SQLException | InterruptedException e) {
- throw new IOException(String.format("Failure get %s from table %s", new
DagAction(flowGroup, flowName, flowExecutionId,
- flowActionType), tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
- }
- }
+ }, true);
}
@Override
public Collection<DagAction> getDagActions() throws IOException {
- HashSet<DagAction> result = new HashSet<>();
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement getAllStatement =
connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName));
- ResultSet rs = getAllStatement.executeQuery()) {
- while (rs.next()) {
- result.add(
- new DagAction(rs.getString(1), rs.getString(2), rs.getString(3),
FlowActionType.valueOf(rs.getString(4))));
- }
- if (rs != null) {
- rs.close();
+ return
dbStatementExecutor.withPreparedStatement(String.format(GET_ALL_STATEMENT,
tableName), getAllStatement -> {
+ ResultSet rs = null;
+ try {
+ HashSet<DagAction> result = new HashSet<>();
+ rs = getAllStatement.executeQuery();
+ while (rs.next()) {
+ result.add(new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4))));
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure get dag actions from
table %s ", tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
Review Comment:
Can `rs` be wrapped in a try-with-resources block, since it needs to be closed?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws
IOException {
} catch (SQLException e) {
throw new IOException("Failure creation table " + tableName, e);
}
+ this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+ this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT,
this.tableName, retentionPeriodSeconds);
+ // Periodically deletes all rows in the table last_modified before the
retention period defined by config.
+
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
6, TimeUnit.HOURS);
}
@Override
public boolean exists(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType) throws IOException,
SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement existStatement =
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT,
tableName), existStatement -> {
int i = 0;
existStatement.setString(++i, flowGroup);
existStatement.setString(++i, flowName);
existStatement.setString(++i, flowExecutionId);
existStatement.setString(++i, flowActionType.toString());
- rs = existStatement.executeQuery();
- rs.next();
- return rs.getBoolean(1);
- } catch (SQLException e) {
- throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
- new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
+ ResultSet rs = null;
+ try {
+ rs = existStatement.executeQuery();
+ rs.next();
+ return rs.getBoolean(1);
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
}
- }
+ }, true);
}
@Override
public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType)
throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement insertStatement =
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+ dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
+ try {
int i = 0;
insertStatement.setString(++i, flowGroup);
insertStatement.setString(++i, flowName);
insertStatement.setString(++i, flowExecutionId);
insertStatement.setString(++i, flowActionType.toString());
- insertStatement.executeUpdate();
- connection.commit();
+ return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding action for
DagAction: %s in table %s",
new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- }
+ }}, true);
}
@Override
public boolean deleteDagAction(DagAction dagAction) throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement deleteStatement =
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT,
tableName), deleteStatement -> {
+ try {
int i = 0;
deleteStatement.setString(++i, dagAction.getFlowGroup());
deleteStatement.setString(++i, dagAction.getFlowName());
deleteStatement.setString(++i, dagAction.getFlowExecutionId());
deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
int result = deleteStatement.executeUpdate();
- connection.commit();
return result != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting action for
DagAction: %s in table %s", dagAction,
tableName), e);
- }
+ }}, true);
}
// TODO: later change this to getDagActions relating to a particular flow
execution if it makes sense
private DagAction getDagActionWithRetry(String flowGroup, String flowName,
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff
exponentialBackoff)
throws IOException, SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement getStatement =
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
- int i = 0;
- getStatement.setString(++i, flowGroup);
- getStatement.setString(++i, flowName);
- getStatement.setString(++i, flowExecutionId);
- getStatement.setString(++i, flowActionType.toString());
- rs = getStatement.executeQuery();
- if (rs.next()) {
- return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
- } else {
- if (exponentialBackoff.awaitNextRetryIfAvailable()) {
- return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ return
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT,
tableName), getStatement -> {
+ ResultSet rs = null;
+ try {
+ int i = 0;
+ getStatement.setString(++i, flowGroup);
+ getStatement.setString(++i, flowName);
+ getStatement.setString(++i, flowExecutionId);
+ getStatement.setString(++i, flowActionType.toString());
+ rs = getStatement.executeQuery();
+ if (rs.next()) {
+ return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
} else {
- log.warn(String.format("Can not find dag action: %s with flowGroup:
%s, flowName: %s, flowExecutionId: %s",
- flowActionType, flowGroup, flowName, flowExecutionId));
- return null;
+ if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+ return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ } else {
+ log.warn(String.format("Can not find dag action: %s with
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+ flowActionType, flowGroup, flowName, flowExecutionId));
Review Comment:
Is it possible to encapsulate the exponential-backoff retry in the DB statement
executor? It seems useful to reuse in other areas if needed.
Also, if not, prefer an `else if` at the same level as the outer `if`, rather
than nesting an `if`/`else` inside the `else` block.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws
IOException {
} catch (SQLException e) {
throw new IOException("Failure creation table " + tableName, e);
}
+ this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+ this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT,
this.tableName, retentionPeriodSeconds);
+ // Periodically deletes all rows in the table last_modified before the
retention period defined by config.
+
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
6, TimeUnit.HOURS);
}
@Override
public boolean exists(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType) throws IOException,
SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement existStatement =
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT,
tableName), existStatement -> {
int i = 0;
existStatement.setString(++i, flowGroup);
existStatement.setString(++i, flowName);
existStatement.setString(++i, flowExecutionId);
existStatement.setString(++i, flowActionType.toString());
- rs = existStatement.executeQuery();
- rs.next();
- return rs.getBoolean(1);
- } catch (SQLException e) {
- throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
- new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
+ ResultSet rs = null;
+ try {
+ rs = existStatement.executeQuery();
+ rs.next();
+ return rs.getBoolean(1);
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
}
- }
+ }, true);
}
@Override
public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType)
throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement insertStatement =
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+ dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
+ try {
int i = 0;
insertStatement.setString(++i, flowGroup);
insertStatement.setString(++i, flowName);
insertStatement.setString(++i, flowExecutionId);
insertStatement.setString(++i, flowActionType.toString());
- insertStatement.executeUpdate();
- connection.commit();
+ return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding action for
DagAction: %s in table %s",
new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- }
+ }}, true);
}
@Override
public boolean deleteDagAction(DagAction dagAction) throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement deleteStatement =
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT,
tableName), deleteStatement -> {
+ try {
int i = 0;
deleteStatement.setString(++i, dagAction.getFlowGroup());
deleteStatement.setString(++i, dagAction.getFlowName());
deleteStatement.setString(++i, dagAction.getFlowExecutionId());
deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
int result = deleteStatement.executeUpdate();
- connection.commit();
return result != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting action for
DagAction: %s in table %s", dagAction,
tableName), e);
- }
+ }}, true);
}
// TODO: later change this to getDagActions relating to a particular flow
execution if it makes sense
private DagAction getDagActionWithRetry(String flowGroup, String flowName,
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff
exponentialBackoff)
throws IOException, SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement getStatement =
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
- int i = 0;
- getStatement.setString(++i, flowGroup);
- getStatement.setString(++i, flowName);
- getStatement.setString(++i, flowExecutionId);
- getStatement.setString(++i, flowActionType.toString());
- rs = getStatement.executeQuery();
- if (rs.next()) {
- return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
- } else {
- if (exponentialBackoff.awaitNextRetryIfAvailable()) {
- return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ return
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT,
tableName), getStatement -> {
+ ResultSet rs = null;
+ try {
+ int i = 0;
+ getStatement.setString(++i, flowGroup);
+ getStatement.setString(++i, flowName);
+ getStatement.setString(++i, flowExecutionId);
+ getStatement.setString(++i, flowActionType.toString());
+ rs = getStatement.executeQuery();
+ if (rs.next()) {
+ return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
} else {
- log.warn(String.format("Can not find dag action: %s with flowGroup:
%s, flowName: %s, flowExecutionId: %s",
- flowActionType, flowGroup, flowName, flowExecutionId));
- return null;
+ if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+ return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ } else {
+ log.warn(String.format("Can not find dag action: %s with
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+ flowActionType, flowGroup, flowName, flowExecutionId));
+ return null;
+ }
+ }
+ } catch (SQLException | InterruptedException e) {
+ throw new IOException(String.format("Failure get %s from table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
Review Comment:
Same comment here: can `rs` be wrapped in try-with-resources?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]