rkhachatryan commented on a change in pull request #10847: URL: https://github.com/apache/flink/pull/10847#discussion_r553950894
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/CheckpointAndXidSerializer.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.Objects; + +/** {@link CheckpointAndXid} serializer. */ +@Internal +final class CheckpointAndXidSerializer extends TypeSerializer<CheckpointAndXid> { + + private static final TypeSerializerSnapshot<CheckpointAndXid> SNAPSHOT = + new SimpleTypeSerializerSnapshot<CheckpointAndXid>(CheckpointAndXidSerializer::new) { + private static final int VERSION = 1; + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + super.writeSnapshot(out); + out.writeInt(VERSION); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) + throws IOException { + super.readSnapshot(readVersion, in, classLoader); + in.readInt(); + } + }; + + private final TypeSerializer<Xid> xidSerializer = new XidSerializer(); + + @Override + public boolean isImmutableType() { + return xidSerializer.isImmutableType(); + } + + @Override + public TypeSerializer<CheckpointAndXid> duplicate() { + return this; + } + + @Override + public CheckpointAndXid createInstance() { + return CheckpointAndXid.createRestored(0L, 0, xidSerializer.createInstance()); + } + + @Override + public CheckpointAndXid copy(CheckpointAndXid from) { + return CheckpointAndXid.createRestored( + from.checkpointId, from.attempts, xidSerializer.copy(from.xid)); Review comment: Existing `restored` value is not used because `createRestored()` always uses `true`. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/CheckpointAndXidSerializer.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.Objects; + +/** {@link CheckpointAndXid} serializer. */ +@Internal +final class CheckpointAndXidSerializer extends TypeSerializer<CheckpointAndXid> { + + private static final TypeSerializerSnapshot<CheckpointAndXid> SNAPSHOT = + new SimpleTypeSerializerSnapshot<CheckpointAndXid>(CheckpointAndXidSerializer::new) { + private static final int VERSION = 1; + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + super.writeSnapshot(out); + out.writeInt(VERSION); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) + throws IOException { + super.readSnapshot(readVersion, in, classLoader); + in.readInt(); + } + }; + + private final TypeSerializer<Xid> xidSerializer = new XidSerializer(); + + @Override + public boolean isImmutableType() { + return xidSerializer.isImmutableType(); + } + + @Override + public TypeSerializer<CheckpointAndXid> duplicate() { + return this; + } + + @Override + public CheckpointAndXid createInstance() { + return CheckpointAndXid.createRestored(0L, 0, xidSerializer.createInstance()); + } + + @Override + public CheckpointAndXid copy(CheckpointAndXid from) { + return CheckpointAndXid.createRestored( + from.checkpointId, from.attempts, xidSerializer.copy(from.xid)); + } + + @Override + public CheckpointAndXid copy(CheckpointAndXid from, CheckpointAndXid reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(CheckpointAndXid record, DataOutputView target) throws IOException { + target.writeLong(record.checkpointId); + target.writeInt(record.attempts); Review comment: Because it will forcibly set to true on restore. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java ########## @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; +import javax.sql.XAConnection; +import javax.sql.XADataSource; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import static java.util.Optional.empty; +import static java.util.Optional.of; +import static javax.transaction.xa.XAException.XAER_NOTA; +import static javax.transaction.xa.XAException.XAER_RMFAIL; +import static javax.transaction.xa.XAException.XA_HEURCOM; +import static javax.transaction.xa.XAException.XA_HEURHAZ; +import static javax.transaction.xa.XAException.XA_HEURMIX; +import static javax.transaction.xa.XAException.XA_HEURRB; +import static javax.transaction.xa.XAException.XA_RBBASE; +import static javax.transaction.xa.XAException.XA_RBTIMEOUT; +import static javax.transaction.xa.XAException.XA_RBTRANSIENT; +import static javax.transaction.xa.XAResource.TMENDRSCAN; +import static javax.transaction.xa.XAResource.TMNOFLAGS; +import static javax.transaction.xa.XAResource.TMSTARTRSCAN; + +/** Default {@link XaFacade} implementation. */ +@NotThreadSafe +@Internal +class XaFacadeImpl implements XaFacade { + + private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImpl.class); + private static final Set<Integer> TRANSIENT_ERR_CODES = + new HashSet<>(Arrays.asList(XA_RBTRANSIENT, XAER_RMFAIL)); + private static final Set<Integer> HEUR_ERR_CODES = + new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, XA_HEURMIX)); + private static final int MAX_RECOVER_CALLS = 100; + + private final Supplier<XADataSource> dataSourceSupplier; + private final Integer timeoutSec; + private transient XAResource xaResource; + private transient Connection connection; + private transient XAConnection xaConnection; + + /** @return a non-serializable instance. 
*/ + static XaFacadeImpl fromXaDataSource(XADataSource ds) { + return new XaFacadeImpl(() -> ds, empty()); + } + + XaFacadeImpl(Supplier<XADataSource> dataSourceSupplier, Optional<Integer> timeoutSec) { + this.dataSourceSupplier = Preconditions.checkNotNull(dataSourceSupplier); + this.timeoutSec = timeoutSec.orElse(null); + } + + @Override + public void open() throws SQLException, XAException { + Preconditions.checkState(!isOpen(), "already connected"); + XADataSource ds = dataSourceSupplier.get(); + xaConnection = ds.getXAConnection(); + xaResource = xaConnection.getXAResource(); + if (timeoutSec != null) { + xaResource.setTransactionTimeout(timeoutSec); + } + connection = xaConnection.getConnection(); + connection.setReadOnly(false); + connection.setAutoCommit(false); + Preconditions.checkState(!connection.getAutoCommit()); + } + + @Override + public void close() throws SQLException { + if (connection != null) { + connection.close(); + connection = null; + } + if (xaConnection != null) { + xaConnection.close(); + xaConnection = null; + } + xaResource = null; + } + + @Override + public Connection getConnection() { + Preconditions.checkNotNull(connection); + return connection; + } + + @Override + public boolean isConnectionValid() { + return isOpen(); + } + + @Override + public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException { + if (!isOpen()) { + try { + open(); + } catch (XAException e) { + throw new SQLException(e); + } + } + return connection; + } + + @Override + public void closeConnection() { + try { + close(); + } catch (SQLException e) { + LOG.warn("Connection close failed.", e); + } + } + + @Override + public Connection reestablishConnection() { + throw new UnsupportedOperationException(); Review comment: I'm afraid not: a potential ongoing XA transaction should be resumed (or at least aborted). This would require either passing `Xid` from the outside or storing current `Xid` in this class. I'd rather avoid both of these options because they either complicate the caller or duplicate `Xid` tracking. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. + * + * <p>XA uses a two-phase commit protocol, which solves the consistency problem, but leaves the + * following issues: + * + * <ol> + * <li>transactions may be abandoned, holding resources (e.g. 
locks, versions of rows) + * <li>abandoned transactions collide with the new transactions if their IDs repeat after recovery + * <li>commit requests may be repeated after job recovery, resulting in error responses and job + * failure + * </ol> + * + * The following table summarizes effects of failures during transaction state transitions and ways + * to mitigate them: + * + * <table border="1" style="width:100%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:30%;"> + * <col span="1" style="width:40%;"> + * <thead> + * <tr> + * <th>Transition</th> + * <th>Methods</th> + * <th>What happens if transition lost</th> + * <th>Ways to mitigate</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>none > started, started > ended</td> + * <td>open(), snapshotState()</td> + * <td>Database eventually discards these transactions</td> + * <td><ol> + * <li>Use globally unique XIDs</li> + * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see {@link SemanticXidGenerator}).</li> + * </ol></td> + * </tr> + * <tr> + * <td>ended > prepared</td> + * <td>snapshotState()</td> + * <td>Database keeps these transactions prepared forever ("in-doubt" state)</td> + * <td> + * <ol> + * <li>store ended transactions in state; rollback on job recovery (still doesn't cover all scenarios)</li> + * <li>call xa_recover() and xa_rollback() on job recovery; disabled by default in order not to affect transactions of other subtasks and apps</li> + * <li>setting transaction timeouts (not supported by most databases)</li> + * <li>manual recovery and rollback</li> + * </ol> + * </td> + * </tr> + * <tr> + * <td>prepared > committed</td> + * <td>open(), notifyCheckpointComplete()</td> + * <td> + * Upon job recovery state contains committed transactions; or JM may notifyCheckpointComplete again after recovery. + * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} error.</p> + * </td> + * <td> + * Distinguish between transactions created during this run and restored from state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} for the latter. + * </td> + * </tr> + * </tbody> + * </table> + * + * @since 1.11 + */ +@Internal +public class JdbcXaSinkFunction<T> extends AbstractRichFunction + implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcXaSinkFunction.class); + + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final XidGenerator xidGenerator; + private final JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format; + private final XaSinkStateHandler stateHandler; + private final JdbcExactlyOnceOptions options; + + // checkpoints and the corresponding transactions waiting for completion notification from JM + private transient List<CheckpointAndXid> preparedXids = new ArrayList<>(); + // hanging XIDs - used for cleanup + // it's a list to support retries and scaling down + // possible transaction states: active, idle, prepared + // last element is the current xid + private transient Deque<Xid> hangingXids = new LinkedList<>(); + private transient Xid currentXid; + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. 
+ * + * @param xaFacade {@link XaFacade} to manage XA transactions + */ + public JdbcXaSinkFunction( + String sql, + JdbcStatementBuilder<T> statementBuilder, + XaFacade xaFacade, + JdbcExecutionOptions executionOptions, + JdbcExactlyOnceOptions options) { + this( + new JdbcBatchingOutputFormat<>( + xaFacade, + executionOptions, + context -> { + Preconditions.checkState( + !context.getExecutionConfig().isObjectReuseEnabled(), + "objects can not be reused with JDBC sink function"); + return JdbcBatchStatementExecutor.simple( + sql, statementBuilder, Function.identity()); + }, + JdbcBatchingOutputFormat.RecordExtractor.identity()), + xaFacade, + XidGenerator.semanticXidGenerator(), + new XaSinkStateHandlerImpl(), + options, + new XaGroupOpsImpl(xaFacade)); + } + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. + * + * @param format {@link JdbcBatchingOutputFormat} to write records with + * @param xaFacade {@link XaFacade} to manage XA transactions + * @param xidGenerator {@link XidGenerator} to generate new transaction ids + */ + public JdbcXaSinkFunction( + JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format, + XaFacade xaFacade, + XidGenerator xidGenerator, + XaSinkStateHandler stateHandler, + JdbcExactlyOnceOptions options, + XaGroupOps xaGroupOps) { + this.xaFacade = Preconditions.checkNotNull(xaFacade); + this.xidGenerator = Preconditions.checkNotNull(xidGenerator); + this.format = Preconditions.checkNotNull(format); + this.stateHandler = Preconditions.checkNotNull(stateHandler); + this.options = Preconditions.checkNotNull(options); + this.xaGroupOps = xaGroupOps; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + JdbcXaSinkFunctionState state = stateHandler.load(context); + hangingXids = new LinkedList<>(state.getHanging()); Review comment: `Deque` is used to keep the current `Xid` the last; otherwise the order is not important. > the state recovery doesn't retain the order. Could you explain what do you mean? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. + * + * <p>XA uses a two-phase commit protocol, which solves the consistency problem, but leaves the + * following issues: + * + * <ol> + * <li>transactions may be abandoned, holding resources (e.g. 
locks, versions of rows) + * <li>abandoned transactions collide with the new transactions if their IDs repeat after recovery + * <li>commit requests may be repeated after job recovery, resulting in error responses and job + * failure + * </ol> + * + * The following table summarizes effects of failures during transaction state transitions and ways + * to mitigate them: + * + * <table border="1" style="width:100%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:30%;"> + * <col span="1" style="width:40%;"> + * <thead> + * <tr> + * <th>Transition</th> + * <th>Methods</th> + * <th>What happens if transition lost</th> + * <th>Ways to mitigate</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>none > started, started > ended</td> + * <td>open(), snapshotState()</td> + * <td>Database eventually discards these transactions</td> + * <td><ol> + * <li>Use globally unique XIDs</li> + * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see {@link SemanticXidGenerator}).</li> + * </ol></td> + * </tr> + * <tr> + * <td>ended > prepared</td> + * <td>snapshotState()</td> + * <td>Database keeps these transactions prepared forever ("in-doubt" state)</td> + * <td> + * <ol> + * <li>store ended transactions in state; rollback on job recovery (still doesn't cover all scenarios)</li> + * <li>call xa_recover() and xa_rollback() on job recovery; disabled by default in order not to affect transactions of other subtasks and apps</li> + * <li>setting transaction timeouts (not supported by most databases)</li> + * <li>manual recovery and rollback</li> + * </ol> + * </td> + * </tr> + * <tr> + * <td>prepared > committed</td> + * <td>open(), notifyCheckpointComplete()</td> + * <td> + * Upon job recovery state contains committed transactions; or JM may notifyCheckpointComplete again after recovery. + * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} error.</p> + * </td> + * <td> + * Distinguish between transactions created during this run and restored from state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} for the latter. + * </td> + * </tr> + * </tbody> + * </table> + * + * @since 1.11 + */ +@Internal +public class JdbcXaSinkFunction<T> extends AbstractRichFunction + implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcXaSinkFunction.class); + + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final XidGenerator xidGenerator; + private final JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format; + private final XaSinkStateHandler stateHandler; + private final JdbcExactlyOnceOptions options; + + // checkpoints and the corresponding transactions waiting for completion notification from JM + private transient List<CheckpointAndXid> preparedXids = new ArrayList<>(); + // hanging XIDs - used for cleanup + // it's a list to support retries and scaling down + // possible transaction states: active, idle, prepared + // last element is the current xid + private transient Deque<Xid> hangingXids = new LinkedList<>(); + private transient Xid currentXid; + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. 
+ * + * @param xaFacade {@link XaFacade} to manage XA transactions + */ + public JdbcXaSinkFunction( + String sql, + JdbcStatementBuilder<T> statementBuilder, + XaFacade xaFacade, + JdbcExecutionOptions executionOptions, + JdbcExactlyOnceOptions options) { + this( + new JdbcBatchingOutputFormat<>( + xaFacade, + executionOptions, + context -> { + Preconditions.checkState( + !context.getExecutionConfig().isObjectReuseEnabled(), + "objects can not be reused with JDBC sink function"); + return JdbcBatchStatementExecutor.simple( + sql, statementBuilder, Function.identity()); + }, + JdbcBatchingOutputFormat.RecordExtractor.identity()), + xaFacade, + XidGenerator.semanticXidGenerator(), + new XaSinkStateHandlerImpl(), + options, + new XaGroupOpsImpl(xaFacade)); + } + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. + * + * @param format {@link JdbcBatchingOutputFormat} to write records with + * @param xaFacade {@link XaFacade} to manage XA transactions + * @param xidGenerator {@link XidGenerator} to generate new transaction ids + */ + public JdbcXaSinkFunction( + JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format, + XaFacade xaFacade, + XidGenerator xidGenerator, + XaSinkStateHandler stateHandler, + JdbcExactlyOnceOptions options, + XaGroupOps xaGroupOps) { + this.xaFacade = Preconditions.checkNotNull(xaFacade); + this.xidGenerator = Preconditions.checkNotNull(xidGenerator); + this.format = Preconditions.checkNotNull(format); + this.stateHandler = Preconditions.checkNotNull(stateHandler); + this.options = Preconditions.checkNotNull(options); + this.xaGroupOps = xaGroupOps; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + JdbcXaSinkFunctionState state = stateHandler.load(context); + hangingXids = new LinkedList<>(state.getHanging()); + preparedXids = new ArrayList<>(state.getPrepared()); + LOG.info( + "initialized state: prepared xids: {}, hanging xids: {}", + preparedXids.size(), + hangingXids.size()); + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + xidGenerator.open(); + xaFacade.open(); + format.setRuntimeContext(getRuntimeContext()); + format.open( + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks()); + hangingXids = new LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry()); + commitUpToCheckpoint(Optional.empty()); + if (options.isDiscoverAndRollbackOnRecovery()) { + // todo: consider doing recover-rollback later (e.g. 
after the 1st checkpoint) + // when we are sure that all other subtasks started and committed any of their prepared + // transactions + // this would require to distinguish between this job Xids and other Xids + xaGroupOps.recoverAndRollback(); + } + beginTx(0L); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId()); + rollbackPreparedFromCheckpoint(context.getCheckpointId()); + prepareCurrentTx(context.getCheckpointId()); + beginTx(context.getCheckpointId() + 1); + stateHandler.store(of(preparedXids, hangingXids)); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + commitUpToCheckpoint(Optional.of(checkpointId)); + } + + @Override + public void invoke(T value, Context context) throws IOException { + Preconditions.checkState(currentXid != null, "current xid must not be null"); + if (LOG.isTraceEnabled()) { + LOG.trace("invoke, xid: {}, value: {}", currentXid, value); + } + format.writeRecord(value); + } + + @Override + public void close() throws Exception { + super.close(); + if (currentXid != null && xaFacade.isOpen()) { + try { + LOG.debug("remove current transaction before closing, xid={}", currentXid); + xaFacade.failOrRollback(currentXid); + } catch (Exception e) { + LOG.warn("unable to fail/rollback current transaction, xid={}", currentXid, e); + } + } + xaFacade.close(); + xidGenerator.close(); + // don't format.close(); as we don't want neither to flush nor to close connection here + currentXid = null; + hangingXids = null; + preparedXids = null; + } + + private void prepareCurrentTx(long checkpointId) throws IOException { + Preconditions.checkState(currentXid != null, "no current xid"); + Preconditions.checkState( + !hangingXids.isEmpty() && hangingXids.peek().equals(currentXid), + "inconsistent internal state"); + hangingXids.poll(); + format.flush(); + try { + xaFacade.endAndPrepare(currentXid); + preparedXids.add(CheckpointAndXid.createNew(checkpointId, currentXid)); + } catch (EmptyXaTransactionException e) { + LOG.info( + "empty XA transaction (skip), xid: {}, checkpoint {}", + currentXid, + checkpointId); + } + currentXid = null; + } + + /** @param checkpointId to associate with the new transaction. 
*/ + private void beginTx(long checkpointId) { + Preconditions.checkState(currentXid == null, "currentXid not null"); + currentXid = xidGenerator.generateXid(getRuntimeContext(), checkpointId); + hangingXids.offer(currentXid); + xaFacade.start(currentXid); + } + + private void commitUpToCheckpoint(Optional<Long> checkpointInclusive) { + Tuple2<List<CheckpointAndXid>, List<CheckpointAndXid>> splittedXids = + split(preparedXids, checkpointInclusive, true); + if (splittedXids.f0.isEmpty()) { + checkpointInclusive.ifPresent( + cp -> LOG.warn("nothing to commit up to checkpoint: {}", cp)); + } else { + preparedXids = splittedXids.f1; + preparedXids.addAll( + xaGroupOps + .commit( + splittedXids.f0, + options.isAllowOutOfOrderCommits(), + options.getMaxCommitAttempts()) + .getForRetry()); + } + } + + private void rollbackPreparedFromCheckpoint(long fromCheckpointInclusive) { + Tuple2<List<CheckpointAndXid>, List<CheckpointAndXid>> splittedXids = + split(preparedXids, fromCheckpointInclusive, false); + if (splittedXids.f1.isEmpty()) { + return; + } + preparedXids = splittedXids.f0; + LOG.warn( + "state snapshots have already been taken for checkpoint >= {}, rolling back {} transactions", + fromCheckpointInclusive, + splittedXids.f1.size()); + xaGroupOps + .failOrRollback( + splittedXids.f1.stream() + .map(CheckpointAndXid::getXid) + .collect(Collectors.toList())) + .getForRetry() + .forEach(hangingXids::offerFirst); Review comment: Yes, you're right! ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Optional; + +/** + * JDBC exactly once sink options. + * + * <p><b>maxCommitAttempts</b> - maximum number of commit attempts to make per transaction; must be + * > 0; state size is proportional to the product of max number of in-flight snapshots and this + * number. + * + * <p><b>allowOutOfOrderCommits</b> - If true, all prepared transactions will be attempted to commit + * regardless of any transient failures during this operation. This may lead to inconsistency. + * Default: false. + * + * <p><b>recoveredAndRollback</b> - whether to rollback prepared transactions known to XA RM on + * startup (after committing <b>known</b> transactions, i.e. restored from state). 
+ * + * <p>NOTE that setting this parameter to true may: + * + * <ol> + * <li>interfere with other subtasks or applications (one subtask rolling back transactions + * prepared by the other one (and known to it)) + * <li>block when using with some non-MVCC databases, if there are ended-not-prepared transactions + * </ol> + * + * See also {@link org.apache.flink.connector.jdbc.xa.XaFacade#recover()} + */ +public class JdbcExactlyOnceOptions implements Serializable { + + private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = false; + private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3; + private static final boolean DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS = false; + + private final boolean discoverAndRollbackOnRecovery; + private final int maxCommitAttempts; + private final boolean allowOutOfOrderCommits; + private final Integer timeoutSec; + + private JdbcExactlyOnceOptions( + boolean discoverAndRollbackOnRecovery, + int maxCommitAttempts, + boolean allowOutOfOrderCommits, + Optional<Integer> timeoutSec) { + this.discoverAndRollbackOnRecovery = discoverAndRollbackOnRecovery; + this.maxCommitAttempts = maxCommitAttempts; + this.allowOutOfOrderCommits = allowOutOfOrderCommits; + this.timeoutSec = timeoutSec.orElse(null); + Preconditions.checkArgument(this.maxCommitAttempts > 0, "maxCommitAttempts should be > 0"); + } + + public static JdbcExactlyOnceOptions defaults() { + return builder().build(); + } + + public boolean isDiscoverAndRollbackOnRecovery() { + return discoverAndRollbackOnRecovery; + } + + public boolean isAllowOutOfOrderCommits() { + return allowOutOfOrderCommits; + } + + public int getMaxCommitAttempts() { + return maxCommitAttempts; + } + + public Integer getTimeoutSec() { + return timeoutSec; + } + + public static JDBCExactlyOnceOptionsBuilder builder() { + return new JDBCExactlyOnceOptionsBuilder(); + } + + /** JDBCExactlyOnceOptionsBuilder. */ + public static class JDBCExactlyOnceOptionsBuilder { + private boolean recoveredAndRollback = DEFAULT_RECOVERED_AND_ROLLBACK; + private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS; + private boolean allowOutOfOrderCommits = DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS; + private Optional<Integer> timeoutSec = Optional.empty(); + + public JDBCExactlyOnceOptionsBuilder withRecoveredAndRollback( Review comment: Sure, good point ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. + * + * <p>XA uses a two-phase commit protocol, which solves the consistency problem, but leaves the + * following issues: + * + * <ol> + * <li>transactions may be abandoned, holding resources (e.g. 
locks, versions of rows) + * <li>abandoned transactions collide with the new transactions if their IDs repeat after recovery + * <li>commit requests may be repeated after job recovery, resulting in error responses and job + * failure + * </ol> + * + * The following table summarizes effects of failures during transaction state transitions and ways + * to mitigate them: + * + * <table border="1" style="width:100%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:30%;"> + * <col span="1" style="width:40%;"> + * <thead> + * <tr> + * <th>Transition</th> + * <th>Methods</th> + * <th>What happens if transition lost</th> + * <th>Ways to mitigate</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>none > started, started > ended</td> + * <td>open(), snapshotState()</td> + * <td>Database eventually discards these transactions</td> + * <td><ol> + * <li>Use globally unique XIDs</li> + * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see {@link SemanticXidGenerator}).</li> + * </ol></td> + * </tr> + * <tr> + * <td>ended > prepared</td> + * <td>snapshotState()</td> + * <td>Database keeps these transactions prepared forever ("in-doubt" state)</td> + * <td> + * <ol> + * <li>store ended transactions in state; rollback on job recovery (still doesn't cover all scenarios)</li> + * <li>call xa_recover() and xa_rollback() on job recovery; disabled by default in order not to affect transactions of other subtasks and apps</li> + * <li>setting transaction timeouts (not supported by most databases)</li> + * <li>manual recovery and rollback</li> + * </ol> + * </td> + * </tr> + * <tr> + * <td>prepared > committed</td> + * <td>open(), notifyCheckpointComplete()</td> + * <td> + * Upon job recovery state contains committed transactions; or JM may notifyCheckpointComplete again after recovery. + * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} error.</p> + * </td> + * <td> + * Distinguish between transactions created during this run and restored from state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} for the latter. + * </td> + * </tr> + * </tbody> + * </table> + * + * @since 1.11 + */ +@Internal +public class JdbcXaSinkFunction<T> extends AbstractRichFunction + implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcXaSinkFunction.class); + + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final XidGenerator xidGenerator; + private final JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format; + private final XaSinkStateHandler stateHandler; + private final JdbcExactlyOnceOptions options; + + // checkpoints and the corresponding transactions waiting for completion notification from JM + private transient List<CheckpointAndXid> preparedXids = new ArrayList<>(); + // hanging XIDs - used for cleanup + // it's a list to support retries and scaling down + // possible transaction states: active, idle, prepared + // last element is the current xid + private transient Deque<Xid> hangingXids = new LinkedList<>(); + private transient Xid currentXid; + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. 
+ * + * @param xaFacade {@link XaFacade} to manage XA transactions + */ + public JdbcXaSinkFunction( + String sql, + JdbcStatementBuilder<T> statementBuilder, + XaFacade xaFacade, + JdbcExecutionOptions executionOptions, + JdbcExactlyOnceOptions options) { + this( + new JdbcBatchingOutputFormat<>( + xaFacade, + executionOptions, + context -> { + Preconditions.checkState( + !context.getExecutionConfig().isObjectReuseEnabled(), + "objects can not be reused with JDBC sink function"); + return JdbcBatchStatementExecutor.simple( + sql, statementBuilder, Function.identity()); + }, + JdbcBatchingOutputFormat.RecordExtractor.identity()), + xaFacade, + XidGenerator.semanticXidGenerator(), + new XaSinkStateHandlerImpl(), + options, + new XaGroupOpsImpl(xaFacade)); + } + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. + * + * @param format {@link JdbcBatchingOutputFormat} to write records with + * @param xaFacade {@link XaFacade} to manage XA transactions + * @param xidGenerator {@link XidGenerator} to generate new transaction ids + */ + public JdbcXaSinkFunction( + JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format, + XaFacade xaFacade, + XidGenerator xidGenerator, + XaSinkStateHandler stateHandler, + JdbcExactlyOnceOptions options, + XaGroupOps xaGroupOps) { + this.xaFacade = Preconditions.checkNotNull(xaFacade); + this.xidGenerator = Preconditions.checkNotNull(xidGenerator); + this.format = Preconditions.checkNotNull(format); + this.stateHandler = Preconditions.checkNotNull(stateHandler); + this.options = Preconditions.checkNotNull(options); + this.xaGroupOps = xaGroupOps; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + JdbcXaSinkFunctionState state = stateHandler.load(context); + hangingXids = new LinkedList<>(state.getHanging()); + preparedXids = new ArrayList<>(state.getPrepared()); + LOG.info( + "initialized state: prepared xids: {}, hanging xids: {}", + preparedXids.size(), + hangingXids.size()); + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + xidGenerator.open(); + xaFacade.open(); + format.setRuntimeContext(getRuntimeContext()); + format.open( + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks()); + hangingXids = new LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry()); + commitUpToCheckpoint(Optional.empty()); Review comment: 1. Yes, failed Xds will be returned to `preparedXids` again 2. The job will restart from the latest completed checkpoint if any (decided by `CheckpointCoordinator` ) 3. Upon restart, `preparedXids` will be committed again (or erlier, if a checkpoint completion notification is received before restart) ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. Review comment: Let me clarify. Each element is sent to the external system exactly once (even across partitions). However, each partition commits independently (with a potentially big delay because of retries). Therefore, the data in the external system may be inconsistent during these periods from the application point of view. I believe this is a common behavior of all Flink exatly-once sinks. ########## File path: flink-connectors/flink-connector-jdbc/pom.xml ########## @@ -92,6 +100,19 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>javax.transaction</groupId> + <artifactId>javax.transaction-api</artifactId> + <version>1.3</version> + </dependency> + + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>1.4.200</version> + <scope>compile</scope> Review comment: Yes, you're right. ########## File path: flink-connectors/flink-connector-jdbc/pom.xml ########## @@ -92,6 +100,19 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>javax.transaction</groupId> + <artifactId>javax.transaction-api</artifactId> + <version>1.3</version> Review comment: It isn't needed actually, I removed it. Thanks for pointing out. 
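
For context on the XID discussion above: the quoted javadoc refers to a `SemanticXidGenerator` that derives XIDs from the checkpoint id, subtask index, job id and a per-run value, but that class is not quoted in this thread. Below is a minimal illustrative sketch of such a derivation, not the code under review; the class name, format id and byte layout are assumptions.

```java
import javax.transaction.xa.Xid;

import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Sketch of a "semantic" Xid: the global transaction id is derived from
 * (job id, subtask index, checkpoint id), so ids are globally unique and
 * reproducible across restarts. Not the PR's SemanticXidGenerator.
 */
final class SketchSemanticXid implements Xid {
    private static final int FORMAT_ID = 201; // arbitrary non-negative format id (assumption)

    private final byte[] gtrid; // global transaction id, must stay within Xid.MAXGTRIDSIZE (64 bytes)
    private final byte[] bqual; // branch qualifier, must stay within Xid.MAXBQUALSIZE (64 bytes)

    SketchSemanticXid(byte[] jobId, int subtaskIndex, long checkpointId, long runId) {
        // jobId (16 bytes for a Flink JobID) + subtask index + checkpoint id fits well under 64 bytes
        this.gtrid =
                ByteBuffer.allocate(jobId.length + Integer.BYTES + Long.BYTES)
                        .put(jobId)
                        .putInt(subtaskIndex)
                        .putLong(checkpointId)
                        .array();
        // a per-run value distinguishes transactions of different job runs ("run id" in the javadoc)
        this.bqual = ByteBuffer.allocate(Long.BYTES).putLong(runId).array();
    }

    @Override
    public int getFormatId() {
        return FORMAT_ID;
    }

    @Override
    public byte[] getGlobalTransactionId() {
        return Arrays.copyOf(gtrid, gtrid.length);
    }

    @Override
    public byte[] getBranchQualifier() {
        return Arrays.copyOf(bqual, bqual.length);
    }
}
```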
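
Similarly, the `split(...)` helper used by `commitUpToCheckpoint` is referenced but not quoted above. The following simplified sketch illustrates the behaviour described in the reply ("failed Xids will be returned to `preparedXids` again"): transactions prepared for checkpoints up to and including the completed one are committed, transient commit failures stay pending for a later retry, and later checkpoints are left untouched. Class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the commit-up-to-checkpoint pattern; not the PR's actual helper. */
final class CommitUpToCheckpointSketch {

    /** Minimal stand-in for CheckpointAndXid: a prepared transaction tagged with its checkpoint. */
    static final class Prepared {
        final long checkpointId;
        final String xid; // the real code uses javax.transaction.xa.Xid

        Prepared(long checkpointId, String xid) {
            this.checkpointId = checkpointId;
            this.xid = xid;
        }
    }

    /** Returns false on a transient commit failure, true on success. */
    interface Committer {
        boolean tryCommit(Prepared prepared);
    }

    /** Returns the new list of still-pending prepared transactions. */
    static List<Prepared> commitUpTo(
            List<Prepared> prepared, long checkpointInclusive, Committer committer) {
        List<Prepared> remaining = new ArrayList<>();
        for (Prepared p : prepared) {
            if (p.checkpointId <= checkpointInclusive) {
                if (!committer.tryCommit(p)) {
                    remaining.add(p); // failed commit stays pending and is retried later
                }
            } else {
                remaining.add(p); // belongs to a later checkpoint, keep as-is
            }
        }
        return remaining;
    }
}
```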
