[
https://issues.apache.org/jira/browse/GOBBLIN-1942?focusedWorklogId=887970&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-887970
]
ASF GitHub Bot logged work on GOBBLIN-1942:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Oct/23 23:30
Start Date: 30/Oct/23 23:30
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3812:
URL: https://github.com/apache/gobblin/pull/3812#discussion_r1376900715
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws
IOException {
} catch (SQLException e) {
throw new IOException("Failure creation table " + tableName, e);
}
+ this.mySQLStoreUtils = new MySQLStoreUtils(this.dataSource, log);
+ this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT,
this.tableName, retentionPeriodSeconds);
+ // Periodically deletes all rows in the table last_modified before the
retention period defined by config.
+
mySQLStoreUtils.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
6, TimeUnit.HOURS);
}
@Override
public boolean exists(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType) throws IOException,
SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement existStatement =
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+ return
mySQLStoreUtils.withPreparedStatement(String.format(EXISTS_STATEMENT,
tableName), existStatement -> {
int i = 0;
existStatement.setString(++i, flowGroup);
existStatement.setString(++i, flowName);
existStatement.setString(++i, flowExecutionId);
existStatement.setString(++i, flowActionType.toString());
- rs = existStatement.executeQuery();
- rs.next();
- return rs.getBoolean(1);
- } catch (SQLException e) {
- throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
- new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
+ ResultSet rs = null;
+ try {
+ rs = existStatement.executeQuery();
+ rs.next();
+ return rs.getBoolean(1);
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
}
- }
+ }, true);
}
@Override
public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType)
throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement insertStatement =
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+ mySQLStoreUtils.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
+ try {
int i = 0;
insertStatement.setString(++i, flowGroup);
insertStatement.setString(++i, flowName);
insertStatement.setString(++i, flowExecutionId);
insertStatement.setString(++i, flowActionType.toString());
- insertStatement.executeUpdate();
- connection.commit();
+ return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding action for
DagAction: %s in table %s",
new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- }
+ }}, true);
}
@Override
public boolean deleteDagAction(DagAction dagAction) throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement deleteStatement =
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+ return
mySQLStoreUtils.withPreparedStatement(String.format(DELETE_STATEMENT,
tableName), deleteStatement -> {
+ try {
int i = 0;
deleteStatement.setString(++i, dagAction.getFlowGroup());
deleteStatement.setString(++i, dagAction.getFlowName());
deleteStatement.setString(++i, dagAction.getFlowExecutionId());
deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
int result = deleteStatement.executeUpdate();
- connection.commit();
return result != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting action for
DagAction: %s in table %s", dagAction,
tableName), e);
- }
+ }}, true);
}
// TODO: later change this to getDagActions relating to a particular flow
execution if it makes sense
private DagAction getDagActionWithRetry(String flowGroup, String flowName,
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff
exponentialBackoff)
throws IOException, SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement getStatement =
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
- int i = 0;
- getStatement.setString(++i, flowGroup);
- getStatement.setString(++i, flowName);
- getStatement.setString(++i, flowExecutionId);
- getStatement.setString(++i, flowActionType.toString());
- rs = getStatement.executeQuery();
- if (rs.next()) {
- return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
- } else {
- if (exponentialBackoff.awaitNextRetryIfAvailable()) {
- return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ return mySQLStoreUtils.withPreparedStatement(String.format(GET_STATEMENT,
tableName), getStatement -> {
+ ResultSet rs = null;
+ try {
+ int i = 0;
+ getStatement.setString(++i, flowGroup);
+ getStatement.setString(++i, flowName);
+ getStatement.setString(++i, flowExecutionId);
+ getStatement.setString(++i, flowActionType.toString());
+ rs = getStatement.executeQuery();
+ if (rs.next()) {
+ return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
} else {
- log.warn(String.format("Can not find dag action: %s with flowGroup:
%s, flowName: %s, flowExecutionId: %s",
- flowActionType, flowGroup, flowName, flowExecutionId));
- return null;
+ if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+ return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
+ } else {
+ log.warn(String.format("Can not find dag action: %s with
flowGroup: %s, flowName: %s, flowExecutionId: %s",
+ flowActionType, flowGroup, flowName, flowExecutionId));
+ return null;
+ }
+ }
+ } catch (SQLException | InterruptedException e) {
+ throw new IOException(String.format("Failure get %s from table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
}
}
- } catch (SQLException | InterruptedException e) {
- throw new IOException(String.format("Failure get %s from table %s", new
DagAction(flowGroup, flowName, flowExecutionId,
- flowActionType), tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
- }
- }
+ }, true);
}
@Override
public Collection<DagAction> getDagActions() throws IOException {
- HashSet<DagAction> result = new HashSet<>();
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement getAllStatement =
connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName));
- ResultSet rs = getAllStatement.executeQuery()) {
- while (rs.next()) {
- result.add(
- new DagAction(rs.getString(1), rs.getString(2), rs.getString(3),
FlowActionType.valueOf(rs.getString(4))));
- }
- if (rs != null) {
- rs.close();
+ return
mySQLStoreUtils.withPreparedStatement(String.format(GET_ALL_STATEMENT,
tableName), getAllStatement -> {
+ ResultSet rs = null;
+ try {
+ HashSet<DagAction> result = new HashSet<>();
+ rs = getAllStatement.executeQuery();
+ while (rs.next()) {
+ result.add(new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4))));
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure get dag actions from
table %s ", tableName), e);
Review Comment:
I didn't write the original error messages but I do believe it's to provide
more context in the error message about consequence of the failed
insert/update/delete etc...
Issue Time Tracking
-------------------
Worklog Id: (was: 887970)
Time Spent: 2h 20m (was: 2h 10m)
> Create MySQL util class for re-usable methods & enable MysqlDagActionStore
> retention
> ------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1942
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1942
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> Defines a new class {{MySQLStoreUtils}} used for common functionality between
> MySQL based implementations of stores. It includes a new method to run a SQL
> command in a {{ScheduledThreadPoolExecutor}} using {{interval T}} which is
> used for retention on the {{MysqlDagActionStore}} and
> {{{}MysqlMultiActiveLeaseArbiter{}}}.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)