This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 54dec76c87a927b9f95ded06fd8e785e3987bdbc Author: Roman Khachatryan <[email protected]> AuthorDate: Sun Apr 18 17:20:41 2021 +0200 [FLINK-22239][jdbc] Rollback XA transactions on recovery Leaving transactions not rolled back may lead to new transactions being blocked by the old ones. --- .../connector/jdbc/JdbcExactlyOnceOptions.java | 8 +- .../connector/jdbc/xa/JdbcXaSinkFunction.java | 10 +- .../connector/jdbc/xa/SemanticXidGenerator.java | 24 ++ .../apache/flink/connector/jdbc/xa/XaGroupOps.java | 3 +- .../flink/connector/jdbc/xa/XaGroupOpsImpl.java | 13 +- .../flink/connector/jdbc/xa/XidGenerator.java | 3 + .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 13 +- .../connector/jdbc/xa/JdbcXaSinkTestBase.java | 286 +++++++++++---------- .../jdbc/xa/SemanticXidGeneratorTest.java | 5 +- 9 files changed, 209 insertions(+), 156 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java index 7879c8a..7ba4eb5 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java @@ -50,7 +50,7 @@ import java.util.Optional; @PublicEvolving public class JdbcExactlyOnceOptions implements Serializable { - private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = false; + private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = true; private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3; private static final boolean DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS = false; @@ -102,7 +102,11 @@ public class JdbcExactlyOnceOptions implements Serializable { private boolean allowOutOfOrderCommits = DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS; private Optional<Integer> timeoutSec = Optional.empty(); - /** Toggle discovery 
and rollback of transactions upon recovery. */ + /** + * Toggle discovery and rollback of prepared transactions upon recovery to prevent new + * transactions from being blocked by the older ones. Each subtask rollbacks its own + * transaction. This flag must be disabled when rescaling to prevent data loss. + */ public JDBCExactlyOnceOptionsBuilder withRecoveredAndRollback( boolean recoveredAndRollback) { this.recoveredAndRollback = recoveredAndRollback; diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java index 17ad730..1deda32 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java @@ -224,11 +224,11 @@ public class JdbcXaSinkFunction<T> extends AbstractRichFunction hangingXids = new LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry()); commitUpToCheckpoint(Optional.empty()); if (options.isDiscoverAndRollbackOnRecovery()) { - // todo: consider doing recover-rollback later (e.g. after the 1st checkpoint) - // when we are sure that all other subtasks started and committed any of their prepared - // transactions - // this would require to distinguish between this job Xids and other Xids - xaGroupOps.recoverAndRollback(); + // Pending transactions which are not included into the checkpoint might hold locks and + // should be rolled back. However, rolling back ALL transactions can cause data loss. So + // each subtask first commits transactions from its state and then rolls back discovered + // transactions if they belong to it. 
+ xaGroupOps.recoverAndRollback(getRuntimeContext(), xidGenerator); } beginTx(0L); outputFormat.setRuntimeContext(getRuntimeContext()); diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java index 32d42c9..535d401 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import javax.transaction.xa.Xid; import java.security.SecureRandom; +import java.util.Arrays; /** * Generates {@link Xid} from: @@ -71,6 +72,29 @@ class SemanticXidGenerator implements XidGenerator { return new XidImpl(FORMAT_ID, gtridBuffer, bqualBuffer); } + @Override + public boolean belongsToSubtask(Xid xid, RuntimeContext ctx) { + if (xid.getFormatId() != FORMAT_ID) { + return false; + } + int subtaskIndex = readNumber(xid.getGlobalTransactionId(), JobID.SIZE, Integer.BYTES); + if (subtaskIndex != ctx.getIndexOfThisSubtask() + && subtaskIndex <= ctx.getNumberOfParallelSubtasks() - 1) { + return false; + } + byte[] jobIdBytes = new byte[JobID.SIZE]; + System.arraycopy(xid.getGlobalTransactionId(), 0, jobIdBytes, 0, JobID.SIZE); + return Arrays.equals(jobIdBytes, ctx.getJobId().getBytes()); + } + + private static int readNumber(byte[] bytes, int offset, int numBytes) { + int result = 0; + for (int i = 0; i < numBytes; i++) { + result |= (bytes[offset + i] & 0xff) << Byte.SIZE * i; + } + return result; + } + private static void writeNumber(long number, int numBytes, byte[] dst, int dstOffset) { for (int i = dstOffset; i < dstOffset + numBytes; i++) { dst[i] = (byte) number; diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOps.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOps.java index f6cef98..ad20382 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOps.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOps.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.jdbc.xa; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.util.FlinkRuntimeException; import javax.transaction.xa.Xid; @@ -36,7 +37,7 @@ interface XaGroupOps extends Serializable { GroupXaOperationResult<Xid> failOrRollback(Collection<Xid> xids); - void recoverAndRollback(); + void recoverAndRollback(RuntimeContext runtimeContext, XidGenerator xidGenerator); class GroupXaOperationResult<T> { private final List<T> succeeded = new ArrayList<>(); diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java index c9159b8..017604f 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.jdbc.xa; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.connector.jdbc.xa.XaFacade.TransientXaException; import org.slf4j.Logger; @@ -104,17 +105,19 @@ class XaGroupOpsImpl implements XaGroupOps { } @Override - public void recoverAndRollback() { + public void recoverAndRollback(RuntimeContext runtimeContext, XidGenerator xidGenerator) 
{ Collection<Xid> recovered = xaFacade.recover(); if (recovered.isEmpty()) { return; } LOG.warn("rollback {} recovered transactions", recovered.size()); for (Xid xid : recovered) { - try { - xaFacade.rollback(xid); - } catch (Exception e) { - LOG.info("unable to rollback recovered transaction, xid={}", xid, e); + if (xidGenerator.belongsToSubtask(xid, runtimeContext)) { + try { + xaFacade.rollback(xid); + } catch (Exception e) { + LOG.info("unable to rollback recovered transaction, xid={}", xid, e); + } } } } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidGenerator.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidGenerator.java index 253a036..17b21b3 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidGenerator.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidGenerator.java @@ -46,6 +46,9 @@ public interface XidGenerator extends Serializable, AutoCloseable { default void open() {} + /** @return true if the provided transaction belongs to this subtask */ + boolean belongsToSubtask(Xid xid, RuntimeContext ctx); + @Override default void close() {} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java index 0822c86..b07ed23 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java @@ -17,6 +17,7 @@ package org.apache.flink.connector.jdbc.xa; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcTestBase; @@ -128,7 +129,17 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase { private static XidGenerator getXidGenerator() { final AtomicInteger txCounter = new AtomicInteger(); - return (x, y) -> new TestXid(txCounter.incrementAndGet(), 0, 0); + return new XidGenerator() { + @Override + public Xid generateXid(RuntimeContext runtimeContext, long checkpointId) { + return new TestXid(txCounter.incrementAndGet(), 0, 0); + } + + @Override + public boolean belongsToSubtask(Xid xid, RuntimeContext ctx) { + return false; + } + }; } private static String getSnapshotPath(MigrationVersion version) { diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java index 4086458..daad0ca 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java @@ -164,147 +164,151 @@ public abstract class JdbcXaSinkTestBase extends JdbcTestBase { return sink; } - static final RuntimeContext TEST_RUNTIME_CONTEXT = - new RuntimeContext() { - @Override - public JobID getJobId() { - return new JobID(); - } - - @Override - public String getTaskName() { - return "test"; - } - - @Override - public MetricGroup getMetricGroup() { - return null; - } - - @Override - public int getNumberOfParallelSubtasks() { - return 1; - } - - @Override - public int getMaxNumberOfParallelSubtasks() { - return 1; - } - - @Override - public int getIndexOfThisSubtask() { - return 0; - } - - @Override - public int getAttemptNumber() { - return 0; - } - - @Override - public String getTaskNameWithSubtasks() { - return "test"; - } - - @Override - public ExecutionConfig 
getExecutionConfig() { - return null; - } - - @Override - public ClassLoader getUserCodeClassLoader() { - return null; - } - - @Override - public <V, A extends Serializable> void addAccumulator( - String name, Accumulator<V, A> accumulator) {} - - @Override - public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) { - return null; - } - - @Override - public void registerUserCodeClassLoaderReleaseHookIfAbsent( - String releaseHookName, Runnable releaseHook) { - throw new UnsupportedOperationException(); - } - - @Override - public IntCounter getIntCounter(String name) { - return null; - } - - @Override - public LongCounter getLongCounter(String name) { - return null; - } - - @Override - public DoubleCounter getDoubleCounter(String name) { - return null; - } - - @Override - public Histogram getHistogram(String name) { - return null; - } - - @Override - public boolean hasBroadcastVariable(String name) { - return false; - } - - @Override - public <RT> List<RT> getBroadcastVariable(String name) { - return null; - } - - @Override - public <T, C> C getBroadcastVariableWithInitializer( - String name, BroadcastVariableInitializer<T, C> initializer) { - return null; - } - - @Override - public DistributedCache getDistributedCache() { - return null; - } - - @Override - public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { - return null; - } - - @Override - public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { - return null; - } - - @Override - public <T> ReducingState<T> getReducingState( - ReducingStateDescriptor<T> stateProperties) { - return null; - } - - @Override - public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState( - AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { - return null; - } - - @Override - public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) { - throw new UnsupportedOperationException(); - } - - @Override - public <UK, UV> MapState<UK, 
UV> getMapState( - MapStateDescriptor<UK, UV> stateProperties) { - return null; - } - }; + static final RuntimeContext TEST_RUNTIME_CONTEXT = getRuntimeContext(new JobID()); + + static RuntimeContext getRuntimeContext(final JobID jobID) { + return new RuntimeContext() { + + @Override + public JobID getJobId() { + return jobID; + } + + @Override + public String getTaskName() { + return "test"; + } + + @Override + public MetricGroup getMetricGroup() { + return null; + } + + @Override + public int getNumberOfParallelSubtasks() { + return 1; + } + + @Override + public int getMaxNumberOfParallelSubtasks() { + return 1; + } + + @Override + public int getIndexOfThisSubtask() { + return 0; + } + + @Override + public int getAttemptNumber() { + return 0; + } + + @Override + public String getTaskNameWithSubtasks() { + return "test"; + } + + @Override + public ExecutionConfig getExecutionConfig() { + return null; + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return null; + } + + @Override + public <V, A extends Serializable> void addAccumulator( + String name, Accumulator<V, A> accumulator) {} + + @Override + public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) { + return null; + } + + @Override + public void registerUserCodeClassLoaderReleaseHookIfAbsent( + String releaseHookName, Runnable releaseHook) { + throw new UnsupportedOperationException(); + } + + @Override + public IntCounter getIntCounter(String name) { + return null; + } + + @Override + public LongCounter getLongCounter(String name) { + return null; + } + + @Override + public DoubleCounter getDoubleCounter(String name) { + return null; + } + + @Override + public Histogram getHistogram(String name) { + return null; + } + + @Override + public boolean hasBroadcastVariable(String name) { + return false; + } + + @Override + public <RT> List<RT> getBroadcastVariable(String name) { + return null; + } + + @Override + public <T, C> C getBroadcastVariableWithInitializer( + 
String name, BroadcastVariableInitializer<T, C> initializer) { + return null; + } + + @Override + public DistributedCache getDistributedCache() { + return null; + } + + @Override + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { + return null; + } + + @Override + public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { + return null; + } + + @Override + public <T> ReducingState<T> getReducingState( + ReducingStateDescriptor<T> stateProperties) { + return null; + } + + @Override + public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState( + AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { + return null; + } + + @Override + public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) { + throw new UnsupportedOperationException(); + } + + @Override + public <UK, UV> MapState<UK, UV> getMapState( + MapStateDescriptor<UK, UV> stateProperties) { + return null; + } + }; + } static final SinkFunction.Context TEST_SINK_CONTEXT = new SinkFunction.Context() { diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java index 097cba6..bfda5d8 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java @@ -17,6 +17,8 @@ package org.apache.flink.connector.jdbc.xa; +import org.apache.flink.api.common.JobID; + import org.junit.Test; import javax.transaction.xa.Xid; @@ -46,7 +48,8 @@ public class SemanticXidGeneratorTest { checkUniqueness( unused -> { generator.open(); - return generator.generateXid(TEST_RUNTIME_CONTEXT, checkpointId); + return generator.generateXid( + JdbcXaSinkTestBase.getRuntimeContext(new JobID()), 
checkpointId); }); }
