wuchong commented on a change in pull request #10847: URL: https://github.com/apache/flink/pull/10847#discussion_r550422198
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidImpl.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.transaction.xa.Xid; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; + +/** + * A simple {@link Xid} implementation that stores branch and global transaction identifiers as byte + * arrays. 
+ */ +@Internal +final class XidImpl implements Xid, Serializable { + + private final int formatId; + @Nonnull private final byte[] globalTransactionId; + @Nonnull private final byte[] branchQualifier; + + XidImpl(int formatId, byte[] globalTransactionId, byte[] branchQualifier) { + Preconditions.checkArgument(globalTransactionId.length <= Xid.MAXGTRIDSIZE); + Preconditions.checkArgument(branchQualifier.length <= Xid.MAXBQUALSIZE); + this.formatId = formatId; + this.globalTransactionId = Arrays.copyOf(globalTransactionId, globalTransactionId.length); + this.branchQualifier = Arrays.copyOf(branchQualifier, branchQualifier.length); + } + + @Override + public int getFormatId() { + return formatId; + } + + @Override + public byte[] getGlobalTransactionId() { + return globalTransactionId; + } + + @Override + public byte[] getBranchQualifier() { + return branchQualifier; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof XidImpl)) { + return false; + } + XidImpl xid = (XidImpl) o; + return formatId == xid.formatId + && Arrays.equals(globalTransactionId, xid.globalTransactionId) + && Arrays.equals(branchQualifier, xid.branchQualifier); + } + + @Override + public int hashCode() { + int result = Objects.hash(formatId); + result = 31 * result + Arrays.hashCode(globalTransactionId); + result = 31 * result + Arrays.hashCode(branchQualifier); + return result; + } + + @Override + public String toString() { + return formatId + ":" + bytesToHex(globalTransactionId) + ":" + bytesToHex(branchQualifier); Review comment: I think we can use `StringUtils.byteToHexString(bytes)` here to avoid duplicate implementation. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidImpl.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.transaction.xa.Xid; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; + +/** + * A simple {@link Xid} implementation that stores branch and global transaction identifiers as byte + * arrays. + */ +@Internal +final class XidImpl implements Xid, Serializable { Review comment: Should we add a `serialVersionUID` to the class? ########## File path: flink-connectors/flink-connector-jdbc/pom.xml ########## @@ -92,6 +100,19 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>javax.transaction</groupId> + <artifactId>javax.transaction-api</artifactId> + <version>1.3</version> Review comment: Which classes from this dependency do we actually use? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/CheckpointAndXidSerializer.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.Objects; + +/** {@link CheckpointAndXid} serializer. 
*/ +@Internal +final class CheckpointAndXidSerializer extends TypeSerializer<CheckpointAndXid> { + + private static final TypeSerializerSnapshot<CheckpointAndXid> SNAPSHOT = + new SimpleTypeSerializerSnapshot<CheckpointAndXid>(CheckpointAndXidSerializer::new) { + private static final int VERSION = 1; + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + super.writeSnapshot(out); + out.writeInt(VERSION); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) + throws IOException { + super.readSnapshot(readVersion, in, classLoader); + in.readInt(); + } + }; + + private final TypeSerializer<Xid> xidSerializer = new XidSerializer(); + + @Override + public boolean isImmutableType() { + return xidSerializer.isImmutableType(); + } + + @Override + public TypeSerializer<CheckpointAndXid> duplicate() { + return this; + } + + @Override + public CheckpointAndXid createInstance() { + return CheckpointAndXid.createRestored(0L, 0, xidSerializer.createInstance()); + } + + @Override + public CheckpointAndXid copy(CheckpointAndXid from) { + return CheckpointAndXid.createRestored( + from.checkpointId, from.attempts, xidSerializer.copy(from.xid)); + } + + @Override + public CheckpointAndXid copy(CheckpointAndXid from, CheckpointAndXid reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(CheckpointAndXid record, DataOutputView target) throws IOException { + target.writeLong(record.checkpointId); + target.writeInt(record.attempts); Review comment: Why don't we serialize the restored flag? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import 
java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. + * + * <p>XA uses a two-phase commit protocol, which solves the consistency problem, but leaves the + * following issues: + * + * <ol> + * <li>transactions may be abandoned, holding resources (e.g. locks, versions of rows) + * <li>abandoned transactions collide with the new transactions if their IDs repeat after recovery + * <li>commit requests may be repeated after job recovery, resulting in error responses and job + * failure + * </ol> + * + * The following table summarizes effects of failures during transaction state transitions and ways + * to mitigate them: + * + * <table border="1" style="width:100%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:30%;"> + * <col span="1" style="width:40%;"> + * <thead> + * <tr> + * <th>Transition</th> + * <th>Methods</th> + * <th>What happens if transition lost</th> + * <th>Ways to mitigate</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>none > started, started > ended</td> + * <td>open(), snapshotState()</td> + * <td>Database eventually discards these transactions</td> + * <td><ol> + * <li>Use globally unique XIDs</li> + * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see {@link SemanticXidGenerator}).</li> + * </ol></td> + * </tr> + * <tr> + * <td>ended > prepared</td> + * <td>snapshotState()</td> + * <td>Database keeps these transactions prepared forever 
("in-doubt" state)</td> + * <td> + * <ol> + * <li>store ended transactions in state; rollback on job recovery (still doesn't cover all scenarios)</li> + * <li>call xa_recover() and xa_rollback() on job recovery; disabled by default in order not to affect transactions of other subtasks and apps</li> + * <li>setting transaction timeouts (not supported by most databases)</li> + * <li>manual recovery and rollback</li> + * </ol> + * </td> + * </tr> + * <tr> + * <td>prepared > committed</td> + * <td>open(), notifyCheckpointComplete()</td> + * <td> + * Upon job recovery state contains committed transactions; or JM may notifyCheckpointComplete again after recovery. + * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} error.</p> + * </td> + * <td> + * Distinguish between transactions created during this run and restored from state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} for the latter. + * </td> + * </tr> + * </tbody> + * </table> + * + * @since 1.11 + */ +@Internal +public class JdbcXaSinkFunction<T> extends AbstractRichFunction + implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcXaSinkFunction.class); + + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final XidGenerator xidGenerator; + private final JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format; + private final XaSinkStateHandler stateHandler; + private final JdbcExactlyOnceOptions options; + + // checkpoints and the corresponding transactions waiting for completion notification from JM + private transient List<CheckpointAndXid> preparedXids = new ArrayList<>(); + // hanging XIDs - used for cleanup + // it's a list to support retries and scaling down + // possible transaction states: active, idle, prepared + // last element is the current xid + private transient Deque<Xid> hangingXids = new 
LinkedList<>(); + private transient Xid currentXid; + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. + * + * @param xaFacade {@link XaFacade} to manage XA transactions + */ + public JdbcXaSinkFunction( + String sql, + JdbcStatementBuilder<T> statementBuilder, + XaFacade xaFacade, + JdbcExecutionOptions executionOptions, + JdbcExactlyOnceOptions options) { + this( + new JdbcBatchingOutputFormat<>( + xaFacade, + executionOptions, + context -> { + Preconditions.checkState( + !context.getExecutionConfig().isObjectReuseEnabled(), + "objects can not be reused with JDBC sink function"); + return JdbcBatchStatementExecutor.simple( + sql, statementBuilder, Function.identity()); + }, + JdbcBatchingOutputFormat.RecordExtractor.identity()), + xaFacade, + XidGenerator.semanticXidGenerator(), + new XaSinkStateHandlerImpl(), + options, + new XaGroupOpsImpl(xaFacade)); + } + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. 
+ * + * @param format {@link JdbcBatchingOutputFormat} to write records with + * @param xaFacade {@link XaFacade} to manage XA transactions + * @param xidGenerator {@link XidGenerator} to generate new transaction ids + */ + public JdbcXaSinkFunction( + JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format, + XaFacade xaFacade, + XidGenerator xidGenerator, + XaSinkStateHandler stateHandler, + JdbcExactlyOnceOptions options, + XaGroupOps xaGroupOps) { + this.xaFacade = Preconditions.checkNotNull(xaFacade); + this.xidGenerator = Preconditions.checkNotNull(xidGenerator); + this.format = Preconditions.checkNotNull(format); + this.stateHandler = Preconditions.checkNotNull(stateHandler); + this.options = Preconditions.checkNotNull(options); + this.xaGroupOps = xaGroupOps; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + JdbcXaSinkFunctionState state = stateHandler.load(context); + hangingXids = new LinkedList<>(state.getHanging()); + preparedXids = new ArrayList<>(state.getPrepared()); + LOG.info( + "initialized state: prepared xids: {}, hanging xids: {}", + preparedXids.size(), + hangingXids.size()); + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + xidGenerator.open(); + xaFacade.open(); + format.setRuntimeContext(getRuntimeContext()); + format.open( + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks()); + hangingXids = new LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry()); + commitUpToCheckpoint(Optional.empty()); Review comment: I have some questions: 1. Will this leave some retry Xids in `preparedXids`? 2. If the job fails, will it restart checkpointing from checkpoint 0? 3. If so, will the retry Xids in `preparedXids` remain uncommitted for a long time?
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import 
org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. + * + * <p>XA uses a two-phase commit protocol, which solves the consistency problem, but leaves the + * following issues: + * + * <ol> + * <li>transactions may be abandoned, holding resources (e.g. 
locks, versions of rows) + * <li>abandoned transactions collide with the new transactions if their IDs repeat after recovery + * <li>commit requests may be repeated after job recovery, resulting in error responses and job + * failure + * </ol> + * + * The following table summarizes effects of failures during transaction state transitions and ways + * to mitigate them: + * + * <table border="1" style="width:100%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:30%;"> + * <col span="1" style="width:40%;"> + * <thead> + * <tr> + * <th>Transition</th> + * <th>Methods</th> + * <th>What happens if transition lost</th> + * <th>Ways to mitigate</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>none > started, started > ended</td> + * <td>open(), snapshotState()</td> + * <td>Database eventually discards these transactions</td> + * <td><ol> + * <li>Use globally unique XIDs</li> + * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see {@link SemanticXidGenerator}).</li> + * </ol></td> + * </tr> + * <tr> + * <td>ended > prepared</td> + * <td>snapshotState()</td> + * <td>Database keeps these transactions prepared forever ("in-doubt" state)</td> + * <td> + * <ol> + * <li>store ended transactions in state; rollback on job recovery (still doesn't cover all scenarios)</li> + * <li>call xa_recover() and xa_rollback() on job recovery; disabled by default in order not to affect transactions of other subtasks and apps</li> + * <li>setting transaction timeouts (not supported by most databases)</li> + * <li>manual recovery and rollback</li> + * </ol> + * </td> + * </tr> + * <tr> + * <td>prepared > committed</td> + * <td>open(), notifyCheckpointComplete()</td> + * <td> + * Upon job recovery state contains committed transactions; or JM may notifyCheckpointComplete again after recovery. 
+ * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} error.</p> + * </td> + * <td> + * Distinguish between transactions created during this run and restored from state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} for the latter. + * </td> + * </tr> + * </tbody> + * </table> + * + * @since 1.11 + */ +@Internal +public class JdbcXaSinkFunction<T> extends AbstractRichFunction + implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcXaSinkFunction.class); + + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final XidGenerator xidGenerator; + private final JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format; + private final XaSinkStateHandler stateHandler; + private final JdbcExactlyOnceOptions options; + + // checkpoints and the corresponding transactions waiting for completion notification from JM + private transient List<CheckpointAndXid> preparedXids = new ArrayList<>(); + // hanging XIDs - used for cleanup + // it's a list to support retries and scaling down + // possible transaction states: active, idle, prepared + // last element is the current xid + private transient Deque<Xid> hangingXids = new LinkedList<>(); + private transient Xid currentXid; + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. 
+ * + * @param xaFacade {@link XaFacade} to manage XA transactions + */ + public JdbcXaSinkFunction( + String sql, + JdbcStatementBuilder<T> statementBuilder, + XaFacade xaFacade, + JdbcExecutionOptions executionOptions, + JdbcExactlyOnceOptions options) { + this( + new JdbcBatchingOutputFormat<>( + xaFacade, + executionOptions, + context -> { + Preconditions.checkState( + !context.getExecutionConfig().isObjectReuseEnabled(), + "objects can not be reused with JDBC sink function"); + return JdbcBatchStatementExecutor.simple( + sql, statementBuilder, Function.identity()); + }, + JdbcBatchingOutputFormat.RecordExtractor.identity()), + xaFacade, + XidGenerator.semanticXidGenerator(), + new XaSinkStateHandlerImpl(), + options, + new XaGroupOpsImpl(xaFacade)); + } + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. + * + * @param format {@link JdbcBatchingOutputFormat} to write records with + * @param xaFacade {@link XaFacade} to manage XA transactions + * @param xidGenerator {@link XidGenerator} to generate new transaction ids + */ + public JdbcXaSinkFunction( + JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format, + XaFacade xaFacade, + XidGenerator xidGenerator, + XaSinkStateHandler stateHandler, + JdbcExactlyOnceOptions options, + XaGroupOps xaGroupOps) { + this.xaFacade = Preconditions.checkNotNull(xaFacade); + this.xidGenerator = Preconditions.checkNotNull(xidGenerator); + this.format = Preconditions.checkNotNull(format); + this.stateHandler = Preconditions.checkNotNull(stateHandler); + this.options = Preconditions.checkNotNull(options); + this.xaGroupOps = xaGroupOps; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + JdbcXaSinkFunctionState state = stateHandler.load(context); + hangingXids = new LinkedList<>(state.getHanging()); + preparedXids = new ArrayList<>(state.getPrepared()); + LOG.info( + "initialized state: 
prepared xids: {}, hanging xids: {}", + preparedXids.size(), + hangingXids.size()); + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + xidGenerator.open(); + xaFacade.open(); + format.setRuntimeContext(getRuntimeContext()); + format.open( + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks()); + hangingXids = new LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry()); + commitUpToCheckpoint(Optional.empty()); + if (options.isDiscoverAndRollbackOnRecovery()) { + // todo: consider doing recover-rollback later (e.g. after the 1st checkpoint) + // when we are sure that all other subtasks started and committed any of their prepared + // transactions + // this would require to distinguish between this job Xids and other Xids + xaGroupOps.recoverAndRollback(); + } + beginTx(0L); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId()); + rollbackPreparedFromCheckpoint(context.getCheckpointId()); + prepareCurrentTx(context.getCheckpointId()); + beginTx(context.getCheckpointId() + 1); + stateHandler.store(of(preparedXids, hangingXids)); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + commitUpToCheckpoint(Optional.of(checkpointId)); + } + + @Override + public void invoke(T value, Context context) throws IOException { + Preconditions.checkState(currentXid != null, "current xid must not be null"); + if (LOG.isTraceEnabled()) { + LOG.trace("invoke, xid: {}, value: {}", currentXid, value); + } + format.writeRecord(value); + } + + @Override + public void close() throws Exception { + super.close(); + if (currentXid != null && xaFacade.isOpen()) { + try { + LOG.debug("remove current transaction before closing, xid={}", currentXid); + xaFacade.failOrRollback(currentXid); + } catch (Exception e) { + LOG.warn("unable to 
fail/rollback current transaction, xid={}", currentXid, e); + } + } + xaFacade.close(); + xidGenerator.close(); + // don't format.close(); as we don't want neither to flush nor to close connection here + currentXid = null; + hangingXids = null; + preparedXids = null; + } + + private void prepareCurrentTx(long checkpointId) throws IOException { + Preconditions.checkState(currentXid != null, "no current xid"); + Preconditions.checkState( + !hangingXids.isEmpty() && hangingXids.peek().equals(currentXid), + "inconsistent internal state"); + hangingXids.poll(); + format.flush(); + try { + xaFacade.endAndPrepare(currentXid); + preparedXids.add(CheckpointAndXid.createNew(checkpointId, currentXid)); + } catch (EmptyXaTransactionException e) { + LOG.info( + "empty XA transaction (skip), xid: {}, checkpoint {}", + currentXid, + checkpointId); + } + currentXid = null; + } + + /** @param checkpointId to associate with the new transaction. */ + private void beginTx(long checkpointId) { + Preconditions.checkState(currentXid == null, "currentXid not null"); + currentXid = xidGenerator.generateXid(getRuntimeContext(), checkpointId); + hangingXids.offer(currentXid); + xaFacade.start(currentXid); + } + + private void commitUpToCheckpoint(Optional<Long> checkpointInclusive) { + Tuple2<List<CheckpointAndXid>, List<CheckpointAndXid>> splittedXids = + split(preparedXids, checkpointInclusive, true); + if (splittedXids.f0.isEmpty()) { + checkpointInclusive.ifPresent( + cp -> LOG.warn("nothing to commit up to checkpoint: {}", cp)); + } else { + preparedXids = splittedXids.f1; + preparedXids.addAll( + xaGroupOps + .commit( + splittedXids.f0, + options.isAllowOutOfOrderCommits(), + options.getMaxCommitAttempts()) + .getForRetry()); + } + } + + private void rollbackPreparedFromCheckpoint(long fromCheckpointInclusive) { + Tuple2<List<CheckpointAndXid>, List<CheckpointAndXid>> splittedXids = + split(preparedXids, fromCheckpointInclusive, false); + if (splittedXids.f1.isEmpty()) { + return; + } 
+ preparedXids = splittedXids.f0; + LOG.warn( + "state snapshots have already been taken for checkpoint >= {}, rolling back {} transactions", + fromCheckpointInclusive, + splittedXids.f1.size()); + xaGroupOps + .failOrRollback( + splittedXids.f1.stream() + .map(CheckpointAndXid::getXid) + .collect(Collectors.toList())) + .getForRetry() + .forEach(hangingXids::offerFirst); Review comment: Will this break the invariant that `hangingXids.peek()` should equal `currentXid`? `beginTx` appends `currentXid` at the tail with `offer`, while `prepareCurrentTx` checks the head with `peek()`, so pushing retry XIDs to the front with `offerFirst` would make that check fail. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidSerializer.java ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import javax.transaction.xa.Xid; + +import java.io.IOException; + +/** {@link Xid} serializer.
*/ +@Internal +final class XidSerializer extends TypeSerializer<Xid> { + + private static final TypeSerializerSnapshot<Xid> SNAPSHOT = + new SimpleTypeSerializerSnapshot<Xid>(XidSerializer::new) { + private static final int VERSION = 1; + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + super.writeSnapshot(out); + out.writeInt(VERSION); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) + throws IOException { + super.readSnapshot(readVersion, in, classLoader); + in.readInt(); + } + }; + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public TypeSerializer<Xid> duplicate() { + return this; + } + + @Override + public Xid createInstance() { + return new XidImpl(0, new byte[0], new byte[0]); + } + + @Override + public Xid copy(Xid from) { + return from; + } + + @Override + public Xid copy(Xid from, Xid reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(Xid xid, DataOutputView target) throws IOException { + target.writeInt(xid.getFormatId()); + writeBytesWithSize(target, xid.getGlobalTransactionId()); + writeBytesWithSize(target, xid.getBranchQualifier()); + } + + @Override + public Xid deserialize(DataInputView source) throws IOException { + return new XidImpl(source.readInt(), readBytesWithSize(source), readBytesWithSize(source)); + } + + private void writeBytesWithSize(DataOutputView target, byte[] bytes) throws IOException { + target.writeByte(bytes.length); + target.write(bytes, 0, bytes.length); + } + + private byte[] readBytesWithSize(DataInputView source) throws IOException { + byte len = source.readByte(); + byte[] bytes = new byte[len]; + source.read(bytes, 0, len); + return bytes; + } + + @Override + public Xid deserialize(Xid reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, 
DataOutputView target) throws IOException { + serialize(deserialize(source), target); + } + + @Override + public boolean equals(Object obj) { + return obj == this || obj instanceof XidSerializer; Review comment: The condition `obj == this` is covered by the subsequent condition `obj instanceof XidSerializer`, so we can remove `obj == this` here. ########## File path: flink-connectors/flink-connector-jdbc/pom.xml ########## @@ -55,7 +55,15 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${project.version}</version> - <scope>provided</scope> + <type>test-jar</type> Review comment: Could we add a `test` scope to this? Also, move this one and the following one under the `<!-- test dependencies -->` section. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java ########## @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; +import javax.sql.XAConnection; +import javax.sql.XADataSource; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import static java.util.Optional.empty; +import static java.util.Optional.of; +import static javax.transaction.xa.XAException.XAER_NOTA; +import static javax.transaction.xa.XAException.XAER_RMFAIL; +import static javax.transaction.xa.XAException.XA_HEURCOM; +import static javax.transaction.xa.XAException.XA_HEURHAZ; +import static javax.transaction.xa.XAException.XA_HEURMIX; +import static javax.transaction.xa.XAException.XA_HEURRB; +import static javax.transaction.xa.XAException.XA_RBBASE; +import static javax.transaction.xa.XAException.XA_RBTIMEOUT; +import static javax.transaction.xa.XAException.XA_RBTRANSIENT; +import static javax.transaction.xa.XAResource.TMENDRSCAN; +import static javax.transaction.xa.XAResource.TMNOFLAGS; +import static javax.transaction.xa.XAResource.TMSTARTRSCAN; + +/** Default {@link XaFacade} implementation. 
*/ +@NotThreadSafe +@Internal +class XaFacadeImpl implements XaFacade { + + private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImpl.class); + private static final Set<Integer> TRANSIENT_ERR_CODES = + new HashSet<>(Arrays.asList(XA_RBTRANSIENT, XAER_RMFAIL)); + private static final Set<Integer> HEUR_ERR_CODES = + new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, XA_HEURMIX)); + private static final int MAX_RECOVER_CALLS = 100; + + private final Supplier<XADataSource> dataSourceSupplier; + private final Integer timeoutSec; + private transient XAResource xaResource; + private transient Connection connection; + private transient XAConnection xaConnection; + + /** @return a non-serializable instance. */ + static XaFacadeImpl fromXaDataSource(XADataSource ds) { + return new XaFacadeImpl(() -> ds, empty()); + } + + XaFacadeImpl(Supplier<XADataSource> dataSourceSupplier, Optional<Integer> timeoutSec) { + this.dataSourceSupplier = Preconditions.checkNotNull(dataSourceSupplier); + this.timeoutSec = timeoutSec.orElse(null); + } + + @Override + public void open() throws SQLException, XAException { + Preconditions.checkState(!isOpen(), "already connected"); + XADataSource ds = dataSourceSupplier.get(); + xaConnection = ds.getXAConnection(); + xaResource = xaConnection.getXAResource(); + if (timeoutSec != null) { + xaResource.setTransactionTimeout(timeoutSec); + } + connection = xaConnection.getConnection(); + connection.setReadOnly(false); + connection.setAutoCommit(false); + Preconditions.checkState(!connection.getAutoCommit()); + } + + @Override + public void close() throws SQLException { + if (connection != null) { + connection.close(); + connection = null; + } + if (xaConnection != null) { + xaConnection.close(); + xaConnection = null; + } + xaResource = null; + } + + @Override + public Connection getConnection() { + Preconditions.checkNotNull(connection); + return connection; + } + + @Override + public boolean isConnectionValid() { + return 
isOpen(); Review comment: Should we use `connection.isValid` to check validity? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidSerializer.java ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ +@Internal +final class XidSerializer extends TypeSerializer<Xid> { + + private static final TypeSerializerSnapshot<Xid> SNAPSHOT = + new SimpleTypeSerializerSnapshot<Xid>(XidSerializer::new) { + private static final int VERSION = 1; + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + super.writeSnapshot(out); + out.writeInt(VERSION); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) + throws IOException { + super.readSnapshot(readVersion, in, classLoader); + in.readInt(); + } + }; + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public TypeSerializer<Xid> duplicate() { + return this; + } + + @Override + public Xid createInstance() { + return new XidImpl(0, new byte[0], new byte[0]); + } + + @Override + public Xid copy(Xid from) { + return from; + } + + @Override + public Xid copy(Xid from, Xid reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(Xid xid, DataOutputView target) throws IOException { + target.writeInt(xid.getFormatId()); + writeBytesWithSize(target, xid.getGlobalTransactionId()); + writeBytesWithSize(target, xid.getBranchQualifier()); + } + + @Override + public Xid deserialize(DataInputView source) throws IOException { + return new XidImpl(source.readInt(), readBytesWithSize(source), readBytesWithSize(source)); + } + + private void writeBytesWithSize(DataOutputView target, byte[] bytes) throws IOException { + target.writeByte(bytes.length); + target.write(bytes, 0, bytes.length); + } + + private byte[] readBytesWithSize(DataInputView source) throws IOException { + byte len = source.readByte(); + byte[] bytes = new byte[len]; + source.read(bytes, 0, len); + return bytes; + } + + @Override + public Xid deserialize(Xid reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, 
DataOutputView target) throws IOException { + serialize(deserialize(source), target); + } + + @Override + public boolean equals(Object obj) { + return obj == this || obj instanceof XidSerializer; + } + + @Override + public int hashCode() { + return 0; Review comment: Better to return `this.getClass().hashCode();`. ########## File path: flink-connectors/flink-connector-jdbc/pom.xml ########## @@ -92,6 +100,19 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>javax.transaction</groupId> + <artifactId>javax.transaction-api</artifactId> + <version>1.3</version> + </dependency> + + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>1.4.200</version> + <scope>compile</scope> Review comment: I think this is only used for testing, could we change the scope to `test`? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/CheckpointAndXidSerializer.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.Objects; + +/** {@link CheckpointAndXid} serializer. */ +@Internal +final class CheckpointAndXidSerializer extends TypeSerializer<CheckpointAndXid> { + + private static final TypeSerializerSnapshot<CheckpointAndXid> SNAPSHOT = + new SimpleTypeSerializerSnapshot<CheckpointAndXid>(CheckpointAndXidSerializer::new) { + private static final int VERSION = 1; + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + super.writeSnapshot(out); + out.writeInt(VERSION); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) + throws IOException { + super.readSnapshot(readVersion, in, classLoader); + in.readInt(); + } + }; + + private final TypeSerializer<Xid> xidSerializer = new XidSerializer(); + + @Override + public boolean isImmutableType() { + return xidSerializer.isImmutableType(); + } + + @Override + public TypeSerializer<CheckpointAndXid> duplicate() { + return this; + } + + @Override + public CheckpointAndXid createInstance() { + return CheckpointAndXid.createRestored(0L, 0, xidSerializer.createInstance()); + } + + @Override + public CheckpointAndXid copy(CheckpointAndXid from) { + return CheckpointAndXid.createRestored( + from.checkpointId, from.attempts, xidSerializer.copy(from.xid)); Review comment: This loses the `restored` information? 
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Optional; + +/** + * JDBC exactly once sink options. + * + * <p><b>maxCommitAttempts</b> - maximum number of commit attempts to make per transaction; must be + * > 0; state size is proportional to the product of max number of in-flight snapshots and this + * number. + * + * <p><b>allowOutOfOrderCommits</b> - If true, all prepared transactions will be attempted to commit + * regardless of any transient failures during this operation. This may lead to inconsistency. + * Default: false. + * + * <p><b>recoveredAndRollback</b> - whether to rollback prepared transactions known to XA RM on + * startup (after committing <b>known</b> transactions, i.e. restored from state). 
+ * + * <p>NOTE that setting this parameter to true may: + * + * <ol> + * <li>interfere with other subtasks or applications (one subtask rolling back transactions + * prepared by the other one (and known to it)) + * <li>block when using with some non-MVCC databases, if there are ended-not-prepared transactions + * </ol> + * + * See also {@link org.apache.flink.connector.jdbc.xa.XaFacade#recover()} + */ +public class JdbcExactlyOnceOptions implements Serializable { Review comment: Add `@PublicEvolving` annotation? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java ########## @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; +import javax.sql.XAConnection; +import javax.sql.XADataSource; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import static java.util.Optional.empty; +import static java.util.Optional.of; +import static javax.transaction.xa.XAException.XAER_NOTA; +import static javax.transaction.xa.XAException.XAER_RMFAIL; +import static javax.transaction.xa.XAException.XA_HEURCOM; +import static javax.transaction.xa.XAException.XA_HEURHAZ; +import static javax.transaction.xa.XAException.XA_HEURMIX; +import static javax.transaction.xa.XAException.XA_HEURRB; +import static javax.transaction.xa.XAException.XA_RBBASE; +import static javax.transaction.xa.XAException.XA_RBTIMEOUT; +import static javax.transaction.xa.XAException.XA_RBTRANSIENT; +import static javax.transaction.xa.XAResource.TMENDRSCAN; +import static javax.transaction.xa.XAResource.TMNOFLAGS; +import static javax.transaction.xa.XAResource.TMSTARTRSCAN; + +/** Default {@link XaFacade} implementation. 
*/ +@NotThreadSafe +@Internal +class XaFacadeImpl implements XaFacade { + + private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImpl.class); + private static final Set<Integer> TRANSIENT_ERR_CODES = + new HashSet<>(Arrays.asList(XA_RBTRANSIENT, XAER_RMFAIL)); + private static final Set<Integer> HEUR_ERR_CODES = + new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, XA_HEURMIX)); + private static final int MAX_RECOVER_CALLS = 100; + + private final Supplier<XADataSource> dataSourceSupplier; + private final Integer timeoutSec; + private transient XAResource xaResource; + private transient Connection connection; + private transient XAConnection xaConnection; + + /** @return a non-serializable instance. */ + static XaFacadeImpl fromXaDataSource(XADataSource ds) { + return new XaFacadeImpl(() -> ds, empty()); + } + + XaFacadeImpl(Supplier<XADataSource> dataSourceSupplier, Optional<Integer> timeoutSec) { + this.dataSourceSupplier = Preconditions.checkNotNull(dataSourceSupplier); + this.timeoutSec = timeoutSec.orElse(null); + } + + @Override + public void open() throws SQLException, XAException { + Preconditions.checkState(!isOpen(), "already connected"); + XADataSource ds = dataSourceSupplier.get(); + xaConnection = ds.getXAConnection(); + xaResource = xaConnection.getXAResource(); + if (timeoutSec != null) { + xaResource.setTransactionTimeout(timeoutSec); + } + connection = xaConnection.getConnection(); + connection.setReadOnly(false); + connection.setAutoCommit(false); + Preconditions.checkState(!connection.getAutoCommit()); + } + + @Override + public void close() throws SQLException { + if (connection != null) { + connection.close(); + connection = null; + } + if (xaConnection != null) { + xaConnection.close(); + xaConnection = null; + } + xaResource = null; + } + + @Override + public Connection getConnection() { + Preconditions.checkNotNull(connection); + return connection; + } + + @Override + public boolean isConnectionValid() { + return 
isOpen(); + } + + @Override + public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException { + if (!isOpen()) { + try { + open(); + } catch (XAException e) { + throw new SQLException(e); + } + } + return connection; + } + + @Override + public void closeConnection() { + try { + close(); + } catch (SQLException e) { + LOG.warn("Connection close failed.", e); + } + } + + @Override + public Connection reestablishConnection() { + throw new UnsupportedOperationException(); Review comment: I think this can be simply supported by calling `close()` and `open()` ? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. 
+ * + * <p>XA uses a two-phase commit protocol, which solves the consistency problem, but leaves the + * following issues: + * + * <ol> + * <li>transactions may be abandoned, holding resources (e.g. locks, versions of rows) + * <li>abandoned transactions collide with the new transactions if their IDs repeat after recovery + * <li>commit requests may be repeated after job recovery, resulting in error responses and job + * failure + * </ol> + * + * The following table summarizes effects of failures during transaction state transitions and ways + * to mitigate them: + * + * <table border="1" style="width:100%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:30%;"> + * <col span="1" style="width:40%;"> + * <thead> + * <tr> + * <th>Transition</th> + * <th>Methods</th> + * <th>What happens if transition lost</th> + * <th>Ways to mitigate</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>none > started, started > ended</td> + * <td>open(), snapshotState()</td> + * <td>Database eventually discards these transactions</td> + * <td><ol> + * <li>Use globally unique XIDs</li> + * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see {@link SemanticXidGenerator}).</li> + * </ol></td> + * </tr> + * <tr> + * <td>ended > prepared</td> + * <td>snapshotState()</td> + * <td>Database keeps these transactions prepared forever ("in-doubt" state)</td> + * <td> + * <ol> + * <li>store ended transactions in state; rollback on job recovery (still doesn't cover all scenarios)</li> + * <li>call xa_recover() and xa_rollback() on job recovery; disabled by default in order not to affect transactions of other subtasks and apps</li> + * <li>setting transaction timeouts (not supported by most databases)</li> + * <li>manual recovery and rollback</li> + * </ol> + * </td> + * </tr> + * <tr> + * <td>prepared > committed</td> + * <td>open(), notifyCheckpointComplete()</td> + * <td> + * Upon job recovery state contains 
committed transactions; or JM may notifyCheckpointComplete again after recovery. + * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} error.</p> + * </td> + * <td> + * Distinguish between transactions created during this run and restored from state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} for the latter. + * </td> + * </tr> + * </tbody> + * </table> + * + * @since 1.11 + */ +@Internal +public class JdbcXaSinkFunction<T> extends AbstractRichFunction + implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcXaSinkFunction.class); + + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final XidGenerator xidGenerator; + private final JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format; + private final XaSinkStateHandler stateHandler; + private final JdbcExactlyOnceOptions options; + + // checkpoints and the corresponding transactions waiting for completion notification from JM + private transient List<CheckpointAndXid> preparedXids = new ArrayList<>(); + // hanging XIDs - used for cleanup + // it's a list to support retries and scaling down + // possible transaction states: active, idle, prepared + // last element is the current xid + private transient Deque<Xid> hangingXids = new LinkedList<>(); + private transient Xid currentXid; + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. 
+ * + * @param xaFacade {@link XaFacade} to manage XA transactions + */ + public JdbcXaSinkFunction( + String sql, + JdbcStatementBuilder<T> statementBuilder, + XaFacade xaFacade, + JdbcExecutionOptions executionOptions, + JdbcExactlyOnceOptions options) { + this( + new JdbcBatchingOutputFormat<>( + xaFacade, + executionOptions, + context -> { + Preconditions.checkState( + !context.getExecutionConfig().isObjectReuseEnabled(), + "objects can not be reused with JDBC sink function"); + return JdbcBatchStatementExecutor.simple( + sql, statementBuilder, Function.identity()); + }, + JdbcBatchingOutputFormat.RecordExtractor.identity()), + xaFacade, + XidGenerator.semanticXidGenerator(), + new XaSinkStateHandlerImpl(), + options, + new XaGroupOpsImpl(xaFacade)); + } + + /** + * Creates a {@link JdbcXaSinkFunction}. + * + * <p>All parameters must be {@link java.io.Serializable serializable}. + * + * @param format {@link JdbcBatchingOutputFormat} to write records with + * @param xaFacade {@link XaFacade} to manage XA transactions + * @param xidGenerator {@link XidGenerator} to generate new transaction ids + */ + public JdbcXaSinkFunction( + JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format, + XaFacade xaFacade, + XidGenerator xidGenerator, + XaSinkStateHandler stateHandler, + JdbcExactlyOnceOptions options, + XaGroupOps xaGroupOps) { + this.xaFacade = Preconditions.checkNotNull(xaFacade); + this.xidGenerator = Preconditions.checkNotNull(xidGenerator); + this.format = Preconditions.checkNotNull(format); + this.stateHandler = Preconditions.checkNotNull(stateHandler); + this.options = Preconditions.checkNotNull(options); + this.xaGroupOps = xaGroupOps; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + JdbcXaSinkFunctionState state = stateHandler.load(context); + hangingXids = new LinkedList<>(state.getHanging()); Review comment: Does the `hangingXids` have to be a `Deque` to keep the order? 
If so, I suspect that state recovery doesn't retain the order. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. 
+ * + * <p>XA uses a two-phase commit protocol, which solves the consistency problem, but leaves the + * following issues: + * + * <ol> + * <li>transactions may be abandoned, holding resources (e.g. locks, versions of rows) + * <li>abandoned transactions collide with the new transactions if their IDs repeat after recovery + * <li>commit requests may be repeated after job recovery, resulting in error responses and job + * failure + * </ol> + * + * The following table summarizes effects of failures during transaction state transitions and ways + * to mitigate them: + * + * <table border="1" style="width:100%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:15%;"> + * <col span="1" style="width:30%;"> + * <col span="1" style="width:40%;"> + * <thead> + * <tr> + * <th>Transition</th> + * <th>Methods</th> + * <th>What happens if transition lost</th> + * <th>Ways to mitigate</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>none > started, started > ended</td> + * <td>open(), snapshotState()</td> + * <td>Database eventually discards these transactions</td> + * <td><ol> + * <li>Use globally unique XIDs</li> + * <li>derive XID from: checkpoint id, subtask id, "job id", "run id" (see {@link SemanticXidGenerator}).</li> + * </ol></td> + * </tr> + * <tr> + * <td>ended > prepared</td> + * <td>snapshotState()</td> + * <td>Database keeps these transactions prepared forever ("in-doubt" state)</td> + * <td> + * <ol> + * <li>store ended transactions in state; rollback on job recovery (still doesn't cover all scenarios)</li> + * <li>call xa_recover() and xa_rollback() on job recovery; disabled by default in order not to affect transactions of other subtasks and apps</li> + * <li>setting transaction timeouts (not supported by most databases)</li> + * <li>manual recovery and rollback</li> + * </ol> + * </td> + * </tr> + * <tr> + * <td>prepared > committed</td> + * <td>open(), notifyCheckpointComplete()</td> + * <td> + * Upon job recovery state contains 
committed transactions; or JM may notifyCheckpointComplete again after recovery. + * <p>Committing results in {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} error.</p> + * </td> + * <td> + * Distinguish between transactions created during this run and restored from state and ignore {@link javax.transaction.xa.XAException#XAER_NOTA XAER_NOTA} for the latter. + * </td> + * </tr> + * </tbody> + * </table> + * + * @since 1.11 + */ +@Internal +public class JdbcXaSinkFunction<T> extends AbstractRichFunction + implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcXaSinkFunction.class); + + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final XidGenerator xidGenerator; + private final JdbcBatchingOutputFormat<T, T, JdbcBatchStatementExecutor<T>> format; Review comment: nit: it would be better to call this `outputFormat` to avoid confusion with serialization formats such as CSV or JSON. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.jdbc; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Optional; + +/** + * JDBC exactly once sink options. + * + * <p><b>maxCommitAttempts</b> - maximum number of commit attempts to make per transaction; must be + * > 0; state size is proportional to the product of max number of in-flight snapshots and this + * number. + * + * <p><b>allowOutOfOrderCommits</b> - If true, all prepared transactions will be attempted to commit + * regardless of any transient failures during this operation. This may lead to inconsistency. + * Default: false. + * + * <p><b>recoveredAndRollback</b> - whether to rollback prepared transactions known to XA RM on + * startup (after committing <b>known</b> transactions, i.e. restored from state). + * + * <p>NOTE that setting this parameter to true may: + * + * <ol> + * <li>interfere with other subtasks or applications (one subtask rolling back transactions + * prepared by the other one (and known to it)) + * <li>block when using with some non-MVCC databases, if there are ended-not-prepared transactions + * </ol> + * + * See also {@link org.apache.flink.connector.jdbc.xa.XaFacade#recover()} + */ +public class JdbcExactlyOnceOptions implements Serializable { + + private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = false; + private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3; + private static final boolean DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS = false; + + private final boolean discoverAndRollbackOnRecovery; + private final int maxCommitAttempts; + private final boolean allowOutOfOrderCommits; + private final Integer timeoutSec; + + private JdbcExactlyOnceOptions( + boolean discoverAndRollbackOnRecovery, + int maxCommitAttempts, + boolean allowOutOfOrderCommits, + Optional<Integer> timeoutSec) { + this.discoverAndRollbackOnRecovery = discoverAndRollbackOnRecovery; + this.maxCommitAttempts = maxCommitAttempts; + this.allowOutOfOrderCommits = 
allowOutOfOrderCommits; + this.timeoutSec = timeoutSec.orElse(null); + Preconditions.checkArgument(this.maxCommitAttempts > 0, "maxCommitAttempts should be > 0"); + } + + public static JdbcExactlyOnceOptions defaults() { + return builder().build(); + } + + public boolean isDiscoverAndRollbackOnRecovery() { + return discoverAndRollbackOnRecovery; + } + + public boolean isAllowOutOfOrderCommits() { + return allowOutOfOrderCommits; + } + + public int getMaxCommitAttempts() { + return maxCommitAttempts; + } + + public Integer getTimeoutSec() { + return timeoutSec; + } + + public static JDBCExactlyOnceOptionsBuilder builder() { + return new JDBCExactlyOnceOptionsBuilder(); + } + + /** JDBCExactlyOnceOptionsBuilder. */ + public static class JDBCExactlyOnceOptionsBuilder { + private boolean recoveredAndRollback = DEFAULT_RECOVERED_AND_ROLLBACK; + private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS; + private boolean allowOutOfOrderCommits = DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS; + private Optional<Integer> timeoutSec = Optional.empty(); + + public JDBCExactlyOnceOptionsBuilder withRecoveredAndRollback( Review comment: Could you add Javadocs to the configuration methods describing what each config is used for? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.xa.XaFacade.EmptyXaTransactionException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunctionState.of; + +/** + * JDBC sink function that uses XA transactions to provide exactly once 
guarantees. That is, if a + * checkpoint succeeds then all records emitted during it are committed in the database, and rolled + * back otherwise. + * + * <p>Each parallel subtask has it's own transactions, independent from other subtasks. Therefore, + * consistency is only guaranteed within partitions. Review comment: I'm a little confused about this. Does this mean we can't provide an exactly-once guarantee across partitions? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
