This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c865b6a8e [GOBBLIN-1942] Create MySQL util class for re-usable methods
and setup MysqlDagActio… (#3812)
c865b6a8e is described below
commit c865b6a8e4628f9602671151a1a33c8a27dacdef
Author: umustafi <[email protected]>
AuthorDate: Wed Nov 1 12:15:24 2023 -0700
[GOBBLIN-1942] Create MySQL util class for re-usable methods and setup
MysqlDagActio… (#3812)
* Create MySQL util class for re-usable methods and setup
MysqlDagActionStore retention
* Add a java doc
* Address review comments
* Close scheduled executors on shutdown & clarify naming and comments
* Remove extra period making config key invalid
* implement Closeable
* Use try with resources
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/configuration/ConfigurationKeys.java | 4 +
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 84 ++++------------
.../dag_action_store/MysqlDagActionStore.java | 107 ++++++++++----------
.../runtime/spec_store/MysqlBaseSpecStore.java | 1 +
.../apache/gobblin/util/DBStatementExecutor.java | 111 +++++++++++++++++++++
5 files changed, 188 insertions(+), 119 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a50ba8c75..5fe8f001a 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -95,6 +95,10 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Mysql Dag Action Store configuration
+ public static final String MYSQL_DAG_ACTION_STORE_PREFIX =
"MysqlDagActionStore.";
+ public static final String
MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY =
MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
+ public static final long
DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 *
60; // (3 days in seconds)
// Scheduler lease determination store configuration
public static final String MYSQL_LEASE_ARBITER_PREFIX =
"MysqlMultiActiveLeaseArbiter";
public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 338e908a2..05449767c 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.runtime.api;
import com.google.inject.Inject;
import com.typesafe.config.Config;
-import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -30,7 +29,6 @@ import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Optional;
import java.util.TimeZone;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import lombok.Data;
@@ -40,8 +38,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
-
-import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;
+import org.apache.gobblin.util.DBStatementExecutor;
/**
@@ -80,13 +77,9 @@ import static
org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;
*/
@Slf4j
public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
- /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
- @FunctionalInterface
- protected interface CheckedFunction<T, R> {
- R apply(T t) throws IOException, SQLException;
- }
protected final DataSource dataSource;
+ private final DBStatementExecutor dbStatementExecutor;
private final String leaseArbiterTableName;
private final String constantsTableName;
private final int epsilonMillis;
@@ -121,7 +114,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Deletes rows older than retention time period regardless of lease status
as they should all be invalid or completed
// since retention >> linger
private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT =
"DELETE FROM %s WHERE event_timestamp < "
- + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)";
+ + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY
(primary_key))";
// Only insert epsilon and linger values from config if this table does not
contain a pre-existing values already.
@@ -196,7 +189,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.retentionPeriodMillis = ConfigUtils.getLong(config,
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
- this.thisTableRetentionStatement =
String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT,
this.leaseArbiterTableName);
+ this.thisTableRetentionStatement =
String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT,
this.leaseArbiterTableName,
+ retentionPeriodMillis);
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT,
this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableGetInfoStatementForReminder =
String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
@@ -208,6 +202,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
this.thisTableAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
String createArbiterStatement = String.format(
CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
try (Connection connection = dataSource.getConnection();
@@ -218,17 +213,21 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
}
initializeConstantsTable();
- runRetentionOnArbitrationTable();
+
+ // Periodically deletes all rows in the table with event_timestamp older
than the retention period defined by config.
+
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
4, TimeUnit.HOURS);
+
log.info("MysqlMultiActiveLeaseArbiter initialized");
}
// Initialize Constants table if needed and insert row into it if one does
not exist
private void initializeConstantsTable() throws IOException {
String createConstantsStatement =
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
- withPreparedStatement(createConstantsStatement, createStatement ->
createStatement.executeUpdate(), true);
+ dbStatementExecutor.withPreparedStatement(createConstantsStatement,
createStatement -> createStatement.executeUpdate(),
+ true);
String insertConstantsStatement =
String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
- withPreparedStatement(insertConstantsStatement, insertStatement -> {
+ dbStatementExecutor.withPreparedStatement(insertConstantsStatement,
insertStatement -> {
int i = 0;
insertStatement.setInt(++i, epsilonMillis);
insertStatement.setInt(++i, lingerMillis);
@@ -236,34 +235,6 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}, true);
}
- /**
- * Periodically deletes all rows in the table with event_timestamp older
than the retention period defined by config.
- * // TODO: create a utility to run a SQL commend in a STPE using interval T
- */
- private void runRetentionOnArbitrationTable() {
- ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
- Runnable retentionTask = () -> {
- try {
- withPreparedStatement(thisTableRetentionStatement,
- retentionStatement -> {
- retentionStatement.setLong(1, retentionPeriodMillis);
- int numRowsDeleted = retentionStatement.executeUpdate();
- if (numRowsDeleted != 0) {
- log.info("Multi-active lease arbiter retention thread deleted
{} rows from the lease arbiter table",
- numRowsDeleted);
- }
- return numRowsDeleted;
- }, true);
- } catch (IOException e) {
- log.error("Failing to run retention on lease arbiter table. Unbounded
growth can lead to database slowness and "
- + "affect our system performance. Examine exception: ", e);
- }
- };
-
- // Run retention thread every 4 hours (6 times a day)
- executor.scheduleAtFixedRate(retentionTask, 0, 4, TimeUnit.HOURS);
- }
-
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis,
boolean isReminderEvent) throws IOException {
@@ -370,7 +341,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
*/
protected Optional<GetEventInfoResult>
getExistingEventInfo(DagActionStore.DagAction flowAction,
boolean isReminderEvent, long eventTimeMillis) throws IOException {
- return withPreparedStatement(isReminderEvent ?
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
+ return dbStatementExecutor.withPreparedStatement(isReminderEvent ?
thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
if (isReminderEvent) {
@@ -425,7 +396,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction)
throws IOException {
String formattedAcquireLeaseNewRowStatement =
String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT,
this.leaseArbiterTableName);
- return withPreparedStatement(formattedAcquireLeaseNewRowStatement,
+ return
dbStatementExecutor.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, flowAction);
try {
@@ -447,7 +418,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
protected int attemptLeaseIfExistingRow(String acquireLeaseStatement,
DagActionStore.DagAction flowAction,
boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp
dbEventTimestamp,
Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
- return withPreparedStatement(acquireLeaseStatement,
+ return dbStatementExecutor.withPreparedStatement(acquireLeaseStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction,
needEventTimeCheck, needLeaseAcquisition,
dbEventTimestamp, dbLeaseAcquisitionTimestamp);
@@ -460,7 +431,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* was successful or not.
*/
protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction)
throws IOException {
- return withPreparedStatement(thisTableSelectAfterInsertStatement,
+ return
dbStatementExecutor.withPreparedStatement(thisTableSelectAfterInsertStatement,
selectStatement -> {
completeWhereClauseMatchingKeyPreparedStatement(selectStatement,
flowAction);
ResultSet resultSet = selectStatement.executeQuery();
@@ -596,7 +567,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
String flowGroup = flowAction.getFlowGroup();
String flowName = flowAction.getFlowName();
DagActionStore.FlowActionType flowActionType =
flowAction.getFlowActionType();
- return
withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT,
leaseArbiterTableName),
+ return
dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT,
leaseArbiterTableName),
updateStatement -> {
int i = 0;
updateStatement.setString(++i, flowGroup);
@@ -621,25 +592,6 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}, true);
}
- /** Abstracts recurring pattern around resource management and exception
re-mapping. */
- protected <T> T withPreparedStatement(String sql,
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
- throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement statement = connection.prepareStatement(sql)) {
- T result = f.apply(statement);
- if (shouldCommit) {
- connection.commit();
- }
- statement.close();
- return result;
- } catch (SQLException e) {
- log.warn("Received SQL exception that can result from invalid
connection. Checking if validation query is set {} "
- + "Exception is {}", ((HikariDataSource)
this.dataSource).getConnectionTestQuery(), e);
- throw new IOException(e);
- }
- }
-
-
/**
* DTO for arbiter's current lease state for a FlowActionEvent
*/
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index 4f639e04a..894d0a300 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import com.google.inject.Inject;
import com.typesafe.config.Config;
+import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
@@ -38,6 +39,8 @@ import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExponentialBackoff;
+import org.apache.gobblin.util.DBStatementExecutor;
+
@Slf4j
public class MysqlDagActionStore implements DagActionStore {
@@ -45,7 +48,10 @@ public class MysqlDagActionStore implements DagActionStore {
public static final String CONFIG_PREFIX = "MysqlDagActionStore";
protected final DataSource dataSource;
+ private final DBStatementExecutor dbStatementExecutor;
private final String tableName;
+ private final long retentionPeriodSeconds;
+ private String thisTableRetentionStatement;
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM
%s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? AND
dag_action = ?)";
protected static final String INSERT_STATEMENT = "INSERT INTO %s
(flow_group, flow_name, flow_execution_id, dag_action) "
@@ -58,6 +64,8 @@ public class MysqlDagActionStore implements DagActionStore {
+ "flow_execution_id varchar(" +
ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
+ "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT
CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, "
+ "PRIMARY KEY (flow_group,flow_name,flow_execution_id, dag_action))";
+ // Deletes rows older than retention time period (in seconds) to prevent
this table from growing unbounded.
+ private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE
modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %s SECOND)";
private final int getDagActionMaxRetries;
@@ -71,7 +79,8 @@ public class MysqlDagActionStore implements DagActionStore {
this.tableName = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
this.getDagActionMaxRetries = ConfigUtils.getInt(config,
ConfigurationKeys.MYSQL_GET_MAX_RETRIES,
ConfigurationKeys.DEFAULT_MYSQL_GET_MAX_RETRIES);
-
+ this.retentionPeriodSeconds = ConfigUtils.getLong(config,
ConfigurationKeys.MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY,
+
ConfigurationKeys.DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
try (Connection connection = dataSource.getConnection();
@@ -81,116 +90,108 @@ public class MysqlDagActionStore implements
DagActionStore {
} catch (SQLException e) {
throw new IOException("Failure creation table " + tableName, e);
}
+ this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
+ this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT,
this.tableName, retentionPeriodSeconds);
+ // Periodically deletes all rows in the table last_modified before the
retention period defined by config.
+
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
6, TimeUnit.HOURS);
}
@Override
public boolean exists(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType) throws IOException,
SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement existStatement =
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT,
tableName), existStatement -> {
int i = 0;
existStatement.setString(++i, flowGroup);
existStatement.setString(++i, flowName);
existStatement.setString(++i, flowExecutionId);
existStatement.setString(++i, flowActionType.toString());
- rs = existStatement.executeQuery();
- rs.next();
- return rs.getBoolean(1);
- } catch (SQLException e) {
- throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
- new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
+ ResultSet rs = null;
+ try {
+ rs = existStatement.executeQuery();
+ rs.next();
+ return rs.getBoolean(1);
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
}
- }
+ }, true);
}
@Override
public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType)
throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement insertStatement =
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
+ dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
+ try {
int i = 0;
insertStatement.setString(++i, flowGroup);
insertStatement.setString(++i, flowName);
insertStatement.setString(++i, flowExecutionId);
insertStatement.setString(++i, flowActionType.toString());
- insertStatement.executeUpdate();
- connection.commit();
+ return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding action for
DagAction: %s in table %s",
new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
- }
+ }}, true);
}
@Override
public boolean deleteDagAction(DagAction dagAction) throws IOException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement deleteStatement =
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT,
tableName), deleteStatement -> {
+ try {
int i = 0;
deleteStatement.setString(++i, dagAction.getFlowGroup());
deleteStatement.setString(++i, dagAction.getFlowName());
deleteStatement.setString(++i, dagAction.getFlowExecutionId());
deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
int result = deleteStatement.executeUpdate();
- connection.commit();
return result != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting action for
DagAction: %s in table %s", dagAction,
tableName), e);
- }
+ }}, true);
}
// TODO: later change this to getDagActions relating to a particular flow
execution if it makes sense
private DagAction getDagActionWithRetry(String flowGroup, String flowName,
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff
exponentialBackoff)
throws IOException, SQLException {
- ResultSet rs = null;
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement getStatement =
connection.prepareStatement(String.format(GET_STATEMENT, tableName))) {
+ return
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT,
tableName), getStatement -> {
int i = 0;
getStatement.setString(++i, flowGroup);
getStatement.setString(++i, flowName);
getStatement.setString(++i, flowExecutionId);
getStatement.setString(++i, flowActionType.toString());
- rs = getStatement.executeQuery();
- if (rs.next()) {
- return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
- } else {
- if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+ try (ResultSet rs = getStatement.executeQuery()) {
+ if (rs.next()) {
+ return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
+ } else if (exponentialBackoff.awaitNextRetryIfAvailable()) {
return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
} else {
log.warn(String.format("Can not find dag action: %s with flowGroup:
%s, flowName: %s, flowExecutionId: %s",
flowActionType, flowGroup, flowName, flowExecutionId));
return null;
}
+ } catch (SQLException | InterruptedException e) {
+ throw new IOException(String.format("Failure get %s from table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId,
flowActionType), tableName), e);
}
- } catch (SQLException | InterruptedException e) {
- throw new IOException(String.format("Failure get %s from table %s", new
DagAction(flowGroup, flowName, flowExecutionId,
- flowActionType), tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
- }
- }
+ }, true);
}
@Override
public Collection<DagAction> getDagActions() throws IOException {
- HashSet<DagAction> result = new HashSet<>();
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement getAllStatement =
connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName));
- ResultSet rs = getAllStatement.executeQuery()) {
- while (rs.next()) {
- result.add(
- new DagAction(rs.getString(1), rs.getString(2), rs.getString(3),
FlowActionType.valueOf(rs.getString(4))));
- }
- if (rs != null) {
- rs.close();
+ return
dbStatementExecutor.withPreparedStatement(String.format(GET_ALL_STATEMENT,
tableName), getAllStatement -> {
+ HashSet<DagAction> result = new HashSet<>();
+ try (ResultSet rs = getAllStatement.executeQuery()) {
+ while (rs.next()) {
+ result.add(new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4))));
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure get dag actions from
table %s ", tableName), e);
}
- return result;
- } catch (SQLException e) {
- throw new IOException(String.format("Failure get dag actions from table
%s ", tableName), e);
- }
+ }, true);
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index 713c2b9d8..595ba89d7 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -352,6 +352,7 @@ public class MysqlBaseSpecStore extends
InstrumentedSpecStore {
return Optional.of(this.specStoreURI);
}
+ // TODO: migrate this class to use common util {@link DBStatementExecutor}
/** Abstracts recurring pattern around resource management and exception
re-mapping. */
protected <T> T withPreparedStatement(String sql,
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) throws
IOException {
try (Connection connection = this.dataSource.getConnection();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java
new file mode 100644
index 000000000..1f554779c
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+
+
+/**
+ * Many database stores require common functionality that can be stored in a
utility class. The functionality
+ * includes executing prepared statements on a data source object and SQL
queries at fixed intervals.
+ * The caller of the class MUST maintain ownership of the {@link DataSource}
and close this instance when the
+ * {@link DataSource} is about to be closed well. Both are to be done only
once this instance will no longer be used.
+ */
+public class DBStatementExecutor implements Closeable {
+ private final DataSource dataSource;
+ private final Logger log;
+ private final ArrayList<ScheduledThreadPoolExecutor> scheduledExecutors;
+
+ public DBStatementExecutor(DataSource dataSource, Logger log) {
+ this.dataSource = dataSource;
+ this.log = log;
+ this.scheduledExecutors = new ArrayList<>();
+ }
+
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ public interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ /** Abstracts recurring pattern around resource management and exception
re-mapping. */
+ public <T> T withPreparedStatement(String sql,
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
+ throws IOException {
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement statement = connection.prepareStatement(sql)) {
+ T result = f.apply(statement);
+ if (shouldCommit) {
+ connection.commit();
+ }
+ statement.close();
+ return result;
+ } catch (SQLException e) {
+ log.warn("Received SQL exception that can result from invalid
connection. Checking if validation query is set {} "
+ + "Exception is {}", ((HikariDataSource)
dataSource).getConnectionTestQuery(), e);
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Repeats execution of a SQL command at a fixed interval while the service
is running. The first execution of the
+ * command is immediate.
+ * @param sqlCommand SQL string
+ * @param interval frequency with which command will run
+ * @param timeUnit unit of time for interval
+ */
+ public void repeatSqlCommandExecutionAtInterval(String sqlCommand, long
interval, TimeUnit timeUnit) {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ Runnable task = () -> {
+ try {
+ withPreparedStatement(sqlCommand,
+ preparedStatement -> {
+ int numRowsAffected = preparedStatement.executeUpdate();
+ if (numRowsAffected != 0) {
+ log.info("{} rows affected by SQL command: {}",
numRowsAffected, sqlCommand);
+ }
+ return numRowsAffected;
+ }, true);
+ } catch (IOException e) {
+ log.error("Failed to execute SQL command: {}", sqlCommand, e);
+ }
+ };
+ executor.scheduleAtFixedRate(task, 0, interval, timeUnit);
+ this.scheduledExecutors.add(executor);
+ }
+
+ /**
+ * Call before closing the data source object associated with this instance
to also shut down any executors expecting
+ * to be run on the data source.
+ */
+ @Override
+ public void close() {
+ for (ScheduledThreadPoolExecutor executor : this.scheduledExecutors) {
+ executor.shutdownNow();
+ }
+ }
+}