This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 898419e  [FLINK-23437][jdbc] Add an option to (dis)allow XA transaction multiplexing
898419e is described below

commit 898419ebedc0a4c8005347c848e2b004187d0672
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jul 28 07:55:06 2021 +0200

    [FLINK-23437][jdbc] Add an option to (dis)allow XA transaction multiplexing
---
 .../connector/jdbc/JdbcExactlyOnceOptions.java     | 37 ++++++++++++++++++++--
 .../org/apache/flink/connector/jdbc/JdbcSink.java  |  4 ++-
 .../apache/flink/connector/jdbc/xa/XaFacade.java   |  8 +++--
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java        |  4 ++-
 4 files changed, 47 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
index 7ba4eb5..c17ed17 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
@@ -53,21 +53,25 @@ public class JdbcExactlyOnceOptions implements Serializable 
{
     private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = true;
     private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
     private static final boolean DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS = false;
+    public static final boolean DEFAULT_TRANSACTION_PER_CONNECTION = false;
 
     private final boolean discoverAndRollbackOnRecovery;
     private final int maxCommitAttempts;
     private final boolean allowOutOfOrderCommits;
     private final Integer timeoutSec;
+    private final boolean transactionPerConnection;
 
     private JdbcExactlyOnceOptions(
             boolean discoverAndRollbackOnRecovery,
             int maxCommitAttempts,
             boolean allowOutOfOrderCommits,
-            Optional<Integer> timeoutSec) {
+            Optional<Integer> timeoutSec,
+            boolean transactionPerConnection) {
         this.discoverAndRollbackOnRecovery = discoverAndRollbackOnRecovery;
         this.maxCommitAttempts = maxCommitAttempts;
         this.allowOutOfOrderCommits = allowOutOfOrderCommits;
         this.timeoutSec = timeoutSec.orElse(null);
+        this.transactionPerConnection = transactionPerConnection;
         Preconditions.checkArgument(this.maxCommitAttempts > 0, 
"maxCommitAttempts should be > 0");
     }
 
@@ -91,6 +95,10 @@ public class JdbcExactlyOnceOptions implements Serializable {
         return timeoutSec;
     }
 
+    public boolean isTransactionPerConnection() {
+        return transactionPerConnection;
+    }
+
     public static JDBCExactlyOnceOptionsBuilder builder() {
         return new JDBCExactlyOnceOptionsBuilder();
     }
@@ -101,6 +109,7 @@ public class JdbcExactlyOnceOptions implements Serializable 
{
         private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
         private boolean allowOutOfOrderCommits = 
DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS;
         private Optional<Integer> timeoutSec = Optional.empty();
+        private boolean transactionPerConnection = 
DEFAULT_TRANSACTION_PER_CONNECTION;
 
         /**
          * Toggle discovery and rollback of prepared transactions upon 
recovery to prevent new
@@ -138,9 +147,33 @@ public class JdbcExactlyOnceOptions implements 
Serializable {
             return this;
         }
 
+        /**
+         * Set whether each XA transaction should use its own connection rather than sharing a
+         * single connection across transactions. A transaction is prepared each time a checkpoint
+         * is performed; it is committed once the checkpoint is confirmed. There can be multiple
+         * un-confirmed checkpoints and therefore multiple prepared transactions at the same
+         * time.
+         *
+         * <p>Some databases allow multiple XA transactions per connection (e.g. Oracle), while
+         * others only allow a single one (e.g. MySQL, PostgreSQL).
+         *
+         * <p>If enabled, each transaction uses a separate connection from a 
pool. The database
+         * limit of open connections might need to be adjusted.
+         *
+         * <p>Disabled by default.
+         */
+        public JDBCExactlyOnceOptionsBuilder withTransactionPerConnection(
+                boolean transactionPerConnection) {
+            this.transactionPerConnection = transactionPerConnection;
+            return this;
+        }
+
         public JdbcExactlyOnceOptions build() {
             return new JdbcExactlyOnceOptions(
-                    recoveredAndRollback, maxCommitAttempts, 
allowOutOfOrderCommits, timeoutSec);
+                    recoveredAndRollback,
+                    maxCommitAttempts,
+                    allowOutOfOrderCommits,
+                    timeoutSec,
+                    transactionPerConnection);
         }
     }
 }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
index bc12b42..1b2c903 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
@@ -108,7 +108,9 @@ public class JdbcSink {
                 sql,
                 statementBuilder,
                 XaFacade.fromXaDataSourceSupplier(
-                        dataSourceSupplier, 
exactlyOnceOptions.getTimeoutSec()),
+                        dataSourceSupplier,
+                        exactlyOnceOptions.getTimeoutSec(),
+                        exactlyOnceOptions.isTransactionPerConnection()),
                 executionOptions,
                 exactlyOnceOptions);
     }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
index 4614645..9857aff 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
@@ -51,8 +51,12 @@ public interface XaFacade extends JdbcConnectionProvider, 
Serializable, AutoClos
 
     /** @return a non-serializable instance. */
     static XaFacade fromXaDataSourceSupplier(
-            Supplier<XADataSource> dataSourceSupplier, Integer timeoutSec) {
-        return new XaFacadePoolingImpl(() -> new 
XaFacadeImpl(dataSourceSupplier, timeoutSec));
+            Supplier<XADataSource> dataSourceSupplier,
+            Integer timeoutSec,
+            boolean transactionPerConnection) {
+        return transactionPerConnection
+                ? new XaFacadePoolingImpl(() -> new 
XaFacadeImpl(dataSourceSupplier, timeoutSec))
+                : new XaFacadeImpl(dataSourceSupplier, timeoutSec);
     }
 
     void open() throws Exception;
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index a49ab59..0a8cc8c 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -195,7 +195,9 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
                                 String.format(INSERT_TEMPLATE, INPUT_TABLE),
                                 JdbcITCase.TEST_ENTRY_JDBC_STATEMENT_BUILDER,
                                 JdbcExecutionOptions.builder().build(),
-                                JdbcExactlyOnceOptions.defaults(),
+                                JdbcExactlyOnceOptions.builder()
+                                        .withTransactionPerConnection(true)
+                                        .build(),
                                 this.dbEnv.getDataSourceSupplier()));
 
         env.execute();

Reply via email to