This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit edc198f4ef89e062ec4c27dcd6b85dfad649b78a
Author: Stephan Ewen <[email protected]>
AuthorDate: Sun Nov 29 22:43:22 2020 +0100

    [FLINK-20413][runtime] Sources return splits in "resetSubtask()", rather than in "subtaskFailed()"
---
 .../RecreateOnResetOperatorCoordinator.java        | 16 ++++++++++++----
 .../source/coordinator/SourceCoordinator.java      | 19 +++++++++++++------
 .../coordinator/SourceCoordinatorContext.java      |  7 ++++---
 .../source/coordinator/SplitAssignmentTracker.java | 20 ++++++++++++--------
 .../source/coordinator/SourceCoordinatorTest.java  |  1 +
 .../coordinator/SplitAssignmentTrackerTest.java    | 22 +++++++++++++++++++++-
 6 files changed, 63 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 346a40f..3c2e5cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
 import org.apache.flink.util.function.ThrowingRunnable;
 
@@ -47,7 +48,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
     private final long closingTimeoutMs;
     private final OperatorCoordinator.Context context;
     private DeferrableCoordinator coordinator;
-    private volatile boolean started;
+    private boolean started;
     private volatile boolean closed;
 
     private RecreateOnResetOperatorCoordinator(
@@ -66,8 +67,11 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
 
     @Override
     public void start() throws Exception {
-        coordinator.start();
+        Preconditions.checkState(!started, "coordinator already started");
         started = true;
+        coordinator.applyCall(
+                "start",
+                OperatorCoordinator::start);
     }
 
     @Override
@@ -125,12 +129,16 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
         // Close the old coordinator asynchronously in a separate closing thread.
         // The future will be completed when the old coordinator closes.
         CompletableFuture<Void> closingFuture = oldCoordinator.closeAsync(closingTimeoutMs);
-        // Create and
+
+        // Create and possibly start the coordinator and apply all meanwhile deferred calls
+        // capture the status whether the coordinator was started when this method was called
+        final boolean wasStarted = this.started;
+
         closingFuture.thenRun(() -> {
             if (!closed) {
                 // The previous coordinator has closed. Create a new one.
                 newCoordinator.createNewInternalCoordinator(context, provider);
-                newCoordinator.resetAndStart(checkpointId, checkpointData, started);
+                newCoordinator.resetAndStart(checkpointId, checkpointData, wasStarted);
                 newCoordinator.processPendingCalls();
             }
         });
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 31135ae..9c9949b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -172,19 +172,26 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
     public void subtaskFailed(int subtaskId, @Nullable Throwable reason) {
         runInEventLoop(
                 () -> {
-                    LOG.info("Handling subtask {} failure of source {}.", subtaskId, operatorName);
-                    List<SplitT> splitsToAddBack = context.getAndRemoveUncheckpointedAssignment(subtaskId);
+                    LOG.info("Removing registered reader after failure for subtask {} of source {}.", subtaskId, operatorName);
                     context.unregisterSourceReader(subtaskId);
-                    LOG.debug("Adding {} back to the split enumerator of source {}.", splitsToAddBack, operatorName);
-                    enumerator.addSplitsBack(splitsToAddBack, subtaskId);
                 },
                 "handling subtask %d failure", subtaskId
         );
     }
 
     @Override
-    public void subtaskReset(int subtask, long checkpointId) {
-        // TODO - move the split reset logic here
+    public void subtaskReset(int subtaskId, long checkpointId) {
+        runInEventLoop(
+                () -> {
+                    LOG.info("Recovering subtask {} to checkpoint {} for source {} to checkpoint.",
+                            subtaskId, checkpointId, operatorName);
+
+                    final List<SplitT> splitsToAddBack = context.getAndRemoveUncheckpointedAssignment(subtaskId, checkpointId);
+                    LOG.debug("Adding splits back to the split enumerator of source {}: {}", operatorName, splitsToAddBack);
+                    enumerator.addSplitsBack(splitsToAddBack, subtaskId);
+                },
+                "handling subtask %d recovery to checkpoint %d", subtaskId, checkpointId
+        );
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 037b643..118f09f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -298,11 +298,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     /**
      * Get the split to put back. This only happens when a source reader subtask has failed.
      *
-     * @param failedSubtaskId the failed subtask id.
+     * @param subtaskId the failed subtask id.
+     * @param restoredCheckpointId the checkpoint that the task is recovered to.
      * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
      */
-    List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
-        return assignmentTracker.getAndRemoveUncheckpointedAssignment(failedSubtaskId);
+    List<SplitT> getAndRemoveUncheckpointedAssignment(int subtaskId, long restoredCheckpointId) {
+        return assignmentTracker.getAndRemoveUncheckpointedAssignment(subtaskId, restoredCheckpointId);
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
index 7985f7d..2041b7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
@@ -80,7 +80,6 @@ public class SplitAssignmentTracker<SplitT extends SourceSplit> {
      * @param in The ObjectInput that contains the state of the SplitAssignmentTracker.
      * @throws Exception when the state deserialization fails.
      */
-    @SuppressWarnings("unchecked")
     public void restoreState(SimpleVersionedSerializer<SplitT> splitSerializer, DataInputStream in) throws Exception {
         // Read the split assignments by checkpoint id.
         Map<Long, Map<Integer, LinkedHashSet<SplitT>>> deserializedAssignments =
@@ -114,15 +113,20 @@ public class SplitAssignmentTracker<SplitT extends SourceSplit> {
      * if those splits were never assigned. To handle this case, the coordinator needs to find those
      * splits and return them back to the SplitEnumerator for re-assignment.
      *
-     * @param failedSubtaskId the failed subtask id.
+     * @param subtaskId the subtask id of the reader that failed over.
+     * @param restoredCheckpointId the ID of the checkpoint that the reader was restored to.
      * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
      */
-    public List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
-        List<SplitT> splits = new ArrayList<>();
-        assignmentsByCheckpointId.values().forEach(assignments -> {
-            removeFromAssignment(failedSubtaskId, assignments, splits);
-        });
-        removeFromAssignment(failedSubtaskId, uncheckpointedAssignments, splits);
+    public List<SplitT> getAndRemoveUncheckpointedAssignment(int subtaskId, long restoredCheckpointId) {
+        final ArrayList<SplitT> splits = new ArrayList<>();
+
+        for (final Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> entry : assignmentsByCheckpointId.entrySet()) {
+            if (entry.getKey() > restoredCheckpointId) {
+                removeFromAssignment(subtaskId, entry.getValue(), splits);
+            }
+        }
+
+        removeFromAssignment(subtaskId, uncheckpointedAssignments, splits);
         return splits;
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index aa1f079..738f848 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -201,6 +201,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 
         // Fail reader 0.
         sourceCoordinator.subtaskFailed(0, null);
+        sourceCoordinator.subtaskReset(0, 99L); // checkpoint ID before the triggered checkpoints
 
         // check the state again.
         check(() -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
index 86ca76e..3a2359b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
@@ -30,6 +30,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -144,10 +145,29 @@ public class SplitAssignmentTrackerTest {
         takeSnapshot(tracker, checkpointId2);
 
         // Now assume subtask 0 has failed.
-        List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0);
+        List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0, checkpointId1 - 1);
         verifyAssignment(Arrays.asList("0", "3"), splitsToPutBack);
     }
 
+    @Test
+    public void testGetAndRemoveSplitsAfterSomeCheckpoint() throws Exception {
+        final long checkpointId1 = 100L;
+        final long checkpointId2 = 101L;
+        SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+
+        // Assign some splits and take snapshot 1.
+        tracker.recordSplitAssignment(getSplitsAssignment(2, 0));
+        takeSnapshot(tracker, checkpointId1);
+
+        // Assign some more splits and take snapshot 2.
+        tracker.recordSplitAssignment(getSplitsAssignment(2, 3));
+        takeSnapshot(tracker, checkpointId2);
+
+        // Now assume subtask 0 has failed.
+        List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0, checkpointId1);
+        verifyAssignment(Collections.singletonList("3"), splitsToPutBack);
+    }
+
     // ---------------------
 
     private byte[] takeSnapshot(SplitAssignmentTracker<MockSourceSplit> tracker, long checkpointId) throws Exception {
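
Note on the behavioral change (illustration only, not part of the patch): after this commit, subtaskFailed() merely unregisters the failed reader, while subtaskReset() returns to the enumerator the splits that are not covered by the restored checkpoint, i.e. splits recorded under a later checkpoint ID plus all splits not yet covered by any checkpoint. The following is a minimal, standalone Java sketch of that bookkeeping; the class and names are made up for the example, the actual logic lives in SplitAssignmentTracker.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Standalone sketch: simplified split bookkeeping keyed by checkpoint ID.
    public class SplitReturnSketch {

        // splits assigned to each subtask, grouped by the checkpoint that first covers them
        private final Map<Long, Map<Integer, Set<String>>> assignmentsByCheckpointId = new HashMap<>();

        // splits assigned since the last checkpoint
        private final Map<Integer, Set<String>> uncheckpointedAssignments = new HashMap<>();

        void assign(int subtask, String split) {
            uncheckpointedAssignments.computeIfAbsent(subtask, k -> new HashSet<>()).add(split);
        }

        void snapshot(long checkpointId) {
            assignmentsByCheckpointId.put(checkpointId, new HashMap<>(uncheckpointedAssignments));
            uncheckpointedAssignments.clear();
        }

        // Splits to hand back to the enumerator when 'subtask' is reset to 'restoredCheckpointId':
        // everything assigned after that checkpoint, plus everything not yet covered by any checkpoint.
        List<String> getAndRemoveUncheckpointedAssignment(int subtask, long restoredCheckpointId) {
            List<String> splits = new ArrayList<>();
            assignmentsByCheckpointId.forEach((checkpointId, assignments) -> {
                if (checkpointId > restoredCheckpointId) {
                    Set<String> forSubtask = assignments.remove(subtask);
                    if (forSubtask != null) {
                        splits.addAll(forSubtask);
                    }
                }
            });
            Set<String> notYetCheckpointed = uncheckpointedAssignments.remove(subtask);
            if (notYetCheckpointed != null) {
                splits.addAll(notYetCheckpointed);
            }
            return splits;
        }

        public static void main(String[] args) {
            SplitReturnSketch tracker = new SplitReturnSketch();
            tracker.assign(0, "split-0");
            tracker.snapshot(100L); // split-0 is covered by checkpoint 100
            tracker.assign(0, "split-3");
            tracker.snapshot(101L); // split-3 is only covered by checkpoint 101

            // Resetting subtask 0 to checkpoint 100 returns only what was assigned after it.
            System.out.println(tracker.getAndRemoveUncheckpointedAssignment(0, 100L)); // [split-3]
        }
    }

Running the sketch prints [split-3], mirroring the expectation in the new testGetAndRemoveSplitsAfterSomeCheckpoint: a reset to checkpoint 100 only hands back the splits first covered by checkpoint 101.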
