This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad104eecaca9cb0189712a6a9044a92bbbddc931
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Aug 19 21:20:20 2021 +0100

    [FLINK-23776][docs] Clarify JDBC XA sink usage
---
 docs/content/docs/connectors/datastream/jdbc.md     | 21 +++++++++++++++++----
 .../flink/connector/jdbc/xa/JdbcXaSinkFunction.java |  4 ----
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/jdbc.md b/docs/content/docs/connectors/datastream/jdbc.md
index db0a784..76da068 100644
--- a/docs/content/docs/connectors/datastream/jdbc.md
+++ b/docs/content/docs/connectors/datastream/jdbc.md
@@ -145,15 +145,14 @@ public class JdbcSinkExample {
 Since 1.13, Flink JDBC sink supports exactly-once mode.
 The implementation relies on the JDBC driver support of XA
 [standard](https://pubs.opengroup.org/onlinepubs/009680699/toc.pdf).
+Most drivers support XA if the database also supports XA (so the driver is usually the same).
 
 To use it, create a sink using `exactlyOnceSink()` method as above and additionally provide:
 - {{< javadoc name="exactly-once options" file="org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.html" >}}
 - {{< javadoc name="execution options" file="org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}}
 - [XA DataSource](https://docs.oracle.com/javase/8/docs/api/javax/sql/XADataSource.html) Supplier
 
-**ATTENTION!** Currently `JdbcSink.exactlyOnceSink` can ensure exactly once semantics
-with `JdbcExecutionOptions.maxRetries == 0`; otherwise, duplicated results maybe produced.
-
+For example:
 ```java
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env
@@ -180,8 +179,22 @@ env
                 });
 env.execute();
 ```
-Postgres XADataSource Example:
+**NOTE:** Some databases only allow a single XA transaction per connection (e.g. PostgreSQL, MySQL).
+In such cases, please use the following API to construct `JdbcExactlyOnceOptions`:
 ```java
+JdbcExactlyOnceOptions.builder()
+.withTransactionPerConnection(true)
+.build()
+```
+This will make Flink use a separate connection for every XA transaction. This may require adjusting connection limits.
+For PostgreSQL and MySQL, this can be done by increasing `max_connections`.
+
+Furthermore, XA needs to be enabled and/or configured in some databases.
+For PostgreSQL, you should set `max_prepared_transactions` to some value greater than zero.
+For MySQL v8+, you should grant `XA_RECOVER_ADMIN` to Flink DB user.
+
+**ATTENTION:** Currently, `JdbcSink.exactlyOnceSink` can ensure exactly once semantics
+with `JdbcExecutionOptions.maxRetries == 0`; otherwise, duplicated results maybe produced.
 
 ### `XADataSource` examples
 PostgreSQL `XADataSource` example:
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
index 8d38028..31ca668 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
@@ -124,10 +124,6 @@ import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of;
  * </tbody>
  * </table>
  *
- * <p>Attention: JdbcXaSinkFunction does not support exactly-once mode with MySQL or other databases
- * that do not support multiple XA transaction per connection. We will improve the support in
- * FLINK-22239.
- *
  * @since 1.13
  */
 @Internal
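
The updated docs say that `withTransactionPerConnection(true)` makes Flink use a separate connection for every XA transaction, which may require raising `max_connections`. The plain-Java toy model below is a hypothetical sketch of why the connection count grows; it uses no Flink or JDBC classes, and `TxPerConnectionModel` and `FakeConnection` are illustrative names, not Flink API. It assumes that a prepared transaction keeps its connection until it is committed after the checkpoint completes, that a new transaction is started right after each prepare, and that a committed transaction's connection goes back to a pool for reuse.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model (not Flink's implementation) of a "transaction per connection"
 * strategy: each XA transaction owns its own connection from start until commit.
 */
public class TxPerConnectionModel {
    /** Stand-in for a physical database connection; only carries an id. */
    static final class FakeConnection {
        final int id;

        FakeConnection(int id) {
            this.id = id;
        }
    }

    private final Deque<FakeConnection> idle = new ArrayDeque<>();
    private final Map<String, FakeConnection> bound = new HashMap<>();
    private int opened = 0;

    /** Starts a transaction on an idle connection, opening a new one if none is free. */
    public void start(String xid) {
        FakeConnection c = idle.isEmpty() ? new FakeConnection(opened++) : idle.pop();
        bound.put(xid, c);
    }

    /** Assumption: prepare keeps the connection bound to the prepared transaction. */
    public void prepare(String xid) {
        if (!bound.containsKey(xid)) {
            throw new IllegalStateException("unknown xid " + xid);
        }
    }

    /** Commit releases the connection back to the idle pool for reuse. */
    public void commit(String xid) {
        FakeConnection c = bound.remove(xid);
        if (c == null) {
            throw new IllegalStateException("unknown xid " + xid);
        }
        idle.push(c);
    }

    /** Physical connections opened so far; none are closed, so this is the peak. */
    public int openedConnections() {
        return opened;
    }

    public static void main(String[] args) {
        TxPerConnectionModel subtask = new TxPerConnectionModel();
        subtask.start("tx-0");
        for (int cp = 1; cp <= 3; cp++) {
            String prev = "tx-" + (cp - 1);
            subtask.prepare(prev);     // checkpoint cp: prepare the current transaction
            subtask.start("tx-" + cp); // keep writing in a fresh transaction
            subtask.commit(prev);      // checkpoint cp completes: commit the prepared one
        }
        System.out.println("connections opened: " + subtask.openedConnections());
    }
}
```

Under these assumptions a single sink subtask settles at two connections (one for the open transaction, one for the transaction awaiting commit), so parallelism p would need roughly 2p, and more if commits lag several checkpoints behind. Treat this as a rough lower bound for choosing `max_connections`, not a sizing rule.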
