This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54dec76c87a927b9f95ded06fd8e785e3987bdbc
Author: Roman Khachatryan <[email protected]>
AuthorDate: Sun Apr 18 17:20:41 2021 +0200

    [FLINK-22239][jdbc] Rollback XA transactions on recovery
    
    Prepared transactions that are never rolled back may keep holding
    locks and block new transactions started after recovery.
---
 .../connector/jdbc/JdbcExactlyOnceOptions.java     |   8 +-
 .../connector/jdbc/xa/JdbcXaSinkFunction.java      |  10 +-
 .../connector/jdbc/xa/SemanticXidGenerator.java    |  24 ++
 .../apache/flink/connector/jdbc/xa/XaGroupOps.java |   3 +-
 .../flink/connector/jdbc/xa/XaGroupOpsImpl.java    |  13 +-
 .../flink/connector/jdbc/xa/XidGenerator.java      |   3 +
 .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java |  13 +-
 .../connector/jdbc/xa/JdbcXaSinkTestBase.java      | 286 +++++++++++----------
 .../jdbc/xa/SemanticXidGeneratorTest.java          |   5 +-
 9 files changed, 209 insertions(+), 156 deletions(-)

diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
index 7879c8a..7ba4eb5 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.java
@@ -50,7 +50,7 @@ import java.util.Optional;
 @PublicEvolving
 public class JdbcExactlyOnceOptions implements Serializable {
 
-    private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = false;
+    private static final boolean DEFAULT_RECOVERED_AND_ROLLBACK = true;
     private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
     private static final boolean DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS = false;
 
@@ -102,7 +102,11 @@ public class JdbcExactlyOnceOptions implements 
Serializable {
         private boolean allowOutOfOrderCommits = 
DEFAULT_ALLOW_OUT_OF_ORDER_COMMITS;
         private Optional<Integer> timeoutSec = Optional.empty();
 
-        /** Toggle discovery and rollback of transactions upon recovery. */
+        /**
+         * Toggle discovery and rollback of prepared transactions upon recovery to prevent new
+         * transactions from being blocked by the older ones. Each subtask rolls back its own
+         * transactions. This flag must be disabled when rescaling to prevent data loss.
+         */
         public JDBCExactlyOnceOptionsBuilder withRecoveredAndRollback(
                 boolean recoveredAndRollback) {
             this.recoveredAndRollback = recoveredAndRollback;
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
index 17ad730..1deda32 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
@@ -224,11 +224,11 @@ public class JdbcXaSinkFunction<T> extends 
AbstractRichFunction
         hangingXids = new 
LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry());
         commitUpToCheckpoint(Optional.empty());
         if (options.isDiscoverAndRollbackOnRecovery()) {
-            // todo: consider doing recover-rollback later (e.g. after the 1st 
checkpoint)
-            // when we are sure that all other subtasks started and committed 
any of their prepared
-            // transactions
-            // this would require to distinguish between this job Xids and 
other Xids
-            xaGroupOps.recoverAndRollback();
+            // Pending transactions that are not included in the checkpoint might hold locks and
+            // should be rolled back. However, rolling back ALL transactions can cause data loss. So
+            // each subtask first commits transactions from its state and then rolls back discovered
+            // transactions if they belong to it.
+            xaGroupOps.recoverAndRollback(getRuntimeContext(), xidGenerator);
         }
         beginTx(0L);
         outputFormat.setRuntimeContext(getRuntimeContext());
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
index 32d42c9..535d401 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/SemanticXidGenerator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import javax.transaction.xa.Xid;
 
 import java.security.SecureRandom;
+import java.util.Arrays;
 
 /**
  * Generates {@link Xid} from:
@@ -71,6 +72,29 @@ class SemanticXidGenerator implements XidGenerator {
         return new XidImpl(FORMAT_ID, gtridBuffer, bqualBuffer);
     }
 
+    @Override
+    public boolean belongsToSubtask(Xid xid, RuntimeContext ctx) {
+        if (xid.getFormatId() != FORMAT_ID) {
+            return false;
+        }
+        int subtaskIndex = readNumber(xid.getGlobalTransactionId(), 
JobID.SIZE, Integer.BYTES);
+        if (subtaskIndex != ctx.getIndexOfThisSubtask()
+                && subtaskIndex <= ctx.getNumberOfParallelSubtasks() - 1) {
+            return false;
+        }
+        byte[] jobIdBytes = new byte[JobID.SIZE];
+        System.arraycopy(xid.getGlobalTransactionId(), 0, jobIdBytes, 0, 
JobID.SIZE);
+        return Arrays.equals(jobIdBytes, ctx.getJobId().getBytes());
+    }
+
+    private static int readNumber(byte[] bytes, int offset, int numBytes) {
+        int result = 0;
+        for (int i = 0; i < numBytes; i++) {
+            result |= (bytes[offset + i] & 0xff) << Byte.SIZE * i;
+        }
+        return result;
+    }
+
     private static void writeNumber(long number, int numBytes, byte[] dst, int 
dstOffset) {
         for (int i = dstOffset; i < dstOffset + numBytes; i++) {
             dst[i] = (byte) number;
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOps.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOps.java
index f6cef98..ad20382 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOps.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOps.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.jdbc.xa;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.transaction.xa.Xid;
@@ -36,7 +37,7 @@ interface XaGroupOps extends Serializable {
 
     GroupXaOperationResult<Xid> failOrRollback(Collection<Xid> xids);
 
-    void recoverAndRollback();
+    void recoverAndRollback(RuntimeContext runtimeContext, XidGenerator 
xidGenerator);
 
     class GroupXaOperationResult<T> {
         private final List<T> succeeded = new ArrayList<>();
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java
index c9159b8..017604f 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.jdbc.xa;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.connector.jdbc.xa.XaFacade.TransientXaException;
 
 import org.slf4j.Logger;
@@ -104,17 +105,19 @@ class XaGroupOpsImpl implements XaGroupOps {
     }
 
     @Override
-    public void recoverAndRollback() {
+    public void recoverAndRollback(RuntimeContext runtimeContext, XidGenerator 
xidGenerator) {
         Collection<Xid> recovered = xaFacade.recover();
         if (recovered.isEmpty()) {
             return;
         }
         LOG.warn("rollback {} recovered transactions", recovered.size());
         for (Xid xid : recovered) {
-            try {
-                xaFacade.rollback(xid);
-            } catch (Exception e) {
-                LOG.info("unable to rollback recovered transaction, xid={}", 
xid, e);
+            if (xidGenerator.belongsToSubtask(xid, runtimeContext)) {
+                try {
+                    xaFacade.rollback(xid);
+                } catch (Exception e) {
+                    LOG.info("unable to rollback recovered transaction, 
xid={}", xid, e);
+                }
             }
         }
     }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidGenerator.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidGenerator.java
index 253a036..17b21b3 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidGenerator.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XidGenerator.java
@@ -46,6 +46,9 @@ public interface XidGenerator extends Serializable, 
AutoCloseable {
 
     default void open() {}
 
+    /** @return true if the provided transaction belongs to this subtask */
+    boolean belongsToSubtask(Xid xid, RuntimeContext ctx);
+
     @Override
     default void close() {}
 
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
index 0822c86..b07ed23 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.jdbc.xa;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
@@ -128,7 +129,17 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
 
     private static XidGenerator getXidGenerator() {
         final AtomicInteger txCounter = new AtomicInteger();
-        return (x, y) -> new TestXid(txCounter.incrementAndGet(), 0, 0);
+        return new XidGenerator() {
+            @Override
+            public Xid generateXid(RuntimeContext runtimeContext, long 
checkpointId) {
+                return new TestXid(txCounter.incrementAndGet(), 0, 0);
+            }
+
+            @Override
+            public boolean belongsToSubtask(Xid xid, RuntimeContext ctx) {
+                return false;
+            }
+        };
     }
 
     private static String getSnapshotPath(MigrationVersion version) {
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
index 4086458..daad0ca 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
@@ -164,147 +164,151 @@ public abstract class JdbcXaSinkTestBase extends 
JdbcTestBase {
         return sink;
     }
 
-    static final RuntimeContext TEST_RUNTIME_CONTEXT =
-            new RuntimeContext() {
-                @Override
-                public JobID getJobId() {
-                    return new JobID();
-                }
-
-                @Override
-                public String getTaskName() {
-                    return "test";
-                }
-
-                @Override
-                public MetricGroup getMetricGroup() {
-                    return null;
-                }
-
-                @Override
-                public int getNumberOfParallelSubtasks() {
-                    return 1;
-                }
-
-                @Override
-                public int getMaxNumberOfParallelSubtasks() {
-                    return 1;
-                }
-
-                @Override
-                public int getIndexOfThisSubtask() {
-                    return 0;
-                }
-
-                @Override
-                public int getAttemptNumber() {
-                    return 0;
-                }
-
-                @Override
-                public String getTaskNameWithSubtasks() {
-                    return "test";
-                }
-
-                @Override
-                public ExecutionConfig getExecutionConfig() {
-                    return null;
-                }
-
-                @Override
-                public ClassLoader getUserCodeClassLoader() {
-                    return null;
-                }
-
-                @Override
-                public <V, A extends Serializable> void addAccumulator(
-                        String name, Accumulator<V, A> accumulator) {}
-
-                @Override
-                public <V, A extends Serializable> Accumulator<V, A> 
getAccumulator(String name) {
-                    return null;
-                }
-
-                @Override
-                public void registerUserCodeClassLoaderReleaseHookIfAbsent(
-                        String releaseHookName, Runnable releaseHook) {
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
-                public IntCounter getIntCounter(String name) {
-                    return null;
-                }
-
-                @Override
-                public LongCounter getLongCounter(String name) {
-                    return null;
-                }
-
-                @Override
-                public DoubleCounter getDoubleCounter(String name) {
-                    return null;
-                }
-
-                @Override
-                public Histogram getHistogram(String name) {
-                    return null;
-                }
-
-                @Override
-                public boolean hasBroadcastVariable(String name) {
-                    return false;
-                }
-
-                @Override
-                public <RT> List<RT> getBroadcastVariable(String name) {
-                    return null;
-                }
-
-                @Override
-                public <T, C> C getBroadcastVariableWithInitializer(
-                        String name, BroadcastVariableInitializer<T, C> 
initializer) {
-                    return null;
-                }
-
-                @Override
-                public DistributedCache getDistributedCache() {
-                    return null;
-                }
-
-                @Override
-                public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
-                    return null;
-                }
-
-                @Override
-                public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
-                    return null;
-                }
-
-                @Override
-                public <T> ReducingState<T> getReducingState(
-                        ReducingStateDescriptor<T> stateProperties) {
-                    return null;
-                }
-
-                @Override
-                public <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(
-                        AggregatingStateDescriptor<IN, ACC, OUT> 
stateProperties) {
-                    return null;
-                }
-
-                @Override
-                public Set<ExternalResourceInfo> 
getExternalResourceInfos(String resourceName) {
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
-                public <UK, UV> MapState<UK, UV> getMapState(
-                        MapStateDescriptor<UK, UV> stateProperties) {
-                    return null;
-                }
-            };
+    static final RuntimeContext TEST_RUNTIME_CONTEXT = getRuntimeContext(new 
JobID());
+
+    static RuntimeContext getRuntimeContext(final JobID jobID) {
+        return new RuntimeContext() {
+
+            @Override
+            public JobID getJobId() {
+                return jobID;
+            }
+
+            @Override
+            public String getTaskName() {
+                return "test";
+            }
+
+            @Override
+            public MetricGroup getMetricGroup() {
+                return null;
+            }
+
+            @Override
+            public int getNumberOfParallelSubtasks() {
+                return 1;
+            }
+
+            @Override
+            public int getMaxNumberOfParallelSubtasks() {
+                return 1;
+            }
+
+            @Override
+            public int getIndexOfThisSubtask() {
+                return 0;
+            }
+
+            @Override
+            public int getAttemptNumber() {
+                return 0;
+            }
+
+            @Override
+            public String getTaskNameWithSubtasks() {
+                return "test";
+            }
+
+            @Override
+            public ExecutionConfig getExecutionConfig() {
+                return null;
+            }
+
+            @Override
+            public ClassLoader getUserCodeClassLoader() {
+                return null;
+            }
+
+            @Override
+            public <V, A extends Serializable> void addAccumulator(
+                    String name, Accumulator<V, A> accumulator) {}
+
+            @Override
+            public <V, A extends Serializable> Accumulator<V, A> 
getAccumulator(String name) {
+                return null;
+            }
+
+            @Override
+            public void registerUserCodeClassLoaderReleaseHookIfAbsent(
+                    String releaseHookName, Runnable releaseHook) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public IntCounter getIntCounter(String name) {
+                return null;
+            }
+
+            @Override
+            public LongCounter getLongCounter(String name) {
+                return null;
+            }
+
+            @Override
+            public DoubleCounter getDoubleCounter(String name) {
+                return null;
+            }
+
+            @Override
+            public Histogram getHistogram(String name) {
+                return null;
+            }
+
+            @Override
+            public boolean hasBroadcastVariable(String name) {
+                return false;
+            }
+
+            @Override
+            public <RT> List<RT> getBroadcastVariable(String name) {
+                return null;
+            }
+
+            @Override
+            public <T, C> C getBroadcastVariableWithInitializer(
+                    String name, BroadcastVariableInitializer<T, C> 
initializer) {
+                return null;
+            }
+
+            @Override
+            public DistributedCache getDistributedCache() {
+                return null;
+            }
+
+            @Override
+            public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
+                return null;
+            }
+
+            @Override
+            public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
+                return null;
+            }
+
+            @Override
+            public <T> ReducingState<T> getReducingState(
+                    ReducingStateDescriptor<T> stateProperties) {
+                return null;
+            }
+
+            @Override
+            public <IN, ACC, OUT> AggregatingState<IN, OUT> 
getAggregatingState(
+                    AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+                return null;
+            }
+
+            @Override
+            public Set<ExternalResourceInfo> getExternalResourceInfos(String 
resourceName) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public <UK, UV> MapState<UK, UV> getMapState(
+                    MapStateDescriptor<UK, UV> stateProperties) {
+                return null;
+            }
+        };
+    }
 
     static final SinkFunction.Context TEST_SINK_CONTEXT =
             new SinkFunction.Context() {
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
index 097cba6..bfda5d8 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.connector.jdbc.xa;
 
+import org.apache.flink.api.common.JobID;
+
 import org.junit.Test;
 
 import javax.transaction.xa.Xid;
@@ -46,7 +48,8 @@ public class SemanticXidGeneratorTest {
         checkUniqueness(
                 unused -> {
                     generator.open();
-                    return generator.generateXid(TEST_RUNTIME_CONTEXT, 
checkpointId);
+                    return generator.generateXid(
+                            JdbcXaSinkTestBase.getRuntimeContext(new JobID()), 
checkpointId);
                 });
     }
 

Reply via email to