This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7e43674 [FLINK-22311] [connector/jdbc] Validate maxRetries for XA Sink
7e43674 is described below
commit 7e43674abc2281af51ad83b2e3972f7ffb3d2c7b
Author: Maciej Bryński <[email protected]>
AuthorDate: Wed May 19 11:19:18 2021 +0200
[FLINK-22311] [connector/jdbc] Validate maxRetries for XA Sink
Co-authored-by: Alexander Fedulov
<[email protected]>
---
.../src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java | 4 +++-
.../apache/flink/connector/jdbc/internal/JdbcOutputFormat.java | 5 +++++
.../org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java | 9 +++++++++
.../flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 2 +-
.../org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java | 5 ++++-
5 files changed, 22 insertions(+), 3 deletions(-)
diff --git
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
index cf0c6cf..f4761e5 100644
---
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
+++
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
@@ -90,7 +90,9 @@ public class JdbcSink {
* @param <T> type of data in {@link
* org.apache.flink.streaming.runtime.streamrecord.StreamRecord
StreamRecord}.
* @param executionOptions parameters of execution, such as batch size and
maximum retries
- * @param exactlyOnceOptions exactly-once options
+ * @param exactlyOnceOptions exactly-once options. Note: maxRetries
setting must be strictly set
+ * to 0 for the created sink to work properly and not to produce
duplicates. See issue
+ * FLINK-22311 for details.
* @param dataSourceSupplier supplies the {@link XADataSource}
*/
public static <T> SinkFunction<T> exactlyOnceSink(
diff --git
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
index ed194bdf..a869f6c 100644
---
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
+++
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
@@ -406,6 +406,11 @@ public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends
JdbcBatchStatementExe
: connectionProvider.getConnection());
}
+ /** Returns configured {@code JdbcExecutionOptions}. */
+ public JdbcExecutionOptions getExecutionOptions() {
+ return executionOptions;
+ }
+
@VisibleForTesting
public Connection getConnection() {
return connectionProvider.getConnection();
diff --git
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
index 31ca668..36c5f01 100644
---
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
+++
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
@@ -157,6 +157,9 @@ public class JdbcXaSinkFunction<T> extends
AbstractRichFunction
*
* <p>All parameters must be {@link java.io.Serializable serializable}.
*
+ * <p>Note: {@link JdbcExecutionOptions} maxRetries setting must be
strictly set to 0 for this
+ * sink to work properly and not to produce duplicates. See issue
FLINK-22311 for details.
+ *
* @param xaFacade {@link XaFacade} to manage XA transactions
*/
public JdbcXaSinkFunction(
@@ -196,6 +199,12 @@ public class JdbcXaSinkFunction<T> extends
AbstractRichFunction
XaSinkStateHandler stateHandler,
JdbcExactlyOnceOptions options,
XaGroupOps xaGroupOps) {
+
+ Preconditions.checkArgument(
+ outputFormat.getExecutionOptions().getMaxRetries() == 0,
+ "JDBC XA sink requires maxRetries equal to 0, otherwise it
could "
+ + "cause duplicates. See issue FLINK-22311 for
details.");
+
this.xaFacade = Preconditions.checkNotNull(xaFacade);
this.xidGenerator = Preconditions.checkNotNull(xidGenerator);
this.outputFormat = Preconditions.checkNotNull(outputFormat);
diff --git
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index ffcc2c1..618cd41 100644
---
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -214,7 +214,7 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
JdbcSink.exactlyOnceSink(
String.format(INSERT_TEMPLATE, INPUT_TABLE),
JdbcITCase.TEST_ENTRY_JDBC_STATEMENT_BUILDER,
- JdbcExecutionOptions.builder().build(),
+
JdbcExecutionOptions.builder().withMaxRetries(0).build(),
JdbcExactlyOnceOptions.builder()
.withTransactionPerConnection(true)
.build(),
diff --git
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
index 5b5aa02..efaf4b2 100644
---
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
+++
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
@@ -151,7 +151,10 @@ public abstract class JdbcXaSinkTestBase extends
JdbcTestBase {
JdbcOutputFormat<TestEntry, TestEntry,
JdbcBatchStatementExecutor<TestEntry>> format =
new JdbcOutputFormat<>(
xaFacade,
-
JdbcExecutionOptions.builder().withBatchIntervalMs(batchInterval).build(),
+ JdbcExecutionOptions.builder()
+ .withBatchIntervalMs(batchInterval)
+ .withMaxRetries(0)
+ .build(),
ctx ->
JdbcBatchStatementExecutor.simple(
String.format(INSERT_TEMPLATE,
INPUT_TABLE),