wuchong commented on a change in pull request #10847:
URL: https://github.com/apache/flink/pull/10847#discussion_r550422198



##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidImpl.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A simple {@link Xid} implementation that stores branch and global 
transaction identifiers as byte
+ * arrays.
+ */
+@Internal
+final class XidImpl implements Xid, Serializable {
+
+    private final int formatId;
+    @Nonnull private final byte[] globalTransactionId;
+    @Nonnull private final byte[] branchQualifier;
+
+    XidImpl(int formatId, byte[] globalTransactionId, byte[] branchQualifier) {
+        Preconditions.checkArgument(globalTransactionId.length <= 
Xid.MAXGTRIDSIZE);
+        Preconditions.checkArgument(branchQualifier.length <= 
Xid.MAXBQUALSIZE);
+        this.formatId = formatId;
+        this.globalTransactionId = Arrays.copyOf(globalTransactionId, 
globalTransactionId.length);
+        this.branchQualifier = Arrays.copyOf(branchQualifier, 
branchQualifier.length);
+    }
+
+    @Override
+    public int getFormatId() {
+        return formatId;
+    }
+
+    @Override
+    public byte[] getGlobalTransactionId() {
+        return globalTransactionId;
+    }
+
+    @Override
+    public byte[] getBranchQualifier() {
+        return branchQualifier;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof XidImpl)) {
+            return false;
+        }
+        XidImpl xid = (XidImpl) o;
+        return formatId == xid.formatId
+                && Arrays.equals(globalTransactionId, xid.globalTransactionId)
+                && Arrays.equals(branchQualifier, xid.branchQualifier);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(formatId);
+        result = 31 * result + Arrays.hashCode(globalTransactionId);
+        result = 31 * result + Arrays.hashCode(branchQualifier);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return formatId + ":" + bytesToHex(globalTransactionId) + ":" + 
bytesToHex(branchQualifier);

Review comment:
       I think we can use `StringUtils.byteToHexString(bytes)` here to avoid a 
duplicate implementation.
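
       A minimal sketch of the resulting `toString()` layout, for illustration only. The class `XidToStringSketch` and its local `toHex` helper are hypothetical stand-ins so the snippet runs without Flink on the classpath; in the PR, the `toHex` call would presumably be replaced by the existing `StringUtils.byteToHexString(bytes)` helper, whose exact output format I have not checked here.

```java
public class XidToStringSketch {

    // Hypothetical stand-in for a shared hex utility. It exists only to keep
    // this sketch self-contained; it is not the Flink implementation.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            // Mask to 0..255 so negative bytes print as two hex digits.
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // Mirrors the "formatId:globalTransactionId:branchQualifier" layout
    // used by XidImpl#toString in the diff above.
    static String format(int formatId, byte[] gtrid, byte[] bqual) {
        return formatId + ":" + toHex(gtrid) + ":" + toHex(bqual);
    }

    public static void main(String[] args) {
        // prints "1:0aff:01"
        System.out.println(format(1, new byte[] {0x0a, (byte) 0xff}, new byte[] {0x01}));
    }
}
```

       With a shared utility, `XidImpl` would no longer need its own `bytesToHex` method.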

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidImpl.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A simple {@link Xid} implementation that stores branch and global 
transaction identifiers as byte
+ * arrays.
+ */
+@Internal
+final class XidImpl implements Xid, Serializable {

Review comment:
       Add a `serialVersionUID` to the class?
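
       To make the suggestion concrete, here is a rough sketch. `SerialVersionSketch` and `SimpleXid` are hypothetical, simplified stand-ins for `XidImpl` (no `Xid` interface, no validation), and the value `1L` is just an assumed starting version.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public class SerialVersionSketch {

    // Hypothetical simplified stand-in for XidImpl.
    static final class SimpleXid implements Serializable {
        // Explicit version id. Without it the JVM derives one from the class
        // structure, so unrelated changes to the class can make previously
        // serialized instances unreadable.
        private static final long serialVersionUID = 1L;

        final int formatId;
        final byte[] globalTransactionId;
        final byte[] branchQualifier;

        SimpleXid(int formatId, byte[] globalTransactionId, byte[] branchQualifier) {
            this.formatId = formatId;
            this.globalTransactionId = Arrays.copyOf(globalTransactionId, globalTransactionId.length);
            this.branchQualifier = Arrays.copyOf(branchQualifier, branchQualifier.length);
        }
    }

    // Serializes and deserializes the value with plain Java serialization.
    static SimpleXid roundTrip(SimpleXid xid) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(xid);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SimpleXid) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SimpleXid copy = roundTrip(new SimpleXid(7, new byte[] {1, 2}, new byte[] {3}));
        System.out.println(copy.formatId + " " + Arrays.toString(copy.globalTransactionId));
    }
}
```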

##########
File path: flink-connectors/flink-connector-jdbc/pom.xml
##########
@@ -92,6 +100,19 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>javax.transaction</groupId>
+                       <artifactId>javax.transaction-api</artifactId>
+                       <version>1.3</version>

Review comment:
       Which classes do we need from this dependency?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/CheckpointAndXidSerializer.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** {@link CheckpointAndXid} serializer. */
+@Internal
+final class CheckpointAndXidSerializer extends 
TypeSerializer<CheckpointAndXid> {
+
+    private static final TypeSerializerSnapshot<CheckpointAndXid> SNAPSHOT =
+            new 
SimpleTypeSerializerSnapshot<CheckpointAndXid>(CheckpointAndXidSerializer::new) 
{
+                private static final int VERSION = 1;
+
+                @Override
+                public void writeSnapshot(DataOutputView out) throws 
IOException {
+                    super.writeSnapshot(out);
+                    out.writeInt(VERSION);
+                }
+
+                @Override
+                public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader classLoader)
+                        throws IOException {
+                    super.readSnapshot(readVersion, in, classLoader);
+                    in.readInt();
+                }
+            };
+
+    private final TypeSerializer<Xid> xidSerializer = new XidSerializer();
+
+    @Override
+    public boolean isImmutableType() {
+        return xidSerializer.isImmutableType();
+    }
+
+    @Override
+    public TypeSerializer<CheckpointAndXid> duplicate() {
+        return this;
+    }
+
+    @Override
+    public CheckpointAndXid createInstance() {
+        return CheckpointAndXid.createRestored(0L, 0, 
xidSerializer.createInstance());
+    }
+
+    @Override
+    public CheckpointAndXid copy(CheckpointAndXid from) {
+        return CheckpointAndXid.createRestored(
+                from.checkpointId, from.attempts, 
xidSerializer.copy(from.xid));
+    }
+
+    @Override
+    public CheckpointAndXid copy(CheckpointAndXid from, CheckpointAndXid 
reuse) {
+        return from;
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    @Override
+    public void serialize(CheckpointAndXid record, DataOutputView target) 
throws IOException {
+        target.writeLong(record.checkpointId);
+        target.writeInt(record.attempts);

Review comment:
       Why don't we serialize the restored flag? 
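
       For reference, this is roughly what I would expect if the flag were written as well. It is only a sketch under assumptions: `RestoredFlagSketch` and `Entry` are hypothetical names, `java.io.DataOutputStream` stands in for `DataOutputView`, and the Xid payload is left out. If every deserialized entry is meant to be treated as restored, skipping the flag may well be intentional.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class RestoredFlagSketch {

    // Hypothetical stand-in for CheckpointAndXid without the Xid itself.
    static final class Entry {
        final long checkpointId;
        final int attempts;
        final boolean restored;

        Entry(long checkpointId, int attempts, boolean restored) {
            this.checkpointId = checkpointId;
            this.attempts = attempts;
            this.restored = restored;
        }
    }

    // Writes checkpointId (8 bytes), attempts (4 bytes) and the flag (1 byte).
    static byte[] serialize(Entry entry) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(entry.checkpointId);
            out.writeInt(entry.attempts);
            out.writeBoolean(entry.restored);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static Entry deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new Entry(in.readLong(), in.readInt(), in.readBoolean());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Entry copy = deserialize(serialize(new Entry(42L, 3, true)));
        System.out.println(copy.checkpointId + " " + copy.attempts + " " + copy.restored);
    }
}
```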

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of;
+
+/**
+ * JDBC sink function that uses XA transactions to provide exactly once 
guarantees. That is, if a
+ * checkpoint succeeds then all records emitted during it are committed in the 
database, and rolled
+ * back otherwise.
+ *
+ * <p>Each parallel subtask has its own transactions, independent from other 
subtasks. Therefore,
+ * consistency is only guaranteed within partitions.
+ *
+ * <p>XA uses a two-phase commit protocol, which solves the consistency 
problem, but leaves the
+ * following issues:
+ *
+ * <ol>
+ *   <li>transactions may be abandoned, holding resources (e.g. locks, 
versions of rows)
+ *   <li>abandoned transactions collide with the new transactions if their IDs 
repeat after recovery
+ *   <li>commit requests may be repeated after job recovery, resulting in 
error responses and job
+ *       failure
+ * </ol>
+ *
+ * The following table summarizes effects of failures during transaction state 
transitions and ways
+ * to mitigate them:
+ *
+ * <table border="1" style="width:100%;">
+ * <col span="1" style="width:15%;">
+ * <col span="1" style="width:15%;">
+ * <col span="1" style="width:30%;">
+ * <col span="1" style="width:40%;">
+ * <thead>
+ * <tr>
+ * <th>Transition</th>
+ * <th>Methods</th>
+ * <th>What happens if transition lost</th>
+ * <th>Ways to mitigate</th>
+ * </tr>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <td>none &gt; started, started &gt; ended</td>
+ * <td>open(), snapshotState()</td>
+ * <td>Database eventually discards these transactions</td>
+ * <td><ol>
+ * <li>Use globally unique XIDs</li>
+ * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see 
{@link SemanticXidGenerator}).</li>
+ * </ol></td>
+ * </tr>
+ * <tr>
+ * <td>ended &gt; prepared</td>
+ * <td>snapshotState()</td>
+ * <td>Database keeps these transactions prepared forever ("in-doubt" 
state)</td>
+ * <td>
+ * <ol>
+ * <li>store ended transactions in state; rollback on job recovery (still 
doesn't cover all scenarios)</li>
+ * <li>call xa_recover() and xa_rollback() on job recovery; disabled by 
default in order not to affect transactions of other subtasks and apps</li>
+ * <li>setting transaction timeouts (not supported by most databases)</li>
+ * <li>manual recovery and rollback</li>
+ * </ol>
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>prepared &gt; committed</td>
+ * <td>open(), notifyCheckpointComplete()</td>
+ * <td>
+ * Upon job recovery state contains committed transactions; or JM may 
notifyCheckpointComplete again after recovery.
+ * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA 
XAER_NOTA} error.</p>
+ * </td>
+ * <td>
+ * Distinguish between transactions created during this run and restored from 
state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} 
for the latter.
+ * </td>
+ * </tr>
+ * </tbody>
+ * </table>
+ *
+ * @since 1.11
+ */
+@Internal
+public class JdbcXaSinkFunction<T> extends AbstractRichFunction
+        implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, 
AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcXaSinkFunction.class);
+
+    private final XaFacade xaFacade;
+    private final XaGroupOps xaGroupOps;
+    private final XidGenerator xidGenerator;
+    private final JdbcBatchingOutputFormat<T, T, 
JdbcBatchStatementExecutor<T>> format;
+    private final XaSinkStateHandler stateHandler;
+    private final JdbcExactlyOnceOptions options;
+
+    // checkpoints and the corresponding transactions waiting for completion 
notification from JM
+    private transient List<CheckpointAndXid> preparedXids = new ArrayList<>();
+    // hanging XIDs - used for cleanup
+    // it's a list to support retries and scaling down
+    // possible transaction states: active, idle, prepared
+    // last element is the current xid
+    private transient Deque<Xid> hangingXids = new LinkedList<>();
+    private transient Xid currentXid;
+
+    /**
+     * Creates a {@link JdbcXaSinkFunction}.
+     *
+     * <p>All parameters must be {@link java.io.Serializable serializable}.
+     *
+     * @param xaFacade {@link XaFacade} to manage XA transactions
+     */
+    public JdbcXaSinkFunction(
+            String sql,
+            JdbcStatementBuilder<T> statementBuilder,
+            XaFacade xaFacade,
+            JdbcExecutionOptions executionOptions,
+            JdbcExactlyOnceOptions options) {
+        this(
+                new JdbcBatchingOutputFormat<>(
+                        xaFacade,
+                        executionOptions,
+                        context -> {
+                            Preconditions.checkState(
+                                    
!context.getExecutionConfig().isObjectReuseEnabled(),
+                                    "objects can not be reused with JDBC sink 
function");
+                            return JdbcBatchStatementExecutor.simple(
+                                    sql, statementBuilder, 
Function.identity());
+                        },
+                        JdbcBatchingOutputFormat.RecordExtractor.identity()),
+                xaFacade,
+                XidGenerator.semanticXidGenerator(),
+                new XaSinkStateHandlerImpl(),
+                options,
+                new XaGroupOpsImpl(xaFacade));
+    }
+
+    /**
+     * Creates a {@link JdbcXaSinkFunction}.
+     *
+     * <p>All parameters must be {@link java.io.Serializable serializable}.
+     *
+     * @param format {@link JdbcBatchingOutputFormat} to write records with
+     * @param xaFacade {@link XaFacade} to manage XA transactions
+     * @param xidGenerator {@link XidGenerator} to generate new transaction ids
+     */
+    public JdbcXaSinkFunction(
+            JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> 
format,
+            XaFacade xaFacade,
+            XidGenerator xidGenerator,
+            XaSinkStateHandler stateHandler,
+            JdbcExactlyOnceOptions options,
+            XaGroupOps xaGroupOps) {
+        this.xaFacade = Preconditions.checkNotNull(xaFacade);
+        this.xidGenerator = Preconditions.checkNotNull(xidGenerator);
+        this.format = Preconditions.checkNotNull(format);
+        this.stateHandler = Preconditions.checkNotNull(stateHandler);
+        this.options = Preconditions.checkNotNull(options);
+        this.xaGroupOps = xaGroupOps;
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        JdbcXaSinkFunctionState state = stateHandler.load(context);
+        hangingXids = new LinkedList<>(state.getHanging());
+        preparedXids = new ArrayList<>(state.getPrepared());
+        LOG.info(
+                "initialized state: prepared xids: {}, hanging xids: {}",
+                preparedXids.size(),
+                hangingXids.size());
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        super.open(configuration);
+        xidGenerator.open();
+        xaFacade.open();
+        format.setRuntimeContext(getRuntimeContext());
+        format.open(
+                getRuntimeContext().getIndexOfThisSubtask(),
+                getRuntimeContext().getNumberOfParallelSubtasks());
+        hangingXids = new 
LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry());
+        commitUpToCheckpoint(Optional.empty());

Review comment:
       I have some questions:
   1. Will this leave some Xids awaiting retry in `preparedXids`?
   2. If the job fails, will checkpointing restart from checkpoint 0?
   3. If so, will the retried Xids in `preparedXids` remain uncommitted for a 
long time?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of;
+
+/**
+ * JDBC sink function that uses XA transactions to provide exactly once 
guarantees. That is, if a
+ * checkpoint succeeds then all records emitted during it are committed in the 
database, and rolled
+ * back otherwise.
+ *
+ * <p>Each parallel subtask has its own transactions, independent from other 
subtasks. Therefore,
+ * consistency is only guaranteed within partitions.
+ *
+ * <p>XA uses a two-phase commit protocol, which solves the consistency 
problem, but leaves the
+ * following issues:
+ *
+ * <ol>
+ *   <li>transactions may be abandoned, holding resources (e.g. locks, 
versions of rows)
+ *   <li>abandoned transactions collide with the new transactions if their IDs 
repeat after recovery
+ *   <li>commit requests may be repeated after job recovery, resulting in 
error responses and job
+ *       failure
+ * </ol>
+ *
+ * The following table summarizes effects of failures during transaction state 
transitions and ways
+ * to mitigate them:
+ *
+ * <table border="1" style="width:100%;">
+ * <col span="1" style="width:15%;">
+ * <col span="1" style="width:15%;">
+ * <col span="1" style="width:30%;">
+ * <col span="1" style="width:40%;">
+ * <thead>
+ * <tr>
+ * <th>Transition</th>
+ * <th>Methods</th>
+ * <th>What happens if transition lost</th>
+ * <th>Ways to mitigate</th>
+ * </tr>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <td>none &gt; started, started &gt; ended</td>
+ * <td>open(), snapshotState()</td>
+ * <td>Database eventually discards these transactions</td>
+ * <td><ol>
+ * <li>Use globally unique XIDs</li>
+ * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see 
{@link SemanticXidGenerator}).</li>
+ * </ol></td>
+ * </tr>
+ * <tr>
+ * <td>ended &gt; prepared</td>
+ * <td>snapshotState()</td>
+ * <td>Database keeps these transactions prepared forever ("in-doubt" 
state)</td>
+ * <td>
+ * <ol>
+ * <li>store ended transactions in state; rollback on job recovery (still 
doesn't cover all scenarios)</li>
+ * <li>call xa_recover() and xa_rollback() on job recovery; disabled by 
default in order not to affect transactions of other subtasks and apps</li>
+ * <li>setting transaction timeouts (not supported by most databases)</li>
+ * <li>manual recovery and rollback</li>
+ * </ol>
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>prepared &gt; committed</td>
+ * <td>open(), notifyCheckpointComplete()</td>
+ * <td>
+ * Upon job recovery state contains committed transactions; or JM may 
notifyCheckpointComplete again after recovery.
+ * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA 
XAER_NOTA} error.</p>
+ * </td>
+ * <td>
+ * Distinguish between transactions created during this run and restored from 
state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} 
for the latter.
+ * </td>
+ * </tr>
+ * </tbody>
+ * </table>
+ *
+ * @since 1.11
+ */
+@Internal
+public class JdbcXaSinkFunction<T> extends AbstractRichFunction
+        implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, 
AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcXaSinkFunction.class);
+
+    private final XaFacade xaFacade;
+    private final XaGroupOps xaGroupOps;
+    private final XidGenerator xidGenerator;
+    private final JdbcBatchingOutputFormat<T, T, 
JdbcBatchStatementExecutor<T>> format;
+    private final XaSinkStateHandler stateHandler;
+    private final JdbcExactlyOnceOptions options;
+
+    // checkpoints and the corresponding transactions waiting for completion 
notification from JM
+    private transient List<CheckpointAndXid> preparedXids = new ArrayList<>();
+    // hanging XIDs - used for cleanup
+    // it's a list to support retries and scaling down
+    // possible transaction states: active, idle, prepared
+    // last element is the current xid
+    private transient Deque<Xid> hangingXids = new LinkedList<>();
+    private transient Xid currentXid;
+
+    /**
+     * Creates a {@link JdbcXaSinkFunction}.
+     *
+     * <p>All parameters must be {@link java.io.Serializable serializable}.
+     *
+     * @param xaFacade {@link XaFacade} to manage XA transactions
+     */
+    public JdbcXaSinkFunction(
+            String sql,
+            JdbcStatementBuilder<T> statementBuilder,
+            XaFacade xaFacade,
+            JdbcExecutionOptions executionOptions,
+            JdbcExactlyOnceOptions options) {
+        this(
+                new JdbcBatchingOutputFormat<>(
+                        xaFacade,
+                        executionOptions,
+                        context -> {
+                            Preconditions.checkState(
+                                    
!context.getExecutionConfig().isObjectReuseEnabled(),
+                                    "objects can not be reused with JDBC sink 
function");
+                            return JdbcBatchStatementExecutor.simple(
+                                    sql, statementBuilder, 
Function.identity());
+                        },
+                        JdbcBatchingOutputFormat.RecordExtractor.identity()),
+                xaFacade,
+                XidGenerator.semanticXidGenerator(),
+                new XaSinkStateHandlerImpl(),
+                options,
+                new XaGroupOpsImpl(xaFacade));
+    }
+
+    /**
+     * Creates a {@link JdbcXaSinkFunction}.
+     *
+     * <p>All parameters must be {@link java.io.Serializable serializable}.
+     *
+     * @param format {@link JdbcBatchingOutputFormat} to write records with
+     * @param xaFacade {@link XaFacade} to manage XA transactions
+     * @param xidGenerator {@link XidGenerator} to generate new transaction ids
+     */
+    public JdbcXaSinkFunction(
+            JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> 
format,
+            XaFacade xaFacade,
+            XidGenerator xidGenerator,
+            XaSinkStateHandler stateHandler,
+            JdbcExactlyOnceOptions options,
+            XaGroupOps xaGroupOps) {
+        this.xaFacade = Preconditions.checkNotNull(xaFacade);
+        this.xidGenerator = Preconditions.checkNotNull(xidGenerator);
+        this.format = Preconditions.checkNotNull(format);
+        this.stateHandler = Preconditions.checkNotNull(stateHandler);
+        this.options = Preconditions.checkNotNull(options);
+        this.xaGroupOps = xaGroupOps;
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        JdbcXaSinkFunctionState state = stateHandler.load(context);
+        hangingXids = new LinkedList<>(state.getHanging());
+        preparedXids = new ArrayList<>(state.getPrepared());
+        LOG.info(
+                "initialized state: prepared xids: {}, hanging xids: {}",
+                preparedXids.size(),
+                hangingXids.size());
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        super.open(configuration);
+        xidGenerator.open();
+        xaFacade.open();
+        format.setRuntimeContext(getRuntimeContext());
+        format.open(
+                getRuntimeContext().getIndexOfThisSubtask(),
+                getRuntimeContext().getNumberOfParallelSubtasks());
+        hangingXids = new 
LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry());
+        commitUpToCheckpoint(Optional.empty());
+        if (options.isDiscoverAndRollbackOnRecovery()) {
+            // todo: consider doing recover-rollback later (e.g. after the 1st 
checkpoint)
+            // when we are sure that all other subtasks started and committed 
any of their prepared
+            // transactions
+            // this would require to distinguish between this job Xids and 
other Xids
+            xaGroupOps.recoverAndRollback();
+        }
+        beginTx(0L);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        LOG.debug("snapshot state, checkpointId={}", 
context.getCheckpointId());
+        rollbackPreparedFromCheckpoint(context.getCheckpointId());
+        prepareCurrentTx(context.getCheckpointId());
+        beginTx(context.getCheckpointId() + 1);
+        stateHandler.store(of(preparedXids, hangingXids));
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        commitUpToCheckpoint(Optional.of(checkpointId));
+    }
+
+    @Override
+    public void invoke(T value, Context context) throws IOException {
+        Preconditions.checkState(currentXid != null, "current xid must not be 
null");
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("invoke, xid: {}, value: {}", currentXid, value);
+        }
+        format.writeRecord(value);
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (currentXid != null && xaFacade.isOpen()) {
+            try {
+                LOG.debug("remove current transaction before closing, xid={}", 
currentXid);
+                xaFacade.failOrRollback(currentXid);
+            } catch (Exception e) {
+                LOG.warn("unable to fail/rollback current transaction, 
xid={}", currentXid, e);
+            }
+        }
+        xaFacade.close();
+        xidGenerator.close();
+        // don't format.close(); as we don't want neither to flush nor to 
close connection here
+        currentXid = null;
+        hangingXids = null;
+        preparedXids = null;
+    }
+
+    private void prepareCurrentTx(long checkpointId) throws IOException {
+        Preconditions.checkState(currentXid != null, "no current xid");
+        Preconditions.checkState(
+                !hangingXids.isEmpty() && 
hangingXids.peek().equals(currentXid),
+                "inconsistent internal state");
+        hangingXids.poll();
+        format.flush();
+        try {
+            xaFacade.endAndPrepare(currentXid);
+            preparedXids.add(CheckpointAndXid.createNew(checkpointId, 
currentXid));
+        } catch (EmptyXaTransactionException e) {
+            LOG.info(
+                    "empty XA transaction (skip), xid: {}, checkpoint {}",
+                    currentXid,
+                    checkpointId);
+        }
+        currentXid = null;
+    }
+
+    /** @param checkpointId to associate with the new transaction. */
+    private void beginTx(long checkpointId) {
+        Preconditions.checkState(currentXid == null, "currentXid not null");
+        currentXid = xidGenerator.generateXid(getRuntimeContext(), checkpointId);
+        hangingXids.offer(currentXid);
+        xaFacade.start(currentXid);
+    }
+
+    private void commitUpToCheckpoint(Optional<Long> checkpointInclusive) {
+        Tuple2<List<CheckpointAndXid>, List<CheckpointAndXid>> splittedXids =
+                split(preparedXids, checkpointInclusive, true);
+        if (splittedXids.f0.isEmpty()) {
+            checkpointInclusive.ifPresent(
+                    cp -> LOG.warn("nothing to commit up to checkpoint: {}", cp));
+        } else {
+            preparedXids = splittedXids.f1;
+            preparedXids.addAll(
+                    xaGroupOps
+                            .commit(
+                                    splittedXids.f0,
+                                    options.isAllowOutOfOrderCommits(),
+                                    options.getMaxCommitAttempts())
+                            .getForRetry());
+        }
+    }
+
+    private void rollbackPreparedFromCheckpoint(long fromCheckpointInclusive) {
+        Tuple2<List<CheckpointAndXid>, List<CheckpointAndXid>> splittedXids =
+                split(preparedXids, fromCheckpointInclusive, false);
+        if (splittedXids.f1.isEmpty()) {
+            return;
+        }
+        preparedXids = splittedXids.f0;
+        LOG.warn(
+                "state snapshots have already been taken for checkpoint >= {}, rolling back {} transactions",
+                fromCheckpointInclusive,
+                splittedXids.f1.size());
+        xaGroupOps
+                .failOrRollback(
+                        splittedXids.f1.stream()
+                                .map(CheckpointAndXid::getXid)
+                                .collect(Collectors.toList()))
+                .getForRetry()
+                .forEach(hangingXids::offerFirst);

Review comment:
       Will this break the state invariant that `hangingXids.peek()` should equal `currentXid`?
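
       To make the concern concrete, here is a minimal sketch that uses plain strings in place of `Xid` values. The class and method names are illustrative and not part of the PR. It only shows that `offerFirst` changes what `peek()` returns; whether the sink can actually reach this state depends on when `rollbackPreparedFromCheckpoint` runs relative to `beginTx`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative stand-in for the sink's bookkeeping; strings replace Xid values.
final class HangingXidsSketch {

    // Returns true if the head of the deque still equals the current xid
    // after an xid that needs a retry has been pushed to the front.
    static boolean headMatchesCurrentAfterRetry(String currentXid, String retryXid) {
        Deque<String> hangingXids = new ArrayDeque<>();
        hangingXids.offer(currentXid); // mirrors what beginTx does
        hangingXids.offerFirst(retryXid); // mirrors the retry path after a failed rollback
        return currentXid.equals(hangingXids.peek());
    }
}
```

       With two different ids the head is the retried xid, so a later `prepareCurrentTx` check of the form `hangingXids.peek().equals(currentXid)` would fail in this simplified model.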

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidSerializer.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+
+/** {@link Xid} serializer. */
+@Internal
+final class XidSerializer extends TypeSerializer<Xid> {
+
+    private static final TypeSerializerSnapshot<Xid> SNAPSHOT =
+            new SimpleTypeSerializerSnapshot<Xid>(XidSerializer::new) {
+                private static final int VERSION = 1;
+
+                @Override
+                public void writeSnapshot(DataOutputView out) throws IOException {
+                    super.writeSnapshot(out);
+                    out.writeInt(VERSION);
+                }
+
+                @Override
+                public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
+                        throws IOException {
+                    super.readSnapshot(readVersion, in, classLoader);
+                    in.readInt();
+                }
+            };
+
+    @Override
+    public boolean isImmutableType() {
+        return true;
+    }
+
+    @Override
+    public TypeSerializer<Xid> duplicate() {
+        return this;
+    }
+
+    @Override
+    public Xid createInstance() {
+        return new XidImpl(0, new byte[0], new byte[0]);
+    }
+
+    @Override
+    public Xid copy(Xid from) {
+        return from;
+    }
+
+    @Override
+    public Xid copy(Xid from, Xid reuse) {
+        return from;
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    @Override
+    public void serialize(Xid xid, DataOutputView target) throws IOException {
+        target.writeInt(xid.getFormatId());
+        writeBytesWithSize(target, xid.getGlobalTransactionId());
+        writeBytesWithSize(target, xid.getBranchQualifier());
+    }
+
+    @Override
+    public Xid deserialize(DataInputView source) throws IOException {
+        return new XidImpl(source.readInt(), readBytesWithSize(source), readBytesWithSize(source));
+    }
+
+    private void writeBytesWithSize(DataOutputView target, byte[] bytes) throws IOException {
+        target.writeByte(bytes.length);
+        target.write(bytes, 0, bytes.length);
+    }
+
+    private byte[] readBytesWithSize(DataInputView source) throws IOException {
+        byte len = source.readByte();
+        byte[] bytes = new byte[len];
+        source.read(bytes, 0, len);
+        return bytes;
+    }
+
+    @Override
+    public Xid deserialize(Xid reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        serialize(deserialize(source), target);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj == this || obj instanceof XidSerializer;

Review comment:
       The condition `obj == this` is already covered by the subsequent condition `obj instanceof XidSerializer`, so we can remove `obj == this` here.
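
       A small sketch of why the identity check is redundant. The nested `Serializer` class is a hypothetical stand-in for `XidSerializer`, and the two helper methods exist only to compare the two forms of the condition.

```java
// Compares the two forms of the equality condition on a hypothetical stand-in class.
final class EqualsRedundancySketch {

    static final class Serializer {}

    // The condition as written in the PR.
    static boolean withIdentityCheck(Serializer self, Object obj) {
        return obj == self || obj instanceof Serializer;
    }

    // The simplified condition: instanceof is true for self and false for null.
    static boolean instanceofOnly(Object obj) {
        return obj instanceof Serializer;
    }
}
```

       Both methods agree for `self`, for another instance, for `null`, and for an unrelated object, since `self` is itself a `Serializer`.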

##########
File path: flink-connectors/flink-connector-jdbc/pom.xml
##########
@@ -55,7 +55,15 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
-                       <scope>provided</scope>
+                       <type>test-jar</type>

Review comment:
       Could we add a `test` scope to this, and move this one and the following one under the `<!-- test dependencies -->` comment?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.sql.XAConnection;
+import javax.sql.XADataSource;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+import static javax.transaction.xa.XAException.XAER_NOTA;
+import static javax.transaction.xa.XAException.XAER_RMFAIL;
+import static javax.transaction.xa.XAException.XA_HEURCOM;
+import static javax.transaction.xa.XAException.XA_HEURHAZ;
+import static javax.transaction.xa.XAException.XA_HEURMIX;
+import static javax.transaction.xa.XAException.XA_HEURRB;
+import static javax.transaction.xa.XAException.XA_RBBASE;
+import static javax.transaction.xa.XAException.XA_RBTIMEOUT;
+import static javax.transaction.xa.XAException.XA_RBTRANSIENT;
+import static javax.transaction.xa.XAResource.TMENDRSCAN;
+import static javax.transaction.xa.XAResource.TMNOFLAGS;
+import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
+
+/** Default {@link XaFacade} implementation. */
+@NotThreadSafe
+@Internal
+class XaFacadeImpl implements XaFacade {
+
+    private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImpl.class);
+    private static final Set<Integer> TRANSIENT_ERR_CODES =
+            new HashSet<>(Arrays.asList(XA_RBTRANSIENT, XAER_RMFAIL));
+    private static final Set<Integer> HEUR_ERR_CODES =
+            new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, XA_HEURMIX));
+    private static final int MAX_RECOVER_CALLS = 100;
+
+    private final Supplier<XADataSource> dataSourceSupplier;
+    private final Integer timeoutSec;
+    private transient XAResource xaResource;
+    private transient Connection connection;
+    private transient XAConnection xaConnection;
+
+    /** @return a non-serializable instance. */
+    static XaFacadeImpl fromXaDataSource(XADataSource ds) {
+        return new XaFacadeImpl(() -> ds, empty());
+    }
+
+    XaFacadeImpl(Supplier<XADataSource> dataSourceSupplier, Optional<Integer> timeoutSec) {
+        this.dataSourceSupplier = Preconditions.checkNotNull(dataSourceSupplier);
+        this.timeoutSec = timeoutSec.orElse(null);
+    }
+
+    @Override
+    public void open() throws SQLException, XAException {
+        Preconditions.checkState(!isOpen(), "already connected");
+        XADataSource ds = dataSourceSupplier.get();
+        xaConnection = ds.getXAConnection();
+        xaResource = xaConnection.getXAResource();
+        if (timeoutSec != null) {
+            xaResource.setTransactionTimeout(timeoutSec);
+        }
+        connection = xaConnection.getConnection();
+        connection.setReadOnly(false);
+        connection.setAutoCommit(false);
+        Preconditions.checkState(!connection.getAutoCommit());
+    }
+
+    @Override
+    public void close() throws SQLException {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        if (xaConnection != null) {
+            xaConnection.close();
+            xaConnection = null;
+        }
+        xaResource = null;
+    }
+
+    @Override
+    public Connection getConnection() {
+        Preconditions.checkNotNull(connection);
+        return connection;
+    }
+
+    @Override
+    public boolean isConnectionValid() {
+        return isOpen();

Review comment:
       Should we use `connection.isValid` to check validity?
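
       A minimal sketch of what such a check could look like, using the standard `java.sql.Connection#isValid(int)` call. The helper class and the `timeoutSec` parameter are assumptions for illustration; the PR does not define a validity timeout.

```java
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper; the timeout parameter is an assumption, not a PR option.
final class ConnectionValiditySketch {

    // Returns true only if a connection exists and the driver confirms it is usable.
    static boolean isConnectionValid(Connection connection, int timeoutSec) {
        if (connection == null) {
            return false;
        }
        try {
            // Asks the driver to verify the connection, waiting at most timeoutSec seconds.
            return connection.isValid(timeoutSec);
        } catch (SQLException e) {
            // Thrown when timeoutSec is negative; treated as "not valid" in this sketch.
            return false;
        }
    }
}
```

       Unlike `isOpen()`, this would also report connections that were dropped by the database or the network after being opened, at the cost of a round trip in most drivers.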

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidSerializer.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+
+/** {@link Xid} serializer. */
+@Internal
+final class XidSerializer extends TypeSerializer<Xid> {
+
+    private static final TypeSerializerSnapshot<Xid> SNAPSHOT =
+            new SimpleTypeSerializerSnapshot<Xid>(XidSerializer::new) {
+                private static final int VERSION = 1;
+
+                @Override
+                public void writeSnapshot(DataOutputView out) throws IOException {
+                    super.writeSnapshot(out);
+                    out.writeInt(VERSION);
+                }
+
+                @Override
+                public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
+                        throws IOException {
+                    super.readSnapshot(readVersion, in, classLoader);
+                    in.readInt();
+                }
+            };
+
+    @Override
+    public boolean isImmutableType() {
+        return true;
+    }
+
+    @Override
+    public TypeSerializer<Xid> duplicate() {
+        return this;
+    }
+
+    @Override
+    public Xid createInstance() {
+        return new XidImpl(0, new byte[0], new byte[0]);
+    }
+
+    @Override
+    public Xid copy(Xid from) {
+        return from;
+    }
+
+    @Override
+    public Xid copy(Xid from, Xid reuse) {
+        return from;
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    @Override
+    public void serialize(Xid xid, DataOutputView target) throws IOException {
+        target.writeInt(xid.getFormatId());
+        writeBytesWithSize(target, xid.getGlobalTransactionId());
+        writeBytesWithSize(target, xid.getBranchQualifier());
+    }
+
+    @Override
+    public Xid deserialize(DataInputView source) throws IOException {
+        return new XidImpl(source.readInt(), readBytesWithSize(source), readBytesWithSize(source));
+    }
+
+    private void writeBytesWithSize(DataOutputView target, byte[] bytes) throws IOException {
+        target.writeByte(bytes.length);
+        target.write(bytes, 0, bytes.length);
+    }
+
+    private byte[] readBytesWithSize(DataInputView source) throws IOException {
+        byte len = source.readByte();
+        byte[] bytes = new byte[len];
+        source.read(bytes, 0, len);
+        return bytes;
+    }
+
+    @Override
+    public Xid deserialize(Xid reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        serialize(deserialize(source), target);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj == this || obj instanceof XidSerializer;
+    }
+
+    @Override
+    public int hashCode() {
+        return 0;

Review comment:
       Better to return `this.getClass().hashCode();`.
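
       For illustration, a sketch of the suggested form on a hypothetical stateless class (not the PR's `XidSerializer`). Both a constant and a class-based value satisfy the `equals`/`hashCode` contract; the class-based value just makes it less likely that unrelated stateless serializers share a hash.

```java
// Hypothetical stateless serializer stand-in; not the PR's XidSerializer.
final class ClassHashSketch {

    @Override
    public boolean equals(Object obj) {
        return obj instanceof ClassHashSketch;
    }

    @Override
    public int hashCode() {
        // Same value for every instance, so equal objects share a hash code.
        // The class is final, so getClass() is always ClassHashSketch.class.
        return getClass().hashCode();
    }
}
```

       Note that `Class#hashCode` is identity-based, so the value can differ between JVM runs. That is fine as long as the hash is only used in memory and never persisted.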

##########
File path: flink-connectors/flink-connector-jdbc/pom.xml
##########
@@ -92,6 +100,19 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>javax.transaction</groupId>
+                       <artifactId>javax.transaction-api</artifactId>
+                       <version>1.3</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.h2database</groupId>
+                       <artifactId>h2</artifactId>
+                       <version>1.4.200</version>
+                       <scope>compile</scope>

Review comment:
       I think this is only used for testing. Could we change the scope to `test`?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/CheckpointAndXidSerializer.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** {@link CheckpointAndXid} serializer. */
+@Internal
+final class CheckpointAndXidSerializer extends TypeSerializer<CheckpointAndXid> {
+
+    private static final TypeSerializerSnapshot<CheckpointAndXid> SNAPSHOT =
+            new SimpleTypeSerializerSnapshot<CheckpointAndXid>(CheckpointAndXidSerializer::new) {
+                private static final int VERSION = 1;
+
+                @Override
+                public void writeSnapshot(DataOutputView out) throws IOException {
+                    super.writeSnapshot(out);
+                    out.writeInt(VERSION);
+                }
+
+                @Override
+                public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
+                        throws IOException {
+                    super.readSnapshot(readVersion, in, classLoader);
+                    in.readInt();
+                }
+            };
+
+    private final TypeSerializer<Xid> xidSerializer = new XidSerializer();
+
+    @Override
+    public boolean isImmutableType() {
+        return xidSerializer.isImmutableType();
+    }
+
+    @Override
+    public TypeSerializer<CheckpointAndXid> duplicate() {
+        return this;
+    }
+
+    @Override
+    public CheckpointAndXid createInstance() {
+        return CheckpointAndXid.createRestored(0L, 0, xidSerializer.createInstance());
+    }
+
+    @Override
+    public CheckpointAndXid copy(CheckpointAndXid from) {
+        return CheckpointAndXid.createRestored(
+                from.checkpointId, from.attempts, xidSerializer.copy(from.xid));

Review comment:
       This loses the `restored` information?
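
       A sketch of a copy that carries the flag over. `CheckpointEntrySketch` is a simplified, hypothetical stand-in for `CheckpointAndXid`: the `restored` field name is assumed from the factory method names, and the xid is left out for brevity.

```java
// Hypothetical, simplified stand-in for CheckpointAndXid; field names are assumptions.
final class CheckpointEntrySketch {
    final long checkpointId;
    final int attempts;
    final boolean restored;

    private CheckpointEntrySketch(long checkpointId, int attempts, boolean restored) {
        this.checkpointId = checkpointId;
        this.attempts = attempts;
        this.restored = restored;
    }

    // Entry created during the current run.
    static CheckpointEntrySketch createNew(long checkpointId) {
        return new CheckpointEntrySketch(checkpointId, 0, false);
    }

    // Entry read back from state after recovery.
    static CheckpointEntrySketch createRestored(long checkpointId, int attempts) {
        return new CheckpointEntrySketch(checkpointId, attempts, true);
    }

    // Copies every field, including the flag, instead of always marking the copy as restored.
    CheckpointEntrySketch copy() {
        return new CheckpointEntrySketch(checkpointId, attempts, restored);
    }
}
```

       Since the serializer reports the type as immutable, returning `from` directly from `copy` might be another option.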

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * JDBC exactly once sink options.
+ *
+ * <p><b>maxCommitAttempts</b> - maximum number of commit attempts to make per transaction; must be
+ * > 0; state size is proportional to the product of max number of in-flight snapshots and this
+ * number.
+ *
+ * <p><b>allowOutOfOrderCommits</b> - If true, all prepared transactions will be attempted to commit
+ * regardless of any transient failures during this operation. This may lead to inconsistency.
+ * Default: false.
+ *
+ * <p><b>recoveredAndRollback</b> - whether to rollback prepared transactions known to XA RM on
+ * startup (after committing <b>known</b> transactions, i.e. restored from state).
+ *
+ * <p>NOTE that setting this parameter to true may:
+ *
+ * <ol>
+ *   <li>interfere with other subtasks or applications (one subtask rolling back transactions
+ *       prepared by the other one (and known to it))
+ *   <li>block when using with some non-MVCC databases, if there are ended-not-prepared transactions
+ * </ol>
+ *
+ * See also {@link org.apache.flink.connector.jdbc.xa.XaFacade#recover()}
+ */
+public class JdbcExactlyOnceOptions implements Serializable {

Review comment:
       Add `@PublicEvolving` annotation?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.sql.XAConnection;
+import javax.sql.XADataSource;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+import static javax.transaction.xa.XAException.XAER_NOTA;
+import static javax.transaction.xa.XAException.XAER_RMFAIL;
+import static javax.transaction.xa.XAException.XA_HEURCOM;
+import static javax.transaction.xa.XAException.XA_HEURHAZ;
+import static javax.transaction.xa.XAException.XA_HEURMIX;
+import static javax.transaction.xa.XAException.XA_HEURRB;
+import static javax.transaction.xa.XAException.XA_RBBASE;
+import static javax.transaction.xa.XAException.XA_RBTIMEOUT;
+import static javax.transaction.xa.XAException.XA_RBTRANSIENT;
+import static javax.transaction.xa.XAResource.TMENDRSCAN;
+import static javax.transaction.xa.XAResource.TMNOFLAGS;
+import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
+
+/** Default {@link XaFacade} implementation. */
+@NotThreadSafe
+@Internal
+class XaFacadeImpl implements XaFacade {
+
+    private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImpl.class);
+    private static final Set<Integer> TRANSIENT_ERR_CODES =
+            new HashSet<>(Arrays.asList(XA_RBTRANSIENT, XAER_RMFAIL));
+    private static final Set<Integer> HEUR_ERR_CODES =
+            new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, XA_HEURMIX));
+    private static final int MAX_RECOVER_CALLS = 100;
+
+    private final Supplier<XADataSource> dataSourceSupplier;
+    private final Integer timeoutSec;
+    private transient XAResource xaResource;
+    private transient Connection connection;
+    private transient XAConnection xaConnection;
+
+    /** @return a non-serializable instance. */
+    static XaFacadeImpl fromXaDataSource(XADataSource ds) {
+        return new XaFacadeImpl(() -> ds, empty());
+    }
+
+    XaFacadeImpl(Supplier<XADataSource> dataSourceSupplier, Optional<Integer> timeoutSec) {
+        this.dataSourceSupplier = Preconditions.checkNotNull(dataSourceSupplier);
+        this.timeoutSec = timeoutSec.orElse(null);
+    }
+
+    @Override
+    public void open() throws SQLException, XAException {
+        Preconditions.checkState(!isOpen(), "already connected");
+        XADataSource ds = dataSourceSupplier.get();
+        xaConnection = ds.getXAConnection();
+        xaResource = xaConnection.getXAResource();
+        if (timeoutSec != null) {
+            xaResource.setTransactionTimeout(timeoutSec);
+        }
+        connection = xaConnection.getConnection();
+        connection.setReadOnly(false);
+        connection.setAutoCommit(false);
+        Preconditions.checkState(!connection.getAutoCommit());
+    }
+
+    @Override
+    public void close() throws SQLException {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        if (xaConnection != null) {
+            xaConnection.close();
+            xaConnection = null;
+        }
+        xaResource = null;
+    }
+
+    @Override
+    public Connection getConnection() {
+        Preconditions.checkNotNull(connection);
+        return connection;
+    }
+
+    @Override
+    public boolean isConnectionValid() {
+        return isOpen();
+    }
+
+    @Override
+    public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
+        if (!isOpen()) {
+            try {
+                open();
+            } catch (XAException e) {
+                throw new SQLException(e);
+            }
+        }
+        return connection;
+    }
+
+    @Override
+    public void closeConnection() {
+        try {
+            close();
+        } catch (SQLException e) {
+            LOG.warn("Connection close failed.", e);
+        }
+    }
+
+    @Override
+    public Connection reestablishConnection() {
+        throw new UnsupportedOperationException();

Review comment:
       I think this can be supported simply by calling `close()` and then `open()`?
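
       Roughly what I have in mind, sketched on a hypothetical stand-in where `open()` and `close()` only toggle a flag. In the real facade `open()` also throws `XAException`, which would probably need the same wrapping as in `getOrEstablishConnection()`, and any transaction still active on the old connection would need separate handling.

```java
// Hypothetical stand-in for the facade; open/close only toggle a flag here.
final class ReconnectSketch {
    private boolean open;
    private int openCount;

    void open() {
        if (open) {
            throw new IllegalStateException("already connected");
        }
        open = true;
        openCount++;
    }

    // Safe to call when nothing is open.
    void close() {
        open = false;
    }

    boolean isOpen() {
        return open;
    }

    int openCount() {
        return openCount;
    }

    // Drops the current connection (if any) and opens a fresh one.
    void reestablishConnection() {
        close();
        open();
    }
}
```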

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of;
+
+/**
+ * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a
+ * checkpoint succeeds then all records emitted during it are committed in the database, and rolled
+ * back otherwise.
+ *
+ * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore,
+ * consistency is only guaranteed within partitions.
+ *
+ * <p>XA uses a two-phase commit protocol, which solves the consistency problem, but leaves the
+ * following issues:
+ *
+ * <ol>
+ *   <li>transactions may be abandoned, holding resources (e.g. locks, versions of rows)
+ *   <li>abandoned transactions collide with the new transactions if their IDs repeat after recovery
+ *   <li>commit requests may be repeated after job recovery, resulting in error responses and job
+ *       failure
+ * </ol>
+ *
+ * The following table summarizes effects of failures during transaction state transitions and ways
+ * to mitigate them:
+ *
+ * <table border="1" style="width:100%;">
+ * <col span="1" style="width:15%;">
+ * <col span="1" style="width:15%;">
+ * <col span="1" style="width:30%;">
+ * <col span="1" style="width:40%;">
+ * <thead>
+ * <tr>
+ * <th>Transition</th>
+ * <th>Methods</th>
+ * <th>What happens if transition lost</th>
+ * <th>Ways to mitigate</th>
+ * </tr>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <td>none &gt; started, started &gt; ended</td>
+ * <td>open(), snapshotState()</td>
+ * <td>Database eventually discards these transactions</td>
+ * <td><ol>
+ * <li>Use globally unique XIDs</li>
+ * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see 
{@link SemanticXidGenerator}).</li>
+ * </ol></td>
+ * </tr>
+ * <tr>
+ * <td>ended &gt; prepared</td>
+ * <td>snapshotState()</td>
+ * <td>Database keeps these transactions prepared forever ("in-doubt" 
state)</td>
+ * <td>
+ * <ol>
+ * <li>store ended transactions in state; rollback on job recovery (still 
doesn't cover all scenarios)</li>
+ * <li>call xa_recover() and xa_rollback() on job recovery; disabled by 
default in order not to affect transactions of other subtasks and apps</li>
+ * <li>setting transaction timeouts (not supported by most databases)</li>
+ * <li>manual recovery and rollback</li>
+ * </ol>
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>prepared &gt; committed</td>
+ * <td>open(), notifyCheckpointComplete()</td>
+ * <td>
+ * Upon job recovery state contains committed transactions; or JM may 
notifyCheckpointComplete again after recovery.
+ * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA 
XAER_NOTA} error.</p>
+ * </td>
+ * <td>
+ * Distinguish between transactions created during this run and restored from 
state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} 
for the latter.
+ * </td>
+ * </tr>
+ * </tbody>
+ * </table>
+ *
+ * @since 1.11
+ */
+@Internal
+public class JdbcXaSinkFunction<T> extends AbstractRichFunction
+        implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, 
AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcXaSinkFunction.class);
+
+    private final XaFacade xaFacade;
+    private final XaGroupOps xaGroupOps;
+    private final XidGenerator xidGenerator;
+    private final JdbcBatchingOutputFormat<T, T, 
JdbcBatchStatementExecutor<T>> format;
+    private final XaSinkStateHandler stateHandler;
+    private final JdbcExactlyOnceOptions options;
+
+    // checkpoints and the corresponding transactions waiting for completion 
notification from JM
+    private transient List<CheckpointAndXid> preparedXids = new ArrayList<>();
+    // hanging XIDs - used for cleanup
+    // it's a list to support retries and scaling down
+    // possible transaction states: active, idle, prepared
+    // last element is the current xid
+    private transient Deque<Xid> hangingXids = new LinkedList<>();
+    private transient Xid currentXid;
+
+    /**
+     * Creates a {@link JdbcXaSinkFunction}.
+     *
+     * <p>All parameters must be {@link java.io.Serializable serializable}.
+     *
+     * @param xaFacade {@link XaFacade} to manage XA transactions
+     */
+    public JdbcXaSinkFunction(
+            String sql,
+            JdbcStatementBuilder<T> statementBuilder,
+            XaFacade xaFacade,
+            JdbcExecutionOptions executionOptions,
+            JdbcExactlyOnceOptions options) {
+        this(
+                new JdbcBatchingOutputFormat<>(
+                        xaFacade,
+                        executionOptions,
+                        context -> {
+                            Preconditions.checkState(
+                                    
!context.getExecutionConfig().isObjectReuseEnabled(),
+                                    "objects can not be reused with JDBC sink 
function");
+                            return JdbcBatchStatementExecutor.simple(
+                                    sql, statementBuilder, 
Function.identity());
+                        },
+                        JdbcBatchingOutputFormat.RecordExtractor.identity()),
+                xaFacade,
+                XidGenerator.semanticXidGenerator(),
+                new XaSinkStateHandlerImpl(),
+                options,
+                new XaGroupOpsImpl(xaFacade));
+    }
+
+    /**
+     * Creates a {@link JdbcXaSinkFunction}.
+     *
+     * <p>All parameters must be {@link java.io.Serializable serializable}.
+     *
+     * @param format {@link JdbcBatchingOutputFormat} to write records with
+     * @param xaFacade {@link XaFacade} to manage XA transactions
+     * @param xidGenerator {@link XidGenerator} to generate new transaction ids
+     */
+    public JdbcXaSinkFunction(
+            JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> 
format,
+            XaFacade xaFacade,
+            XidGenerator xidGenerator,
+            XaSinkStateHandler stateHandler,
+            JdbcExactlyOnceOptions options,
+            XaGroupOps xaGroupOps) {
+        this.xaFacade = Preconditions.checkNotNull(xaFacade);
+        this.xidGenerator = Preconditions.checkNotNull(xidGenerator);
+        this.format = Preconditions.checkNotNull(format);
+        this.stateHandler = Preconditions.checkNotNull(stateHandler);
+        this.options = Preconditions.checkNotNull(options);
+        this.xaGroupOps = xaGroupOps;
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        JdbcXaSinkFunctionState state = stateHandler.load(context);
+        hangingXids = new LinkedList<>(state.getHanging());

Review comment:
       Does `hangingXids` have to be a `Deque` to keep the order? If so, 
I'm wondering whether state recovery actually retains that order. 
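
       To illustrate the concern, here is a minimal, self-contained sketch. It is 
not the PR's code: `restoreHanging` is a hypothetical stand-in for the 
`new LinkedList<>(state.getHanging())` call above, and strings stand in for XIDs. 
Copying into a `LinkedList` keeps the iteration order of the source collection, 
so the order survives only if the restored collection is itself ordered; what 
`state.getHanging()` returns is not visible in this hunk.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;

public class HangingOrderSketch {

    // Hypothetical stand-in for `new LinkedList<>(state.getHanging())`.
    // The copy preserves whatever order the source collection iterates in.
    public static Deque<String> restoreHanging(Collection<String> restored) {
        return new LinkedList<>(restored);
    }

    public static void main(String[] args) {
        // Ordered source: the last element is still the most recent XID.
        List<String> ordered = Arrays.asList("xid-1", "xid-2", "xid-3");
        System.out.println(restoreHanging(ordered).peekLast()); // xid-3

        // Unordered source: the iteration order is unspecified, so the
        // "last element is the current xid" assumption may no longer hold.
        Collection<String> unordered = new HashSet<>(ordered);
        System.out.println(restoreHanging(unordered).peekLast());
    }
}
```

       If the state handler restores a list, the `Deque` contract described in the 
field comment should hold; if it restores a set or merges state from several 
subtasks, it may not.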

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of;
+
+/**
+ * JDBC sink function that uses XA transactions to provide exactly once 
guarantees. That is, if a
+ * checkpoint succeeds then all records emitted during it are committed in the 
database, and rolled
+ * back otherwise.
+ *
+ * <p>Each parallel subtask has it's own transactions, independent from other 
subtasks. Therefore,
+ * consistency is only guaranteed within partitions.
+ *
+ * <p>XA uses a two-phase commit protocol, which solves the consistency 
problem, but leaves the
+ * following issues:
+ *
+ * <ol>
+ *   <li>transactions may be abandoned, holding resources (e.g. locks, 
versions of rows)
+ *   <li>abandoned transactions collide with the new transactions if their IDs 
repeat after recovery
+ *   <li>commit requests may be repeated after job recovery, resulting in 
error responses and job
+ *       failure
+ * </ol>
+ *
+ * The following table summarizes effects of failures during transaction state 
transitions and ways
+ * to mitigate them:
+ *
+ * <table border="1" style="width:100%;">
+ * <col span="1" style="width:15%;">
+ * <col span="1" style="width:15%;">
+ * <col span="1" style="width:30%;">
+ * <col span="1" style="width:40%;">
+ * <thead>
+ * <tr>
+ * <th>Transition</th>
+ * <th>Methods</th>
+ * <th>What happens if transition lost</th>
+ * <th>Ways to mitigate</th>
+ * </tr>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <td>none &gt; started, started &gt; ended</td>
+ * <td>open(), snapshotState()</td>
+ * <td>Database eventually discards these transactions</td>
+ * <td><ol>
+ * <li>Use globally unique XIDs</li>
+ * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see 
{@link SemanticXidGenerator}).</li>
+ * </ol></td>
+ * </tr>
+ * <tr>
+ * <td>ended &gt; prepared</td>
+ * <td>snapshotState()</td>
+ * <td>Database keeps these transactions prepared forever ("in-doubt" 
state)</td>
+ * <td>
+ * <ol>
+ * <li>store ended transactions in state; rollback on job recovery (still 
doesn't cover all scenarios)</li>
+ * <li>call xa_recover() and xa_rollback() on job recovery; disabled by 
default in order not to affect transactions of other subtasks and apps</li>
+ * <li>setting transaction timeouts (not supported by most databases)</li>
+ * <li>manual recovery and rollback</li>
+ * </ol>
+ * </td>
+ * </tr>
+ * <tr>
+ * <td>prepared &gt; committed</td>
+ * <td>open(), notifyCheckpointComplete()</td>
+ * <td>
+ * Upon job recovery state contains committed transactions; or JM may 
notifyCheckpointComplete again after recovery.
+ * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA 
XAER_NOTA} error.</p>
+ * </td>
+ * <td>
+ * Distinguish between transactions created during this run and restored from 
state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} 
for the latter.
+ * </td>
+ * </tr>
+ * </tbody>
+ * </table>
+ *
+ * @since 1.11
+ */
+@Internal
+public class JdbcXaSinkFunction<T> extends AbstractRichFunction
+        implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, 
AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcXaSinkFunction.class);
+
+    private final XaFacade xaFacade;
+    private final XaGroupOps xaGroupOps;
+    private final XidGenerator xidGenerator;
+    private final JdbcBatchingOutputFormat<T, T, 
JdbcBatchStatementExecutor<T>> format;

Review comment:
       nit: it would be better to call this `outputFormat` to avoid confusion with 
data formats, e.g. csv, json. 

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * JDBC exactly once sink options.
+ *
+ * <p><b>maxCommitAttempts</b> - maximum number of commit attempts to make per 
transaction; must be
+ * > 0; state size is proportional to the product of max number of in-flight 
snapshots and this
+ * number.
+ *
+ * <p><b>allowOutOfOrderCommits</b> - If true, all prepared transactions will 
be attempted to commit
+ * regardless of any transient failures during this operation. This may lead 
to inconsistency.
+ * Default: false.
+ *
+ * <p><b>recoveredAndRollback</b> - whether to rollback prepared transactions 
known to XA RM on
+ * startup (after committing <b>known</b> transactions, i.e. restored from 
state).
+ *
+ * <p>NOTE that setting this parameter to true may:
+ *
+ * <ol>
+ *   <li>interfere with other subtasks or applications (one subtask rolling 
back transactions
+ *       prepared by the other one (and known to it))
+ *   <li>block when using with some non-MVCC databases, if there are 
ended-not-prepared transactions
+ * </ol>
+ *
+ * See also {@link org.apache.flink.connector.jdbc.xa.XaFacade#recover()}
+ */
+public class JdbcExactlyOnceOptions implements Serializable {
+
+    private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = false;
+    private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
+    private static final boolean DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS = false;
+
+    private final boolean discoverAndRollbackOnRecovery;
+    private final int maxCommitAttempts;
+    private final boolean allowOutOfOrderCommits;
+    private final Integer timeoutSec;
+
+    private JdbcExactlyOnceOptions(
+            boolean discoverAndRollbackOnRecovery,
+            int maxCommitAttempts,
+            boolean allowOutOfOrderCommits,
+            Optional<Integer> timeoutSec) {
+        this.discoverAndRollbackOnRecovery = discoverAndRollbackOnRecovery;
+        this.maxCommitAttempts = maxCommitAttempts;
+        this.allowOutOfOrderCommits = allowOutOfOrderCommits;
+        this.timeoutSec = timeoutSec.orElse(null);
+        Preconditions.checkArgument(this.maxCommitAttempts > 0, 
"maxCommitAttempts should be > 0");
+    }
+
+    public static JdbcExactlyOnceOptions defaults() {
+        return builder().build();
+    }
+
+    public boolean isDiscoverAndRollbackOnRecovery() {
+        return discoverAndRollbackOnRecovery;
+    }
+
+    public boolean isAllowOutOfOrderCommits() {
+        return allowOutOfOrderCommits;
+    }
+
+    public int getMaxCommitAttempts() {
+        return maxCommitAttempts;
+    }
+
+    public Integer getTimeoutSec() {
+        return timeoutSec;
+    }
+
+    public static JDBCExactlyOnceOptionsBuilder builder() {
+        return new JDBCExactlyOnceOptionsBuilder();
+    }
+
+    /** JDBCExactlyOnceOptionsBuilder. */
+    public static class JDBCExactlyOnceOptionsBuilder {
+        private boolean recoveredAndRollback = DEFAULT_RECOVERED_AND_ROLLBACK;
+        private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+        private boolean allowOutOfOrderCommits = 
DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS;
+        private Optional<Integer> timeoutSec = Optional.empty();
+
+        public JDBCExactlyOnceOptionsBuilder withRecoveredAndRollback(

Review comment:
       Could you add Javadocs to the configuration methods explaining what each 
config is used for?
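
       As a rough illustration of what I mean, a sketch follows. It is a simplified, 
self-contained stand-in rather than the PR's class: `OptionsBuilderSketch`, 
`withMaxCommitAttempts`, `withAllowOutOfOrderCommits` and the getters are 
assumed names (only `withRecoveredAndRollback` appears in this hunk), and the 
validation is done in the setter here purely to keep the example short. The 
Javadoc wording is paraphrased from the class-level Javadoc above.

```java
public class OptionsBuilderSketch {

    // Defaults mirror the DEFAULT_* constants in the hunk above.
    private boolean recoveredAndRollback = false;
    private int maxCommitAttempts = 3;
    private boolean allowOutOfOrderCommits = false;

    /**
     * Sets whether to roll back prepared transactions known to the XA RM on startup
     * (after committing the transactions restored from state). Enabling this may
     * interfere with other subtasks or applications. Default: false.
     */
    public OptionsBuilderSketch withRecoveredAndRollback(boolean recoveredAndRollback) {
        this.recoveredAndRollback = recoveredAndRollback;
        return this;
    }

    /**
     * Sets the maximum number of commit attempts to make per transaction.
     * Must be greater than 0. Default: 3.
     */
    public OptionsBuilderSketch withMaxCommitAttempts(int maxCommitAttempts) {
        if (maxCommitAttempts <= 0) {
            throw new IllegalArgumentException("maxCommitAttempts should be > 0");
        }
        this.maxCommitAttempts = maxCommitAttempts;
        return this;
    }

    /**
     * Sets whether all prepared transactions are attempted to commit regardless of
     * transient failures during the operation. This may lead to inconsistency.
     * Default: false.
     */
    public OptionsBuilderSketch withAllowOutOfOrderCommits(boolean allowOutOfOrderCommits) {
        this.allowOutOfOrderCommits = allowOutOfOrderCommits;
        return this;
    }

    public boolean isRecoveredAndRollback() {
        return recoveredAndRollback;
    }

    public int getMaxCommitAttempts() {
        return maxCommitAttempts;
    }

    public boolean isAllowOutOfOrderCommits() {
        return allowOutOfOrderCommits;
    }

    public static void main(String[] args) {
        OptionsBuilderSketch options =
                new OptionsBuilderSketch().withMaxCommitAttempts(5).withRecoveredAndRollback(true);
        System.out.println(options.getMaxCommitAttempts());
    }
}
```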

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.xa;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of;
+
+/**
+ * JDBC sink function that uses XA transactions to provide exactly once 
guarantees. That is, if a
+ * checkpoint succeeds then all records emitted during it are committed in the 
database, and rolled
+ * back otherwise.
+ *
+ * <p>Each parallel subtask has it's own transactions, independent from other 
subtasks. Therefore,
+ * consistency is only guaranteed within partitions.

Review comment:
       I'm a little confused about this. Does this mean we can't provide an 
exactly-once guarantee across partitions?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

