This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit bc870f66c9baf8ed0f36d75c27222a4a929428a0 Author: Stephan Ewen <[email protected]> AuthorDate: Fri Nov 27 17:44:10 2020 +0100 [FLINK-20396][checkpointing] Add a 'subtaskReset()' method to the OperatorCoordinator. This closes #14256 --- .../OperatorCoordinatorCheckpointContext.java | 11 +++ .../coordination/OperatorCoordinator.java | 16 +++- .../coordination/OperatorCoordinatorHolder.java | 6 ++ .../RecreateOnResetOperatorCoordinator.java | 7 ++ .../flink/runtime/scheduler/SchedulerBase.java | 81 ++++++++++++++++- .../source/coordinator/SourceCoordinator.java | 5 + .../CheckpointCoordinatorTestingUtils.java | 3 + .../CoordinatorEventsExactlyOnceITCase.java | 3 + .../coordination/MockOperatorCoordinator.java | 5 + .../OperatorCoordinatorHolderTest.java | 3 + .../OperatorCoordinatorSchedulerTest.java | 101 ++++++++++++++++++++- .../coordination/TestingOperatorCoordinator.java | 23 +++++ .../collect/CollectSinkOperatorCoordinator.java | 5 + 13 files changed, 262 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java index 8206d8b..fb7e205 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java @@ -66,4 +66,15 @@ public interface OperatorCoordinatorCheckpointContext extends OperatorInfo, Chec * In both cases, the coordinator should reset to an empty (new) state. */ void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception; + + /** + * Called if a task is recovered as part of a <i>partial failover</i>, meaning a failover + * handled by the scheduler's failover strategy (by default recovering a pipelined region). 
+ * The method is invoked for each subtask involved in that partial failover. + * + * <p>In contrast to this method, the {@link #resetToCheckpoint(long, byte[])} method is called in + * the case of a global failover, which is the case when the coordinator (JobManager) is + * recovered. + */ + void subtaskReset(int subtask, long checkpointId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java index a74aa9f..efcc2fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java @@ -126,7 +126,9 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { * All subtasks will also have been reset to the same checkpoint. * * <p>This method is called in the case of a <i>global failover</i> of the system, which means - * a failover of the coordinator (JobManager). + * a failover of the coordinator (JobManager). This method is not invoked on a <i>partial + * failover</i>; partial failovers call the {@link #subtaskReset(int, long)} method for the + * involved subtasks. * * <p>This method is expected to behave synchronously with respect to other method calls and calls * to {@code Context} methods. For example, Events being sent by the Coordinator after this method @@ -158,15 +160,21 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { * Called when one of the subtasks of the task running the coordinated operator goes * through a failover (failure / recovery cycle). * - * <p>This method is called in case of a <i>partial failover</i> meaning a failover handled - * by the scheduler's failover strategy (by default recovering a pipelined region). 
+ * <p>This method is called every time there is a failover of a subtask, regardless of + * whether it is a partial failover or a global failover. + */ + void subtaskFailed(int subtask, @Nullable Throwable reason); + + /** + * Called if a task is recovered as part of a <i>partial failover</i>, meaning a failover + * handled by the scheduler's failover strategy (by default recovering a pipelined region). * The method is invoked for each subtask involved in that partial failover. * * <p>In contrast to this method, the {@link #resetToCheckpoint(long, byte[])} method is called in * the case of a global failover, which is the case when the coordinator (JobManager) is * recovered. */ - void subtaskFailed(int subtask, @Nullable Throwable reason); + void subtaskReset(int subtask, long checkpointId); // ------------------------------------------------------------------------ // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 4f0caed..935e252 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -202,6 +202,12 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC } @Override + public void subtaskReset(int subtask, long checkpointId) { + mainThreadExecutor.assertRunningInMainThread(); + coordinator.subtaskReset(subtask, checkpointId); + } + + @Override public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) { // unfortunately, this method does not run in the scheduler executor, but in the // checkpoint coordinator time thread. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index 822ea5e..346a40f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -91,6 +91,13 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { } @Override + public void subtaskReset(int subtask, long checkpointId) { + coordinator.applyCall( + "subtaskReset", + c -> c.subtaskReset(subtask, checkpointId)); + } + + @Override public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception { coordinator.applyCall( "checkpointCoordinator", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 1986136..4b442a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -100,6 +100,7 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.IntArrayList; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; @@ -120,6 +121,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -347,7 +349,17 @@ public abstract class SchedulerBase implements SchedulerNG { protected void restoreState(final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery) throws Exception { final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + // batch failover case - we only need to notify the OperatorCoordinators, + // not do any actual state restore + if (isGlobalRecovery) { + notifyCoordinatorsOfEmptyGlobalRestore(); + } else { + notifyCoordinatorsOfSubtaskRestore( + getInvolvedExecutionJobVerticesAndSubtasks(vertices), + OperatorCoordinator.NO_CHECKPOINT); + } return; } @@ -359,11 +371,60 @@ public abstract class SchedulerBase implements SchedulerNG { checkpointCoordinator.abortPendingCheckpoints( new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION)); - final Set<ExecutionJobVertex> jobVerticesToRestore = getInvolvedExecutionJobVertices(vertices); if (isGlobalRecovery) { + final Set<ExecutionJobVertex> jobVerticesToRestore = getInvolvedExecutionJobVertices(vertices); + + // a global restore restores all Job Vertices + assert jobVerticesToRestore.size() == getExecutionGraph().getAllVertices().size(); + checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true); + } else { - checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(jobVerticesToRestore); + final Map<ExecutionJobVertex, IntArrayList> subtasksToRestore = + getInvolvedExecutionJobVerticesAndSubtasks(vertices); + + final OptionalLong restoredCheckpointId = + checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(subtasksToRestore.keySet()); + + // Ideally, the Checkpoint Coordinator would call OperatorCoordinator.subtaskReset, but + // the Checkpoint Coordinator is not aware of subtasks in a local failover. 
It always + // assigns state to all subtasks, and for the subtask execution attempts that are still + // running (or not waiting to be deployed) the state assignment has simply no effect. + // Because of that, we need to do the "subtask restored" notification here. + // Once the Checkpoint Coordinator is properly aware of partial (region) recovery, + // this code should move into the Checkpoint Coordinator. + final long checkpointId = restoredCheckpointId.orElse(OperatorCoordinator.NO_CHECKPOINT); + notifyCoordinatorsOfSubtaskRestore(subtasksToRestore, checkpointId); + } + } + + private void notifyCoordinatorsOfSubtaskRestore( + final Map<ExecutionJobVertex, IntArrayList> restoredSubtasks, + final long checkpointId) { + + for (final Map.Entry<ExecutionJobVertex, IntArrayList> vertexSubtasks : restoredSubtasks.entrySet()) { + final ExecutionJobVertex jobVertex = vertexSubtasks.getKey(); + final IntArrayList subtasks = vertexSubtasks.getValue(); + + final Collection<OperatorCoordinatorHolder> coordinators = jobVertex.getOperatorCoordinators(); + if (coordinators.isEmpty()) { + continue; + } + + while (!subtasks.isEmpty()) { + final int subtask = subtasks.removeLast(); // this is how IntArrayList implements iterations + for (final OperatorCoordinatorHolder opCoordinator : coordinators) { + opCoordinator.subtaskReset(subtask, checkpointId); + } + } + } + } + + private void notifyCoordinatorsOfEmptyGlobalRestore() throws Exception { + for (final ExecutionJobVertex ejv : getExecutionGraph().getAllVertices().values()) { + for (final OperatorCoordinator coordinator : ejv.getOperatorCoordinators()) { + coordinator.resetToCheckpoint(OperatorCoordinator.NO_CHECKPOINT, null); + } } } @@ -378,6 +439,22 @@ public abstract class SchedulerBase implements SchedulerNG { return tasks; } + private Map<ExecutionJobVertex, IntArrayList> getInvolvedExecutionJobVerticesAndSubtasks( + final Set<ExecutionVertexID> executionVertices) { + + final HashMap<ExecutionJobVertex, IntArrayList> 
result = new HashMap<>(); + + for (ExecutionVertexID executionVertexID : executionVertices) { + final ExecutionVertex executionVertex = getExecutionVertex(executionVertexID); + final IntArrayList subtasks = result.computeIfAbsent( + executionVertex.getJobVertex(), + (key) -> new IntArrayList(32)); + subtasks.add(executionVertex.getParallelSubtaskIndex()); + } + + return result; + } + protected void transitionToScheduled(final List<ExecutionVertexID> verticesToDeploy) { verticesToDeploy.forEach(executionVertexId -> getExecutionVertex(executionVertexId) .getCurrentExecutionAttempt() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 45cb4f8..31135ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -183,6 +183,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements } @Override + public void subtaskReset(int subtask, long checkpointId) { + // TODO - move the split reset logic here + } + + @Override public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) { runInEventLoop( () -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index 627aa07..9299e3a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -848,6 +848,9 @@ public class CheckpointCoordinatorTestingUtils { public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) 
throws Exception {} @Override + public void subtaskReset(int subtask, long checkpointId) {} + + @Override public OperatorID operatorId() { return operatorID; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java index 378b73c..dff084a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java @@ -289,6 +289,9 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { } @Override + public void subtaskReset(int subtask, long checkpointId) {} + + @Override public void resetToCheckpoint( final long checkpointId, @Nullable final byte[] checkpointData) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java index e86615a..6519444 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java @@ -49,6 +49,11 @@ public final class MockOperatorCoordinator implements OperatorCoordinator { } @Override + public void subtaskReset(int subtask, long checkpointId) { + throw new UnsupportedOperationException(); + } + + @Override public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index ce135c6..ea9cb42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -560,6 +560,9 @@ public class OperatorCoordinatorHolderTest extends TestLogger { public void subtaskFailed(int subtask, @Nullable Throwable reason) {} @Override + public void subtaskReset(int subtask, long checkpointId) {} + + @Override public abstract void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception; @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java index aa1abea..e23fe19 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java @@ -72,6 +72,7 @@ import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith; import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertArrayEquals; @@ -335,7 +336,45 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { } @Test - public void testLocalFailureDoesNotResetToCheckpoint() throws Exception { + public void testGlobalFailoverDoesNotNotifyLocalRestore() throws Exception { + final 
DefaultScheduler scheduler = createSchedulerAndDeployTasks(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + + takeCompleteCheckpoint(scheduler, coordinator, new byte[0]); + failGlobalAndRestart(scheduler, new TestException()); + + assertThat(coordinator.getRestoredTasks(), empty()); + } + + @Test + public void testLocalFailoverResetsTask() throws Exception { + final DefaultScheduler scheduler = createSchedulerAndDeployTasks(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + + final long checkpointId = takeCompleteCheckpoint(scheduler, coordinator, new byte[0]); + failAndRestartTask(scheduler, 1); + + assertEquals(1, coordinator.getRestoredTasks().size()); + final TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask = coordinator.getRestoredTasks().get(0); + assertEquals(1, restoredTask.subtaskIndex); + assertEquals(checkpointId, restoredTask.checkpointId); + } + + @Test + public void testLocalFailoverBeforeCheckpointResetsTask() throws Exception { + final DefaultScheduler scheduler = createSchedulerAndDeployTasks(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + + failAndRestartTask(scheduler, 1); + + assertEquals(1, coordinator.getRestoredTasks().size()); + final TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask = coordinator.getRestoredTasks().get(0); + assertEquals(1, restoredTask.subtaskIndex); + assertEquals(OperatorCoordinator.NO_CHECKPOINT, restoredTask.checkpointId); + } + + @Test + public void testLocalFailoverDoesNotResetToCheckpoint() throws Exception { final DefaultScheduler scheduler = createSchedulerAndDeployTasks(); final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); @@ -357,6 +396,55 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { } // ------------------------------------------------------------------------ + // tests for failover notifications in a batch setup (no checkpoints) + // 
------------------------------------------------------------------------ + + @Test + public void testBatchGlobalFailureResetsToEmptyState() throws Exception { + final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + + failGlobalAndRestart(scheduler, new TestException()); + + assertSame("coordinator should have null restored state", + TestingOperatorCoordinator.NULL_RESTORE_VALUE, coordinator.getLastRestoredCheckpointState()); + assertEquals(OperatorCoordinator.NO_CHECKPOINT, coordinator.getLastRestoredCheckpointId()); + } + + @Test + public void testBatchGlobalFailoverDoesNotNotifyLocalRestore() throws Exception { + final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + + failGlobalAndRestart(scheduler, new TestException()); + + assertThat(coordinator.getRestoredTasks(), empty()); + } + + @Test + public void testBatchLocalFailoverResetsTask() throws Exception { + final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + + failAndRestartTask(scheduler, 1); + + assertEquals(1, coordinator.getRestoredTasks().size()); + final TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask = coordinator.getRestoredTasks().get(0); + assertEquals(1, restoredTask.subtaskIndex); + assertEquals(OperatorCoordinator.NO_CHECKPOINT, restoredTask.checkpointId); + } + + @Test + public void testBatchLocalFailoverDoesNotResetToCheckpoint() throws Exception { + final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + + failAndRestartTask(scheduler, 0); + + assertNull("coordinator should not have a restored checkpoint", 
coordinator.getLastRestoredCheckpointState()); + } + + // ------------------------------------------------------------------------ // tests for REST request delivery // ------------------------------------------------------------------------ @@ -435,6 +523,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { return scheduler; } + private DefaultScheduler createSchedulerWithoutCheckpointingAndDeployTasks() throws Exception { + final Consumer<JobGraph> noCheckpoints = (jobGraph) -> jobGraph.setSnapshotSettings(null); + final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, noCheckpoints, false); + + // guard test assumptions: this must set up a scheduler without checkpoints + assertNull(scheduler.getExecutionGraph().getCheckpointCoordinator()); + + scheduleAllTasksToRunning(scheduler); + return scheduler; + } + private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws Exception { final DefaultScheduler scheduler = setupTestJobAndScheduler(provider); scheduleAllTasksToRunning(scheduler); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java index 3011f58..9fbd312 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java @@ -40,6 +40,7 @@ class TestingOperatorCoordinator implements OperatorCoordinator { private final OperatorCoordinator.Context context; private final ArrayList<Integer> failedTasks = new ArrayList<>(); + private final ArrayList<SubtaskAndCheckpoint> restoredTasks = new ArrayList<>(); private final CountDownLatch blockOnCloseLatch; @@ -96,6 +97,11 @@ class 
TestingOperatorCoordinator implements OperatorCoordinator { } @Override + public void subtaskReset(int subtask, long checkpointId) { + restoredTasks.add(new SubtaskAndCheckpoint(subtask, checkpointId)); + } + + @Override public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) { boolean added = triggeredCheckpoints.offer(result); assert added; // guard the test assumptions @@ -132,6 +138,10 @@ class TestingOperatorCoordinator implements OperatorCoordinator { return failedTasks; } + public List<SubtaskAndCheckpoint> getRestoredTasks() { + return restoredTasks; + } + @Nullable public byte[] getLastRestoredCheckpointState() { return lastRestoredCheckpointState; @@ -163,6 +173,19 @@ class TestingOperatorCoordinator implements OperatorCoordinator { } // ------------------------------------------------------------------------ + + public static final class SubtaskAndCheckpoint { + + public final int subtaskIndex; + public final long checkpointId; + + public SubtaskAndCheckpoint(int subtaskIndex, long checkpointId) { + this.subtaskIndex = subtaskIndex; + this.checkpointId = checkpointId; + } + } + + // ------------------------------------------------------------------------ // The provider for this coordinator implementation // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java index bdce876..3c96c63 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java @@ -184,6 +184,11 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor } @Override + 
public void subtaskReset(int subtask, long checkpointId) { + // nothing to do here, connections are re-created lazily + } + + @Override public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos);
