This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 898419e [FLINK-23437][jdbc] Add an option to (dis)allow XA
transaction multiplexing
898419e is described below
commit 898419ebedc0a4c8005347c848e2b004187d0672
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jul 28 07:55:06 2021 +0200
[FLINK-23437][jdbc] Add an option to (dis)allow XA transaction multiplexing
---
.../connector/jdbc/JdbcExactlyOnceOptions.java | 37 ++++++++++++++++++++--
.../org/apache/flink/connector/jdbc/JdbcSink.java | 4 ++-
.../apache/flink/connector/jdbc/xa/XaFacade.java | 8 +++--
.../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 4 ++-
4 files changed, 47 insertions(+), 6 deletions(-)
diff --git
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
index 7ba4eb5..c17ed17 100644
---
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
+++
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
@@ -53,21 +53,25 @@ public class JdbcExactlyOnceOptions implements Serializable
{
private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = true;
private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
private static final boolean DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS = false;
+ public static final boolean DEFAULT_TRANSACTION_PER_CONNECTION = false;
private final boolean discoverAndRollbackOnRecovery;
private final int maxCommitAttempts;
private final boolean allowOutOfOrderCommits;
private final Integer timeoutSec;
+ private final boolean transactionPerConnection;
private JdbcExactlyOnceOptions(
boolean discoverAndRollbackOnRecovery,
int maxCommitAttempts,
boolean allowOutOfOrderCommits,
- Optional<Integer> timeoutSec) {
+ Optional<Integer> timeoutSec,
+ boolean transactionPerConnection) {
this.discoverAndRollbackOnRecovery = discoverAndRollbackOnRecovery;
this.maxCommitAttempts = maxCommitAttempts;
this.allowOutOfOrderCommits = allowOutOfOrderCommits;
this.timeoutSec = timeoutSec.orElse(null);
+ this.transactionPerConnection = transactionPerConnection;
Preconditions.checkArgument(this.maxCommitAttempts > 0,
"maxCommitAttempts should be > 0");
}
@@ -91,6 +95,10 @@ public class JdbcExactlyOnceOptions implements Serializable {
return timeoutSec;
}
+ public boolean isTransactionPerConnection() {
+ return transactionPerConnection;
+ }
+
public static JDBCExactlyOnceOptionsBuilder builder() {
return new JDBCExactlyOnceOptionsBuilder();
}
@@ -101,6 +109,7 @@ public class JdbcExactlyOnceOptions implements Serializable
{
private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
private boolean allowOutOfOrderCommits =
DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS;
private Optional<Integer> timeoutSec = Optional.empty();
+ private boolean transactionPerConnection =
DEFAULT_TRANSACTION_PER_CONNECTION;
/**
* Toggle discovery and rollback of prepared transactions upon
recovery to prevent new
@@ -138,9 +147,33 @@ public class JdbcExactlyOnceOptions implements
Serializable {
return this;
}
+ /**
+ * Set whether the same connection can be used for multiple XA
transactions. A transaction
+ * is prepared each time a checkpoint is performed; it is committed
once the checkpoint is
+ * confirmed. There can be multiple un-confirmed checkpoints and
therefore multiple prepared
+ * transactions.
+ *
+ * <p>Some databases support this natively (e.g. Oracle); while others
only allow a single
+ * XA transaction per connection (e.g. MySQL, PostgreSQL).
+ *
+ * <p>If enabled, each transaction uses a separate connection from a
pool. The database
+ * limit of open connections might need to be adjusted.
+ *
+ * <p>Disabled by default.
+ */
+ public JDBCExactlyOnceOptionsBuilder withTransactionPerConnection(
+ boolean transactionPerConnection) {
+ this.transactionPerConnection = transactionPerConnection;
+ return this;
+ }
+
public JdbcExactlyOnceOptions build() {
return new JdbcExactlyOnceOptions(
- recoveredAndRollback, maxCommitAttempts,
allowOutOfOrderCommits, timeoutSec);
+ recoveredAndRollback,
+ maxCommitAttempts,
+ allowOutOfOrderCommits,
+ timeoutSec,
+ transactionPerConnection);
}
}
}
diff --git
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
index bc12b42..1b2c903 100644
---
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
+++
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
@@ -108,7 +108,9 @@ public class JdbcSink {
sql,
statementBuilder,
XaFacade.fromXaDataSourceSupplier(
- dataSourceSupplier,
exactlyOnceOptions.getTimeoutSec()),
+ dataSourceSupplier,
+ exactlyOnceOptions.getTimeoutSec(),
+ exactlyOnceOptions.isTransactionPerConnection()),
executionOptions,
exactlyOnceOptions);
}
diff --git
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
index 4614645..9857aff 100644
---
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
+++
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacade.java
@@ -51,8 +51,12 @@ public interface XaFacade extends JdbcConnectionProvider,
Serializable, AutoClos
/** @return a non-serializable instance. */
static XaFacade fromXaDataSourceSupplier(
- Supplier<XADataSource> dataSourceSupplier, Integer timeoutSec) {
- return new XaFacadePoolingImpl(() -> new
XaFacadeImpl(dataSourceSupplier, timeoutSec));
+ Supplier<XADataSource> dataSourceSupplier,
+ Integer timeoutSec,
+ boolean transactionPerConnection) {
+ return transactionPerConnection
+ ? new XaFacadePoolingImpl(() -> new
XaFacadeImpl(dataSourceSupplier, timeoutSec))
+ : new XaFacadeImpl(dataSourceSupplier, timeoutSec);
}
void open() throws Exception;
diff --git
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index a49ab59..0a8cc8c 100644
---
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -195,7 +195,9 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
String.format(INSERT_TEMPLATE, INPUT_TABLE),
JdbcITCase.TEST_ENTRY_JDBC_STATEMENT_BUILDER,
JdbcExecutionOptions.builder().build(),
- JdbcExactlyOnceOptions.defaults(),
+ JdbcExactlyOnceOptions.builder()
+ .withTransactionPerConnection(true)
+ .build(),
this.dbEnv.getDataSourceSupplier()));
env.execute();