This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e43674  [FLINK-22311] [connector/jdbc] Validate maxRetries for XA Sink
7e43674 is described below

commit 7e43674abc2281af51ad83b2e3972f7ffb3d2c7b
Author: Maciej Bryński <[email protected]>
AuthorDate: Wed May 19 11:19:18 2021 +0200

    [FLINK-22311] [connector/jdbc] Validate maxRetries for XA Sink
    
    Co-authored-by: Alexander Fedulov <[email protected]>
---
 .../src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java  | 4 +++-
 .../apache/flink/connector/jdbc/internal/JdbcOutputFormat.java   | 5 +++++
 .../org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java   | 9 +++++++++
 .../flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java      | 2 +-
 .../org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java   | 5 ++++-
 5 files changed, 22 insertions(+), 3 deletions(-)

diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
index cf0c6cf..f4761e5 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
@@ -90,7 +90,9 @@ public class JdbcSink {
      * @param <T> type of data in {@link
      *     org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
StreamRecord}.
      * @param executionOptions parameters of execution, such as batch size and 
maximum retries
-     * @param exactlyOnceOptions exactly-once options
+     * @param exactlyOnceOptions exactly-once options. Note: maxRetries 
setting must be strictly set
+     *     to 0 for the created sink to work properly and not to produce 
duplicates. See issue
+     *     FLINK-22311 for details.
      * @param dataSourceSupplier supplies the {@link XADataSource}
      */
     public static <T> SinkFunction<T> exactlyOnceSink(
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
index ed194bdf..a869f6c 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
@@ -406,6 +406,11 @@ public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends 
JdbcBatchStatementExe
                         : connectionProvider.getConnection());
     }
 
+    /** Returns configured {@code JdbcExecutionOptions}. */
+    public JdbcExecutionOptions getExecutionOptions() {
+        return executionOptions;
+    }
+
     @VisibleForTesting
     public Connection getConnection() {
         return connectionProvider.getConnection();
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
index 31ca668..36c5f01 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
@@ -157,6 +157,9 @@ public class JdbcXaSinkFunction<T> extends 
AbstractRichFunction
      *
      * <p>All parameters must be {@link java.io.Serializable serializable}.
      *
+     * <p>Note: {@link JdbcExecutionOptions} maxRetries setting must be 
strictly set to 0 for this
+     * sink to work properly and not to produce duplicates. See issue 
FLINK-22311 for details.
+     *
      * @param xaFacade {@link XaFacade} to manage XA transactions
      */
     public JdbcXaSinkFunction(
@@ -196,6 +199,12 @@ public class JdbcXaSinkFunction<T> extends 
AbstractRichFunction
             XaSinkStateHandler stateHandler,
             JdbcExactlyOnceOptions options,
             XaGroupOps xaGroupOps) {
+
+        Preconditions.checkArgument(
+                outputFormat.getExecutionOptions().getMaxRetries() == 0,
+                "JDBC XA sink requires maxRetries equal to 0, otherwise it 
could "
+                        + "cause duplicates. See issue FLINK-22311 for 
details.");
+
         this.xaFacade = Preconditions.checkNotNull(xaFacade);
         this.xidGenerator = Preconditions.checkNotNull(xidGenerator);
         this.outputFormat = Preconditions.checkNotNull(outputFormat);
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index ffcc2c1..618cd41 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -214,7 +214,7 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
                         JdbcSink.exactlyOnceSink(
                                 String.format(INSERT_TEMPLATE, INPUT_TABLE),
                                 JdbcITCase.TEST_ENTRY_JDBC_STATEMENT_BUILDER,
-                                JdbcExecutionOptions.builder().build(),
+                                
JdbcExecutionOptions.builder().withMaxRetries(0).build(),
                                 JdbcExactlyOnceOptions.builder()
                                         .withTransactionPerConnection(true)
                                         .build(),
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
index 5b5aa02..efaf4b2 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
@@ -151,7 +151,10 @@ public abstract class JdbcXaSinkTestBase extends 
JdbcTestBase {
         JdbcOutputFormat<TestEntry, TestEntry, 
JdbcBatchStatementExecutor<TestEntry>> format =
                 new JdbcOutputFormat<>(
                         xaFacade,
-                        
JdbcExecutionOptions.builder().withBatchIntervalMs(batchInterval).build(),
+                        JdbcExecutionOptions.builder()
+                                .withBatchIntervalMs(batchInterval)
+                                .withMaxRetries(0)
+                                .build(),
                         ctx ->
                                 JdbcBatchStatementExecutor.simple(
                                         String.format(INSERT_TEMPLATE, 
INPUT_TABLE),

Reply via email to