This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a28590bddf25221c3d1b11b350e7ecec064e6a22 Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Apr 15 22:21:18 2021 +0200 [FLINK-22239][jdbc] Pool connections in JDBC XA sink Some databases, like PostgreSQL and MySQL, allow at most one XA transaction per connection. Using a new connection for each transaction (and pooling) makes it possible to overcome this limitation. --- .../org/apache/flink/connector/jdbc/JdbcSink.java | 4 +- .../jdbc/internal/JdbcBatchingOutputFormat.java | 13 +- .../connector/jdbc/xa/JdbcXaSinkFunction.java | 20 ++- .../apache/flink/connector/jdbc/xa/XaFacade.java | 13 +- .../flink/connector/jdbc/xa/XaFacadeImpl.java | 20 +-- .../connector/jdbc/xa/XaFacadePoolingImpl.java | 193 +++++++++++++++++++++ .../flink/connector/jdbc/xa/XaGroupOpsImpl.java | 2 +- .../connector/jdbc/xa/JdbcXaFacadeTestHelper.java | 12 +- .../connector/jdbc/xa/h2/H2XaResourceWrapper.java | 26 +-- 9 files changed, 248 insertions(+), 55 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java index 7a0ee41..bc12b42 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java @@ -30,7 +30,6 @@ import org.apache.flink.util.function.SerializableSupplier; import javax.sql.XADataSource; -import java.util.Optional; import java.util.function.Function; /** Facade to create JDBC {@link SinkFunction sinks}. 
*/ @@ -109,8 +108,7 @@ public class JdbcSink { sql, statementBuilder, XaFacade.fromXaDataSourceSupplier( - dataSourceSupplier, - Optional.ofNullable(exactlyOnceOptions.getTimeoutSec())), + dataSourceSupplier, exactlyOnceOptions.getTimeoutSec()), executionOptions, exactlyOnceOptions); } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java index 33be5b7..5bf52bb 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java @@ -41,7 +41,6 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.io.Serializable; -import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.concurrent.Executors; @@ -191,9 +190,7 @@ public class JdbcBatchingOutputFormat< } try { if (!connectionProvider.isConnectionValid()) { - jdbcStatementExecutor.closeStatements(); - Connection connection = connectionProvider.reestablishConnection(); - jdbcStatementExecutor.prepareStatements(connection); + updateExecutor(true); } } catch (Exception exception) { LOG.error( @@ -367,4 +364,12 @@ public class JdbcBatchingOutputFormat< static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) { return (st, record) -> setRecordToStatement(st, types, record); } + + public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException { + jdbcStatementExecutor.closeStatements(); + jdbcStatementExecutor.prepareStatements( + reconnect + ? 
connectionProvider.reestablishConnection() + : connectionProvider.getConnection()); + } } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java index 0748278..17ad730 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -220,10 +221,6 @@ public class JdbcXaSinkFunction<T> extends AbstractRichFunction super.open(configuration); xidGenerator.open(); xaFacade.open(); - outputFormat.setRuntimeContext(getRuntimeContext()); - outputFormat.open( - getRuntimeContext().getIndexOfThisSubtask(), - getRuntimeContext().getNumberOfParallelSubtasks()); hangingXids = new LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry()); commitUpToCheckpoint(Optional.empty()); if (options.isDiscoverAndRollbackOnRecovery()) { @@ -234,6 +231,11 @@ public class JdbcXaSinkFunction<T> extends AbstractRichFunction xaGroupOps.recoverAndRollback(); } beginTx(0L); + outputFormat.setRuntimeContext(getRuntimeContext()); + // open format only after starting the transaction so it gets a ready to use connection + outputFormat.open( + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks()); } @Override @@ -264,7 +266,7 @@ public class JdbcXaSinkFunction<T> extends AbstractRichFunction if 
(currentXid != null && xaFacade.isOpen()) { try { LOG.debug("remove current transaction before closing, xid={}", currentXid); - xaFacade.failOrRollback(currentXid); + xaFacade.failAndRollback(currentXid); } catch (Exception e) { LOG.warn("unable to fail/rollback current transaction, xid={}", currentXid, e); } @@ -292,16 +294,22 @@ public class JdbcXaSinkFunction<T> extends AbstractRichFunction "empty XA transaction (skip), xid: {}, checkpoint {}", currentXid, checkpointId); + } catch (Exception e) { + ExceptionUtils.rethrowIOException(e); } currentXid = null; } /** @param checkpointId to associate with the new transaction. */ - private void beginTx(long checkpointId) { + private void beginTx(long checkpointId) throws Exception { Preconditions.checkState(currentXid == null, "currentXid not null"); currentXid = xidGenerator.generateXid(getRuntimeContext(), checkpointId); hangingXids.offerLast(currentXid); xaFacade.start(currentXid); + if (checkpointId > 0) { + // associate outputFormat with a new connection that might have been opened in start() + outputFormat.updateExecutor(false); + } } private void commitUpToCheckpoint(Optional<Long> checkpointInclusive) { diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java index 1b9b5ab..4614645 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java @@ -27,7 +27,6 @@ import javax.transaction.xa.Xid; import java.io.Serializable; import java.util.Collection; -import java.util.Optional; import java.util.function.Supplier; /** @@ -40,7 +39,7 @@ import java.util.function.Supplier; * <li>{@link #open} * <li>{@link #start} transaction * <li>{@link #getConnection}, write some data - * <li>{@link #endAndPrepare} 
(or {@link #failOrRollback}) + * <li>{@link #endAndPrepare} (or {@link #failAndRollback}) * <li>{@link #commit} / {@link #rollback} * <li>{@link #close} * </ol> @@ -52,8 +51,8 @@ public interface XaFacade extends JdbcConnectionProvider, Serializable, AutoClos /** @return a non-serializable instance. */ static XaFacade fromXaDataSourceSupplier( - Supplier<XADataSource> dataSourceSupplier, Optional<Integer> timeoutSec) { - return new XaFacadeImpl(dataSourceSupplier, timeoutSec); + Supplier<XADataSource> dataSourceSupplier, Integer timeoutSec) { + return new XaFacadePoolingImpl(() -> new XaFacadeImpl(dataSourceSupplier, timeoutSec)); } void open() throws Exception; @@ -61,10 +60,10 @@ public interface XaFacade extends JdbcConnectionProvider, Serializable, AutoClos boolean isOpen(); /** Start a new transaction. */ - void start(Xid xid) throws TransientXaException; + void start(Xid xid) throws Exception; /** End and then prepare the transaction. Transaction can't be resumed afterwards. */ - void endAndPrepare(Xid xid) throws TransientXaException, EmptyXaTransactionException; + void endAndPrepare(Xid xid) throws Exception; /** * Commit previously prepared transaction. @@ -81,7 +80,7 @@ public interface XaFacade extends JdbcConnectionProvider, Serializable, AutoClos * End transaction as {@link javax.transaction.xa.XAResource#TMFAIL failed}; in case of error, * try to roll it back. */ - void failOrRollback(Xid xid) throws TransientXaException; + void failAndRollback(Xid xid) throws TransientXaException; /** * Note: this can block on some non-MVCC databases if there are ended not prepared transactions. 
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java index 52c8ecf..bfe41e6 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.jdbc.xa; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingRunnable; @@ -80,14 +81,14 @@ class XaFacadeImpl implements XaFacade { private transient Connection connection; private transient XAConnection xaConnection; - /** @return a non-serializable instance. */ + @VisibleForTesting static XaFacadeImpl fromXaDataSource(XADataSource ds) { - return new XaFacadeImpl(() -> ds, empty()); + return new XaFacadeImpl(() -> ds, null); } - XaFacadeImpl(Supplier<XADataSource> dataSourceSupplier, Optional<Integer> timeoutSec) { + XaFacadeImpl(Supplier<XADataSource> dataSourceSupplier, Integer timeoutSec) { this.dataSourceSupplier = Preconditions.checkNotNull(dataSourceSupplier); - this.timeoutSec = timeoutSec.orElse(null); + this.timeoutSec = timeoutSec; } @Override @@ -115,10 +116,6 @@ class XaFacadeImpl implements XaFacade { connection.close(); connection = null; } - if (xaConnection != null) { - xaConnection.close(); - xaConnection = null; - } xaResource = null; } @@ -173,12 +170,15 @@ class XaFacadeImpl implements XaFacade { } @Override - public void failOrRollback(Xid xid) { + public void failAndRollback(Xid xid) { execute( Command.fromRunnable( "end (fail)", xid, - () -> xaResource.end(xid, XAResource.TMFAIL), + () -> { + xaResource.end(xid, XAResource.TMFAIL); + 
xaResource.rollback(xid); + }, err -> { if (err.errorCode >= XA_RBBASE) { rollback(xid); diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadePoolingImpl.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadePoolingImpl.java new file mode 100644 index 0000000..04d4855 --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadePoolingImpl.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.transaction.xa.Xid; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.function.Supplier; + +import static org.apache.flink.util.ExceptionUtils.rethrow; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A "pooling" implementation of {@link XaFacade}. Some database implement XA such that one + * connection is limited to a single transaction. As a workaround, this implementation creates a new + * XA resource after each xa_start call is made (and associates it with the xid to commit later). + */ +@Internal +class XaFacadePoolingImpl implements XaFacade { + private static final long serialVersionUID = 1L; + + public interface FacadeSupplier extends Serializable, Supplier<XaFacade> {} + + private static final transient Logger LOG = LoggerFactory.getLogger(XaFacadePoolingImpl.class); + private final FacadeSupplier facadeSupplier; + private transient XaFacade active; + private transient Map<Xid, XaFacade> mappedToXids; + private transient Deque<XaFacade> pooled; + + XaFacadePoolingImpl(FacadeSupplier facadeSupplier) { + this.facadeSupplier = facadeSupplier; + } + + @Override + public void open() throws Exception { + checkState(active == null); + pooled = new LinkedList<>(); + mappedToXids = new HashMap<>(); + } + + @Override + public boolean isOpen() { + return active != null && active.isOpen(); + } + + @Override + public void start(Xid xid) throws Exception { + checkState(active == null); + if (pooled.isEmpty()) { + active = facadeSupplier.get(); + active.open(); + } else { + active = 
pooled.poll(); + } + active.start(xid); + mappedToXids.put(xid, active); + } + + /** + * Must be called after {@link #start(Xid)} with the same {@link Xid}. + * + * @see XaFacade#endAndPrepare(Xid) + */ + @Override + public void endAndPrepare(Xid xid) throws Exception { + checkState(active == mappedToXids.get(xid)); + try { + active.endAndPrepare(xid); + } finally { + active = null; + } + } + + @Override + public void commit(Xid xid, boolean ignoreUnknown) throws TransientXaException { + runForXid(xid, facade -> facade.commit(xid, ignoreUnknown)); + } + + @Override + public void rollback(Xid xid) throws TransientXaException { + runForXid(xid, facade -> facade.rollback(xid)); + } + + @Override + public void failAndRollback(Xid xid) throws TransientXaException { + runForXid(xid, facade -> facade.failAndRollback(xid)); + } + + @Override + public Collection<Xid> recover() throws TransientXaException { + return peekPooled().recover(); + } + + @Override + public void close() throws Exception { + for (XaFacade facade : mappedToXids.values()) { + facade.close(); + } + for (XaFacade facade : pooled) { + facade.close(); + } + if (active != null && active.isOpen()) { + active.close(); + } + } + + @Nullable + @Override + public Connection getConnection() { + return active.getConnection(); + } + + @Override + public boolean isConnectionValid() throws SQLException { + return active.isConnectionValid(); + } + + @Override + public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException { + return active.getOrEstablishConnection(); + } + + @Override + public void closeConnection() { + active.closeConnection(); + } + + @Override + public Connection reestablishConnection() throws SQLException, ClassNotFoundException { + return active.reestablishConnection(); + } + + // WARN: action MUST leave the facade in IDLE state (i.e. 
not start/end/prepare any tx) + private void runForXid(Xid xid, ThrowingConsumer<XaFacade, TransientXaException> action) { + XaFacade mapped = mappedToXids.remove(xid); + if (mapped == null) { + // a transaction can be not known during recovery + LOG.debug("No XA resource found associated with XID: {}", xid); + action.accept(peekPooled()); + } else { + LOG.debug("Found mapped XA resource for XID: {} {}", xid, mapped); + try { + action.accept(mapped); + } finally { + pooled.offer(mapped); + } + } + } + + // WARN: the returned facade MUST be left in IDLE state (i.e. not start/end/prepare any tx) + private XaFacade peekPooled() { + XaFacade xaFacade = pooled.peek(); + if (xaFacade == null) { + xaFacade = facadeSupplier.get(); + try { + xaFacade.open(); + } catch (Exception e) { + rethrow(e); + } + pooled.offer(xaFacade); + } + return xaFacade; + } +} diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java index 396b165..c9159b8 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java @@ -87,7 +87,7 @@ class XaGroupOpsImpl implements XaGroupOps { } for (Xid x : xids) { try { - xaFacade.failOrRollback(x); + xaFacade.failAndRollback(x); result.succeeded(x); } catch (TransientXaException e) { LOG.info("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage()); diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java index ac9cdd9..46a1141 100644 --- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java @@ -39,14 +39,24 @@ class JdbcXaFacadeTestHelper implements AutoCloseable { private final XADataSource xaDataSource; private final String table; private final String dbUrl; + private final String user; + private final String pass; private final XaFacade xaFacade; JdbcXaFacadeTestHelper(XADataSource xaDataSource, String dbUrl, String table) throws Exception { + this(xaDataSource, dbUrl, table, "", ""); + } + + JdbcXaFacadeTestHelper( + XADataSource xaDataSource, String dbUrl, String table, String user, String pass) + throws Exception { this.xaDataSource = xaDataSource; this.dbUrl = dbUrl; this.table = table; this.xaFacade = XaFacadeImpl.fromXaDataSource(this.xaDataSource); this.xaFacade.open(); + this.user = user; + this.pass = pass; } void assertPreparedTxCountEquals(int expected) { @@ -77,7 +87,7 @@ class JdbcXaFacadeTestHelper implements AutoCloseable { private List<Integer> getInsertedIds() throws SQLException { List<Integer> dbContents = new ArrayList<>(); - try (Connection connection = DriverManager.getConnection(dbUrl)) { + try (Connection connection = DriverManager.getConnection(dbUrl, user, pass)) { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); connection.setReadOnly(true); try (Statement st = connection.createStatement()) { diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java index bdf50d3..0a8572f 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java +++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/h2/H2XaResourceWrapper.java @@ -232,17 +232,9 @@ public class H2XaResourceWrapper implements XAResource { } private void finalizeTx(ThrowingRunnable<XAException> runnable) throws XAException { - // underlying implementation sets autocommit on just after commit or rollback - // to prevent the actual change we turn it on the lower level and then revert - withSessionAutocommitOn( - () -> - // underlying implementation nulls out current transaction just after commit - // or rollback - // which prevents it from being prepared afterwards - withCurrentTransaction(runnable)); - } - - private void withCurrentTransaction(ThrowingRunnable<XAException> runnable) throws XAException { + // underlying implementation nulls out current transaction just after commit + // or rollback + // which prevents it from being prepared afterwards Object current = getCurrentTransaction(); try { runnable.run(); @@ -252,16 +244,4 @@ public class H2XaResourceWrapper implements XAResource { setCurrentTransaction(current); } } - - private void withSessionAutocommitOn(ThrowingRunnable<XAException> runnable) - throws XAException { - JdbcConnection conn = getJdbcConnection(); - boolean autoCommit = conn.getSession().getAutoCommit(); - conn.getSession().setAutoCommit(true); - try { - runnable.run(); - } finally { - conn.getSession().setAutoCommit(autoCommit); - } - } }
