phet commented on code in PR #3812: URL: https://github.com/apache/gobblin/pull/3812#discussion_r1376767399
########## gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util; + +import com.zaxxer.hikari.HikariDataSource; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.sql.DataSource; +import org.slf4j.Logger; + + +/** + * MySQL based implementations of stores require common functionality that can be stored in a utility class. The + * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed + * intervals. The instantiater of the class should provide the data source used within this utility. + */ +public class MySQLStoreUtils { Review Comment: Naming-wise, what's MySQL-specific about the impl? Wouldn't it work for any DB? (Is it merely for that fancy `HikariDataSource` logging trick?) If that's all, let's not codify MySQL in the name.
########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java: ########## @@ -81,116 +90,122 @@ public MysqlDagActionStore(Config config) throws IOException { } catch (SQLException e) { throw new IOException("Failure creation table " + tableName, e); } + this.mySQLStoreUtils = new MySQLStoreUtils(this.dataSource, log); + this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, this.tableName, retentionPeriodSeconds); + // Periodically deletes all rows in the table last_modified before the retention period defined by config. + mySQLStoreUtils.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement, 6, TimeUnit.HOURS); } @Override public boolean exists(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException, SQLException { - ResultSet rs = null; - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement existStatement = connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) { + return mySQLStoreUtils.withPreparedStatement(String.format(EXISTS_STATEMENT, tableName), existStatement -> { int i = 0; existStatement.setString(++i, flowGroup); existStatement.setString(++i, flowName); existStatement.setString(++i, flowExecutionId); existStatement.setString(++i, flowActionType.toString()); - rs = existStatement.executeQuery(); - rs.next(); - return rs.getBoolean(1); - } catch (SQLException e) { - throw new IOException(String.format("Failure checking existence of DagAction: %s in table %s", - new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e); - } finally { - if (rs != null) { - rs.close(); + ResultSet rs = null; + try { + rs = existStatement.executeQuery(); + rs.next(); + return rs.getBoolean(1); + } catch (SQLException e) { + throw new IOException(String.format("Failure checking existence of DagAction: %s in table %s", + new DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType), tableName), e); + } finally { + if (rs != null) { + rs.close(); + } } - } + }, true); } @Override public void addDagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException { - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement insertStatement = connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) { + mySQLStoreUtils.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { + try { int i = 0; insertStatement.setString(++i, flowGroup); insertStatement.setString(++i, flowName); insertStatement.setString(++i, flowExecutionId); insertStatement.setString(++i, flowActionType.toString()); - insertStatement.executeUpdate(); - connection.commit(); + return insertStatement.executeUpdate(); } catch (SQLException e) { throw new IOException(String.format("Failure adding action for DagAction: %s in table %s", new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e); - } + }}, true); } @Override public boolean deleteDagAction(DagAction dagAction) throws IOException { - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement deleteStatement = connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) { + return mySQLStoreUtils.withPreparedStatement(String.format(DELETE_STATEMENT, tableName), deleteStatement -> { + try { int i = 0; deleteStatement.setString(++i, dagAction.getFlowGroup()); deleteStatement.setString(++i, dagAction.getFlowName()); deleteStatement.setString(++i, dagAction.getFlowExecutionId()); deleteStatement.setString(++i, dagAction.getFlowActionType().toString()); int result = deleteStatement.executeUpdate(); - connection.commit(); return result != 0; } catch (SQLException e) { throw new IOException(String.format("Failure deleting action for DagAction: %s in table %s", dagAction, tableName), e); - } + }}, true); } // TODO: later change this to 
getDagActions relating to a particular flow execution if it makes sense private DagAction getDagActionWithRetry(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff exponentialBackoff) throws IOException, SQLException { - ResultSet rs = null; - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement getStatement = connection.prepareStatement(String.format(GET_STATEMENT, tableName))) { - int i = 0; - getStatement.setString(++i, flowGroup); - getStatement.setString(++i, flowName); - getStatement.setString(++i, flowExecutionId); - getStatement.setString(++i, flowActionType.toString()); - rs = getStatement.executeQuery(); - if (rs.next()) { - return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4))); - } else { - if (exponentialBackoff.awaitNextRetryIfAvailable()) { - return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, flowActionType, exponentialBackoff); + return mySQLStoreUtils.withPreparedStatement(String.format(GET_STATEMENT, tableName), getStatement -> { + ResultSet rs = null; + try { + int i = 0; + getStatement.setString(++i, flowGroup); + getStatement.setString(++i, flowName); + getStatement.setString(++i, flowExecutionId); + getStatement.setString(++i, flowActionType.toString()); + rs = getStatement.executeQuery(); + if (rs.next()) { + return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4))); } else { - log.warn(String.format("Can not find dag action: %s with flowGroup: %s, flowName: %s, flowExecutionId: %s", - flowActionType, flowGroup, flowName, flowExecutionId)); - return null; + if (exponentialBackoff.awaitNextRetryIfAvailable()) { + return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, flowActionType, exponentialBackoff); + } else { + log.warn(String.format("Can not find dag action: %s with flowGroup: %s, flowName: %s, flowExecutionId: 
%s", + flowActionType, flowGroup, flowName, flowExecutionId)); + return null; + } + } + } catch (SQLException | InterruptedException e) { + throw new IOException(String.format("Failure get %s from table %s", + new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e); + } finally { + if (rs != null) { + rs.close(); } } - } catch (SQLException | InterruptedException e) { - throw new IOException(String.format("Failure get %s from table %s", new DagAction(flowGroup, flowName, flowExecutionId, - flowActionType), tableName), e); - } finally { - if (rs != null) { - rs.close(); - } - } + }, true); } @Override public Collection<DagAction> getDagActions() throws IOException { - HashSet<DagAction> result = new HashSet<>(); - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement getAllStatement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName)); - ResultSet rs = getAllStatement.executeQuery()) { - while (rs.next()) { - result.add( - new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4)))); - } - if (rs != null) { - rs.close(); + return mySQLStoreUtils.withPreparedStatement(String.format(GET_ALL_STATEMENT, tableName), getAllStatement -> { + ResultSet rs = null; + try { + HashSet<DagAction> result = new HashSet<>(); + rs = getAllStatement.executeQuery(); + while (rs.next()) { + result.add(new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4)))); + } + return result; + } catch (SQLException e) { + throw new IOException(String.format("Failure get dag actions from table %s ", tableName), e); Review Comment: `withPreparedStatement` will already map `SQLException` to `IOException`... do you do this here just for a specific error message? 
########## gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java: ########## @@ -0,0 +1,70 @@ +package org.apache.gobblin.util; + +import com.zaxxer.hikari.HikariDataSource; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.sql.DataSource; +import org.slf4j.Logger; + + +/** + * MySQL based implementations of stores require common functionality that can be stored in a utility class. The + * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed + * intervals. + */ +public class MySQLStoreUtils { + private final DataSource dataSource; Review Comment: It would be clearer to say something like "MUST maintain ownership of the {@link DataSource} and arrange for it to be closed, but only once this instance will no longer be used"... With that in mind, how would the repeating SQL mesh with that? Should this instance hold on to what it schedules, and provide a `.close()` method that would unschedule them all? If so, might the advised shutdown protocol be `storeUtils.close(); dataSource.close();`? ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java: ########## @@ -80,13 +77,9 @@ */ @Slf4j public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter { - /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */ - @FunctionalInterface - protected interface CheckedFunction<T, R> { - R apply(T t) throws IOException, SQLException; - } protected final DataSource dataSource; + private final MySQLStoreUtils mySQLStoreUtils; Review Comment: How about `DBStatementExecutor`?
########## gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java: ########## @@ -95,6 +95,10 @@ public class ConfigurationKeys { public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500; public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days"; public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365; + // Mysql Dag Action Store configuration + public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore."; + public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + ".retentionPeriodSec"; + public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds) Review Comment: you resolved this, but I still see `SEC_KEY`... do you plan to leave as-is? ########## gobblin-runtime/src/main/java/org/apache/gobblin/util/MySQLStoreUtils.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.util; + +import com.zaxxer.hikari.HikariDataSource; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.sql.DataSource; +import org.slf4j.Logger; + + +/** + * MySQL based implementations of stores require common functionality that can be stored in a utility class. The + * functionality includes executing prepared statements on a data source object and executing SQL queries at fixed + * intervals. The instantiater of the class should provide the data source used within this utility. + */ +public class MySQLStoreUtils { + private final DataSource dataSource; + private final Logger log; + + public MySQLStoreUtils(DataSource dataSource, Logger log) { + this.dataSource = dataSource; + this.log = log; + } + + /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */ + @FunctionalInterface + public interface CheckedFunction<T, R> { + R apply(T t) throws IOException, SQLException; + } + + /** Abstracts recurring pattern around resource management and exception re-mapping. */ + public <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) Review Comment: What's your thinking on other classes with their own `withPreparedStatement`, such as `MysqlBaseSpecStore`? Do they deserve a TODO comment recommending that they migrate to this common implementation? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
