rkhachatryan commented on a change in pull request #10847: URL: https://github.com/apache/flink/pull/10847#discussion_r554121201
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import 
org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has its own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. + * + * <p>XA uses a two-phase commit protocol, which solves the consistency problem, but leaves the + * following issues: + * + * <ol> + * <li>transactions may be abandoned, holding resources (e.g. 
locks, versions of rows) + * <li>abandoned transactions collide with the new transactions if their IDs repeat after recovery + * <li>commit requests may be repeated after job recovery, resulting in error responses and job + * failure + * </ol> + * + * The following table summarizes effects of failures during transaction state transitions and ways + * to mitigate them: + * + * <table border="1" style="width:100%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:30%;"> + * <col span="1" style="width:40%;"> + * <thead> + * <tr> + * <th>Transition</th> + * <th>Methods</th> + * <th>What happens if transition lost</th> + * <th>Ways to mitigate</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>none > started, started > ended</td> + * <td>open(), snapshotState()</td> + * <td>Database eventually discards these transactions</td> + * <td><ol> + * <li>Use globally unique XIDs</li> + * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see {@link SemanticXidGenerator}).</li> + * </ol></td> + * </tr> + * <tr> + * <td>ended > prepared</td> + * <td>snapshotState()</td> + * <td>Database keeps these transactions prepared forever ("in-doubt" state)</td> + * <td> + * <ol> + * <li>store ended transactions in state; rollback on job recovery (still doesn't cover all scenarios)</li> + * <li>call xa_recover() and xa_rollback() on job recovery; disabled by default in order not to affect transactions of other subtasks and apps</li> + * <li>setting transaction timeouts (not supported by most databases)</li> + * <li>manual recovery and rollback</li> + * </ol> + * </td> + * </tr> + * <tr> + * <td>prepared > committed</td> + * <td>open(), notifyCheckpointComplete()</td> + * <td> + * Upon job recovery state contains committed transactions; or JM may notifyCheckpointComplete again after recovery. 
+ * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} error.</p> + * </td> + * <td> + * Distinguish between transactions created during this run and restored from state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} for the latter. + * </td> + * </tr> + * </tbody> + * </table> + * + * @since 1.11 + */ +@Internal +public class JdbcXaSinkFunction<T> extends AbstractRichFunction + implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcXaSinkFunction.class); + + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final XidGenerator xidGenerator; + private final JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format; + private final XaSinkStateHandler stateHandler; + private final JdbcExactlyOnceOptions options; + + // checkpoints and the corresponding transactions waiting for completion notification from JM + private transient List<CheckpointAndXid> preparedXids = new ArrayList<>(); + // hanging XIDs - used for cleanup + // it's a list to support retries and scaling down + // possible transaction states: active, idle, prepared + // last element is the current xid + private transient Deque<Xid> hangingXids = new LinkedList<>(); + private transient Xid currentXid; + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. 
+ * + * @param xaFacade {@link XaFacade} to manage XA transactions + */ + public JdbcXaSinkFunction( + String sql, + JdbcStatementBuilder<T> statementBuilder, + XaFacade xaFacade, + JdbcExecutionOptions executionOptions, + JdbcExactlyOnceOptions options) { + this( + new JdbcBatchingOutputFormat<>( + xaFacade, + executionOptions, + context -> { + Preconditions.checkState( + !context.getExecutionConfig().isObjectReuseEnabled(), + "objects can not be reused with JDBC sink function"); + return JdbcBatchStatementExecutor.simple( + sql, statementBuilder, Function.identity()); + }, + JdbcBatchingOutputFormat.RecordExtractor.identity()), + xaFacade, + XidGenerator.semanticXidGenerator(), + new XaSinkStateHandlerImpl(), + options, + new XaGroupOpsImpl(xaFacade)); + } + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. + * + * @param format {@link JdbcBatchingOutputFormat} to write records with + * @param xaFacade {@link XaFacade} to manage XA transactions + * @param xidGenerator {@link XidGenerator} to generate new transaction ids + */ + public JdbcXaSinkFunction( + JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format, + XaFacade xaFacade, + XidGenerator xidGenerator, + XaSinkStateHandler stateHandler, + JdbcExactlyOnceOptions options, + XaGroupOps xaGroupOps) { + this.xaFacade = Preconditions.checkNotNull(xaFacade); + this.xidGenerator = Preconditions.checkNotNull(xidGenerator); + this.format = Preconditions.checkNotNull(format); + this.stateHandler = Preconditions.checkNotNull(stateHandler); + this.options = Preconditions.checkNotNull(options); + this.xaGroupOps = xaGroupOps; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + JdbcXaSinkFunctionState state = stateHandler.load(context); + hangingXids = new LinkedList<>(state.getHanging()); Review comment: `Deque` is used to keep the current `Xid` the last; otherwise the order is 
not important. > the state recovery doesn't retain the order. Could you explain what you mean? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
