This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit edc198f4ef89e062ec4c27dcd6b85dfad649b78a
Author: Stephan Ewen <[email protected]>
AuthorDate: Sun Nov 29 22:43:22 2020 +0100

    [FLINK-20413][runtime] Sources return splits in "resetSubtask()", rather than in "subtaskFailed()"
---
 .../RecreateOnResetOperatorCoordinator.java        | 16 ++++++++++++----
 .../source/coordinator/SourceCoordinator.java      | 19 +++++++++++++------
 .../coordinator/SourceCoordinatorContext.java      |  7 ++++---
 .../source/coordinator/SplitAssignmentTracker.java | 20 ++++++++++++--------
 .../source/coordinator/SourceCoordinatorTest.java  |  1 +
 .../coordinator/SplitAssignmentTrackerTest.java    | 22 +++++++++++++++++++++-
 6 files changed, 63 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 346a40f..3c2e5cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
 import org.apache.flink.util.function.ThrowingRunnable;
 
@@ -47,7 +48,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
        private final long closingTimeoutMs;
        private final OperatorCoordinator.Context context;
        private DeferrableCoordinator coordinator;
-       private volatile boolean started;
+       private boolean started;
        private volatile boolean closed;
 
        private RecreateOnResetOperatorCoordinator(
@@ -66,8 +67,11 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
 
        @Override
        public void start() throws Exception {
-               coordinator.start();
+               Preconditions.checkState(!started, "coordinator already started");
                started = true;
+               coordinator.applyCall(
+                       "start",
+                       OperatorCoordinator::start);
        }
 
        @Override
@@ -125,12 +129,16 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
                // Close the old coordinator asynchronously in a separate closing thread.
                // The future will be completed when the old coordinator closes.
                CompletableFuture<Void> closingFuture = oldCoordinator.closeAsync(closingTimeoutMs);
-               // Create and
+
+               // Create and possibly start the new coordinator and apply all calls deferred in the meantime.
+               // Capture whether the coordinator was started at the time this method was called.
+               final boolean wasStarted = this.started;
+
                closingFuture.thenRun(() -> {
                        if (!closed) {
                                // The previous coordinator has closed. Create a new one.
                                newCoordinator.createNewInternalCoordinator(context, provider);
-                               newCoordinator.resetAndStart(checkpointId, checkpointData, started);
+                               newCoordinator.resetAndStart(checkpointId, checkpointData, wasStarted);
                                newCoordinator.processPendingCalls();
                        }
                });
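
The hunk above boils down to snapshotting the "started" flag before handing work to the asynchronous closing future, so the deferred lambda sees the value from when the reset was requested rather than whatever start() sets later. A minimal sketch of that pattern, using hypothetical names rather than the actual Flink classes:

    import java.util.concurrent.CompletableFuture;

    class ResetSketch {
        private boolean started; // only mutated from the coordinator's main thread

        void start() {
            started = true;
        }

        void resetToCheckpoint(CompletableFuture<Void> closingFuture) {
            // Capture the current value now; the lambda may run much later,
            // possibly after start() has flipped the field in the meantime.
            final boolean wasStarted = this.started;
            closingFuture.thenRun(() -> {
                if (wasStarted) {
                    // recreate and start the new coordinator here
                }
            });
        }
    }
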
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 31135ae..9c9949b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -172,19 +172,26 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
        public void subtaskFailed(int subtaskId, @Nullable Throwable reason) {
                runInEventLoop(
                        () -> {
-                               LOG.info("Handling subtask {} failure of source {}.", subtaskId, operatorName);
-                               List<SplitT> splitsToAddBack = context.getAndRemoveUncheckpointedAssignment(subtaskId);
+                               LOG.info("Removing registered reader after failure for subtask {} of source {}.", subtaskId, operatorName);
                                context.unregisterSourceReader(subtaskId);
-                               LOG.debug("Adding {} back to the split enumerator of source {}.", splitsToAddBack, operatorName);
-                               enumerator.addSplitsBack(splitsToAddBack, subtaskId);
                        },
                        "handling subtask %d failure", subtaskId
                );
        }
 
        @Override
-       public void subtaskReset(int subtask, long checkpointId) {
-               // TODO - move the split reset logic here
+       public void subtaskReset(int subtaskId, long checkpointId) {
+               runInEventLoop(
+                       () -> {
+                               LOG.info("Recovering subtask {} to checkpoint {} for source {}.",
+                                               subtaskId, checkpointId, operatorName);
+
+                               final List<SplitT> splitsToAddBack = context.getAndRemoveUncheckpointedAssignment(subtaskId, checkpointId);
+                               LOG.debug("Adding splits back to the split enumerator of source {}: {}", operatorName, splitsToAddBack);
+                               enumerator.addSplitsBack(splitsToAddBack, subtaskId);
+                       },
+                       "handling subtask %d recovery to checkpoint %d", subtaskId, checkpointId
+               );
        }
 
        @Override
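
The net effect of the SourceCoordinator hunks above: subtaskFailed() now only drops the reader registration, while returning splits to the enumerator moves to subtaskReset(), which knows the checkpoint the subtask is restored to. A hedged sketch of the expected calling sequence; the driver method and its name are hypothetical, since the real call sites live in the scheduler and are not part of this diff:

    // Illustrative only: shows the intended call order when a reader
    // subtask fails and is then restored to a checkpoint.
    void onReaderFailure(OperatorCoordinator coordinator, int subtaskId, long restoredCheckpointId) {
        coordinator.subtaskFailed(subtaskId, null);                // unregister the failed reader
        coordinator.subtaskReset(subtaskId, restoredCheckpointId); // give uncheckpointed splits back to the enumerator
    }
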
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 037b643..118f09f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -298,11 +298,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
        /**
         * Get the split to put back. This only happens when a source reader subtask has failed.
         *
-        * @param failedSubtaskId the failed subtask id.
+        * @param subtaskId the failed subtask id.
+        * @param restoredCheckpointId the checkpoint that the task is recovered to.
         * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
         */
-       List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
-               return assignmentTracker.getAndRemoveUncheckpointedAssignment(failedSubtaskId);
+       List<SplitT> getAndRemoveUncheckpointedAssignment(int subtaskId, long restoredCheckpointId) {
+               return assignmentTracker.getAndRemoveUncheckpointedAssignment(subtaskId, restoredCheckpointId);
        }
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
index 7985f7d..2041b7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
@@ -80,7 +80,6 @@ public class SplitAssignmentTracker<SplitT extends SourceSplit> {
         * @param in The ObjectInput that contains the state of the SplitAssignmentTracker.
         * @throws Exception when the state deserialization fails.
         */
-       @SuppressWarnings("unchecked")
        public void restoreState(SimpleVersionedSerializer<SplitT> splitSerializer, DataInputStream in) throws Exception {
                // Read the split assignments by checkpoint id.
                Map<Long, Map<Integer, LinkedHashSet<SplitT>>> deserializedAssignments =
@@ -114,15 +113,20 @@ public class SplitAssignmentTracker<SplitT extends SourceSplit> {
         * if those splits were never assigned. To handle this case, the coordinator needs to find those
         * splits and return them back to the SplitEnumerator for re-assignment.
         *
-        * @param failedSubtaskId the failed subtask id.
+        * @param subtaskId the subtask id of the reader that failed over.
+        * @param restoredCheckpointId the ID of the checkpoint that the reader was restored to.
         * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
         */
-       public List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
-               List<SplitT> splits = new ArrayList<>();
-               assignmentsByCheckpointId.values().forEach(assignments -> {
-                       removeFromAssignment(failedSubtaskId, assignments, splits);
-               });
-               removeFromAssignment(failedSubtaskId, uncheckpointedAssignments, splits);
+       public List<SplitT> getAndRemoveUncheckpointedAssignment(int subtaskId, long restoredCheckpointId) {
+               final ArrayList<SplitT> splits = new ArrayList<>();
+
+               for (final Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> entry : assignmentsByCheckpointId.entrySet()) {
+                       if (entry.getKey() > restoredCheckpointId) {
+                               removeFromAssignment(subtaskId, entry.getValue(), splits);
+                       }
+               }
+
+               removeFromAssignment(subtaskId, uncheckpointedAssignments, splits);
                return splits;
        }
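
In other words, only assignments recorded for checkpoints newer than the one the subtask is restored to, plus the assignments not yet covered by any checkpoint, are handed back to the enumerator; splits from older checkpoints come back with the restored reader itself. A self-contained sketch of that filtering, with plain JDK collections and String split IDs standing in for the tracker's actual fields:

    import java.util.*;

    class TrackerSketch {
        // checkpointId -> (subtaskId -> splits), mirroring assignmentsByCheckpointId
        final NavigableMap<Long, Map<Integer, Set<String>>> byCheckpoint = new TreeMap<>();
        // assignments made since the last checkpoint, mirroring uncheckpointedAssignments
        final Map<Integer, Set<String>> uncheckpointed = new HashMap<>();

        List<String> getAndRemoveUncheckpointedAssignment(int subtaskId, long restoredCheckpointId) {
            final List<String> splits = new ArrayList<>();
            // Only checkpoints strictly newer than the restored one are affected.
            for (Map<Integer, Set<String>> assignments : byCheckpoint.tailMap(restoredCheckpointId, false).values()) {
                final Set<String> removed = assignments.remove(subtaskId);
                if (removed != null) {
                    splits.addAll(removed);
                }
            }
            final Set<String> stillUncheckpointed = uncheckpointed.remove(subtaskId);
            if (stillUncheckpointed != null) {
                splits.addAll(stillUncheckpointed);
            }
            return splits;
        }
    }
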
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index aa1f079..738f848 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -201,6 +201,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 
                // Fail reader 0.
                sourceCoordinator.subtaskFailed(0, null);
+               sourceCoordinator.subtaskReset(0, 99L); // checkpoint ID before the triggered checkpoints
 
                // check the state again.
                check(() -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
index 86ca76e..3a2359b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
@@ -30,6 +30,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -144,10 +145,29 @@ public class SplitAssignmentTrackerTest {
                takeSnapshot(tracker, checkpointId2);
 
                // Now assume subtask 0 has failed.
-               List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0);
+               List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0, checkpointId1 - 1);
                verifyAssignment(Arrays.asList("0", "3"), splitsToPutBack);
        }
 
+       @Test
+       public void testGetAndRemoveSplitsAfterSomeCheckpoint() throws Exception {
+               final long checkpointId1 = 100L;
+               final long checkpointId2 = 101L;
+               SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+
+               // Assign some splits and take snapshot 1.
+               tracker.recordSplitAssignment(getSplitsAssignment(2, 0));
+               takeSnapshot(tracker, checkpointId1);
+
+               // Assign some more splits and take snapshot 2.
+               tracker.recordSplitAssignment(getSplitsAssignment(2, 3));
+               takeSnapshot(tracker, checkpointId2);
+
+               // Now assume subtask 0 has failed and is reset to the first checkpoint.
+               List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0, checkpointId1);
+               verifyAssignment(Collections.singletonList("3"), splitsToPutBack);
+       }
+
        // ---------------------
 
        private byte[] takeSnapshot(SplitAssignmentTracker<MockSourceSplit> tracker, long checkpointId) throws Exception {
