This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc870f66c9baf8ed0f36d75c27222a4a929428a0
Author: Stephan Ewen <[email protected]>
AuthorDate: Fri Nov 27 17:44:10 2020 +0100

    [FLINK-20396][checkpointing] Add a 'subtaskReset()' method to the 
OperatorCoordinator.
    
    This closes #14256
---
 .../OperatorCoordinatorCheckpointContext.java      |  11 +++
 .../coordination/OperatorCoordinator.java          |  16 +++-
 .../coordination/OperatorCoordinatorHolder.java    |   6 ++
 .../RecreateOnResetOperatorCoordinator.java        |   7 ++
 .../flink/runtime/scheduler/SchedulerBase.java     |  81 ++++++++++++++++-
 .../source/coordinator/SourceCoordinator.java      |   5 +
 .../CheckpointCoordinatorTestingUtils.java         |   3 +
 .../CoordinatorEventsExactlyOnceITCase.java        |   3 +
 .../coordination/MockOperatorCoordinator.java      |   5 +
 .../OperatorCoordinatorHolderTest.java             |   3 +
 .../OperatorCoordinatorSchedulerTest.java          | 101 ++++++++++++++++++++-
 .../coordination/TestingOperatorCoordinator.java   |  23 +++++
 .../collect/CollectSinkOperatorCoordinator.java    |   5 +
 13 files changed, 262 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 8206d8b..fb7e205 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -66,4 +66,15 @@ public interface OperatorCoordinatorCheckpointContext 
extends OperatorInfo, Chec
         * In both cases, the coordinator should reset to an empty (new) state.
         */
        void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData) throws Exception;
+
+       /**
+        * Called if a task is recovered as part of a <i>partial failover</i>, 
meaning a failover
+        * handled by the scheduler's failover strategy (by default recovering 
a pipelined region).
+        * The method is invoked for each subtask involved in that partial 
failover.
+        *
+        * <p>In contrast to this method, the {@link #resetToCheckpoint(long, 
byte[])} method is called in
+        * the case of a global failover, which is the case when the 
coordinator (JobManager) is
+        * recovered.
+        */
+       void subtaskReset(int subtask, long checkpointId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index a74aa9f..efcc2fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -126,7 +126,9 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
         * All subtasks will also have been reset to the same checkpoint.
         *
         * <p>This method is called in the case of a <i>global failover</i> of 
the system, which means
-        * a failover of the coordinator (JobManager).
+        * a failover of the coordinator (JobManager). This method is not 
invoked on a <i>partial
+        * failover</i>; partial failovers call the {@link #subtaskReset(int, 
long)} method for the
+        * involved subtasks.
         *
         * <p>This method is expected to behave synchronously with respect to 
other method calls and calls
         * to {@code Context} methods. For example, Events being sent by the 
Coordinator after this method
@@ -158,15 +160,21 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
         * Called when one of the subtasks of the task running the coordinated 
operator goes
         * through a failover (failure / recovery cycle).
         *
-        * <p>This method is called in case of a <i>partial failover</i> 
meaning a failover handled
-        * by the scheduler's failover strategy (by default recovering a 
pipelined region).
+        * <p>This method is called every time there is a failover of a 
subtask, regardless of
+        * whether it is a partial failover or a global failover.
+        */
+       void subtaskFailed(int subtask, @Nullable Throwable reason);
+
+       /**
+        * Called if a task is recovered as part of a <i>partial failover</i>, 
meaning a failover
+        * handled by the scheduler's failover strategy (by default recovering 
a pipelined region).
         * The method is invoked for each subtask involved in that partial 
failover.
         *
         * <p>In contrast to this method, the {@link #resetToCheckpoint(long, 
byte[])} method is called in
         * the case of a global failover, which is the case when the 
coordinator (JobManager) is
         * recovered.
         */
-       void subtaskFailed(int subtask, @Nullable Throwable reason);
+       void subtaskReset(int subtask, long checkpointId);
 
        // 
------------------------------------------------------------------------
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 4f0caed..935e252 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -202,6 +202,12 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
        }
 
        @Override
+       public void subtaskReset(int subtask, long checkpointId) {
+               mainThreadExecutor.assertRunningInMainThread();
+               coordinator.subtaskReset(subtask, checkpointId);
+       }
+
+       @Override
        public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> result) {
                // unfortunately, this method does not run in the scheduler 
executor, but in the
                // checkpoint coordinator time thread.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 822ea5e..346a40f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -91,6 +91,13 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
+       public void subtaskReset(int subtask, long checkpointId) {
+               coordinator.applyCall(
+                       "subtaskReset",
+                       c -> c.subtaskReset(subtask, checkpointId));
+       }
+
+       @Override
        public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) throws Exception {
                coordinator.applyCall(
                                "checkpointCoordinator",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 1986136..4b442a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -100,6 +100,7 @@ import 
org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -120,6 +121,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -347,7 +349,17 @@ public abstract class SchedulerBase implements SchedulerNG 
{
 
        protected void restoreState(final Set<ExecutionVertexID> vertices, 
final boolean isGlobalRecovery) throws Exception {
                final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+
                if (checkpointCoordinator == null) {
+                       // batch failover case - we only need to notify the 
OperatorCoordinators,
+                       // not do any actual state restore
+                       if (isGlobalRecovery) {
+                               notifyCoordinatorsOfEmptyGlobalRestore();
+                       } else {
+                               notifyCoordinatorsOfSubtaskRestore(
+                                       
getInvolvedExecutionJobVerticesAndSubtasks(vertices),
+                                       OperatorCoordinator.NO_CHECKPOINT);
+                       }
                        return;
                }
 
@@ -359,11 +371,60 @@ public abstract class SchedulerBase implements 
SchedulerNG {
                checkpointCoordinator.abortPendingCheckpoints(
                                new 
CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
 
-               final Set<ExecutionJobVertex> jobVerticesToRestore = 
getInvolvedExecutionJobVertices(vertices);
                if (isGlobalRecovery) {
+                       final Set<ExecutionJobVertex> jobVerticesToRestore = 
getInvolvedExecutionJobVertices(vertices);
+
+                       // a global restore restores all Job Vertices
+                       assert jobVerticesToRestore.size() == 
getExecutionGraph().getAllVertices().size();
+
                        
checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, 
true);
+
                } else {
-                       
checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(jobVerticesToRestore);
+                       final Map<ExecutionJobVertex, IntArrayList> 
subtasksToRestore =
+                                       
getInvolvedExecutionJobVerticesAndSubtasks(vertices);
+
+                       final OptionalLong restoredCheckpointId =
+                                       
checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(subtasksToRestore.keySet());
+
+                       // Ideally, the Checkpoint Coordinator would call 
OperatorCoordinator.subtaskReset, but
+                       // the Checkpoint Coordinator is not aware of subtasks 
in a local failover. It always
+                       // assigns state to all subtasks, and for the subtask 
execution attempts that are still
+                       // running (or not waiting to be deployed) the state 
assignment simply has no effect.
+                       // Because of that, we need to do the "subtask 
restored" notification here.
+                       // Once the Checkpoint Coordinator is properly aware of 
partial (region) recovery,
+                       // this code should move into the Checkpoint 
Coordinator.
+                       final long checkpointId = 
restoredCheckpointId.orElse(OperatorCoordinator.NO_CHECKPOINT);
+                       notifyCoordinatorsOfSubtaskRestore(subtasksToRestore, 
checkpointId);
+               }
+       }
+
+       private void notifyCoordinatorsOfSubtaskRestore(
+                       final Map<ExecutionJobVertex, IntArrayList> 
restoredSubtasks,
+                       final long checkpointId) {
+
+               for (final Map.Entry<ExecutionJobVertex, IntArrayList> 
vertexSubtasks : restoredSubtasks.entrySet()) {
+                       final ExecutionJobVertex jobVertex = 
vertexSubtasks.getKey();
+                       final IntArrayList subtasks = vertexSubtasks.getValue();
+
+                       final Collection<OperatorCoordinatorHolder> 
coordinators = jobVertex.getOperatorCoordinators();
+                       if (coordinators.isEmpty()) {
+                               continue;
+                       }
+
+                       while (!subtasks.isEmpty()) {
+                               final int subtask = subtasks.removeLast(); // 
this is how IntArrayList implements iterations
+                               for (final OperatorCoordinatorHolder 
opCoordinator : coordinators) {
+                                       opCoordinator.subtaskReset(subtask, 
checkpointId);
+                               }
+                       }
+               }
+       }
+
+       private void notifyCoordinatorsOfEmptyGlobalRestore() throws Exception {
+               for (final ExecutionJobVertex ejv : 
getExecutionGraph().getAllVertices().values()) {
+                       for (final OperatorCoordinator coordinator : 
ejv.getOperatorCoordinators()) {
+                               
coordinator.resetToCheckpoint(OperatorCoordinator.NO_CHECKPOINT, null);
+                       }
                }
        }
 
@@ -378,6 +439,22 @@ public abstract class SchedulerBase implements SchedulerNG 
{
                return tasks;
        }
 
+       private Map<ExecutionJobVertex, IntArrayList> 
getInvolvedExecutionJobVerticesAndSubtasks(
+                       final Set<ExecutionVertexID> executionVertices) {
+
+               final HashMap<ExecutionJobVertex, IntArrayList> result = new 
HashMap<>();
+
+               for (ExecutionVertexID executionVertexID : executionVertices) {
+                       final ExecutionVertex executionVertex = 
getExecutionVertex(executionVertexID);
+                       final IntArrayList subtasks = result.computeIfAbsent(
+                                       executionVertex.getJobVertex(),
+                                       (key) -> new IntArrayList(32));
+                       subtasks.add(executionVertex.getParallelSubtaskIndex());
+               }
+
+               return result;
+       }
+
        protected void transitionToScheduled(final List<ExecutionVertexID> 
verticesToDeploy) {
                verticesToDeploy.forEach(executionVertexId -> 
getExecutionVertex(executionVertexId)
                        .getCurrentExecutionAttempt()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 45cb4f8..31135ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -183,6 +183,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT> implements
        }
 
        @Override
+       public void subtaskReset(int subtask, long checkpointId) {
+               // TODO - move the split reset logic here
+       }
+
+       @Override
        public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> result) {
                runInEventLoop(
                        () -> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 627aa07..9299e3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -848,6 +848,9 @@ public class CheckpointCoordinatorTestingUtils {
                public void resetToCheckpoint(long checkpointId, @Nullable 
byte[] checkpointData) throws Exception {}
 
                @Override
+               public void subtaskReset(int subtask, long checkpointId) {}
+
+               @Override
                public OperatorID operatorId() {
                        return operatorID;
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 378b73c..dff084a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -289,6 +289,9 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
                }
 
                @Override
+               public void subtaskReset(int subtask, long checkpointId) {}
+
+               @Override
                public void resetToCheckpoint(
                                final long checkpointId,
                                @Nullable final byte[] checkpointData) throws 
Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
index e86615a..6519444 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
@@ -49,6 +49,11 @@ public final class MockOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
+       public void subtaskReset(int subtask, long checkpointId) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> result) {
                throw new UnsupportedOperationException();
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index ce135c6..ea9cb42 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -560,6 +560,9 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
                public void subtaskFailed(int subtask, @Nullable Throwable 
reason) {}
 
                @Override
+               public void subtaskReset(int subtask, long checkpointId) {}
+
+               @Override
                public abstract void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> result) throws Exception;
 
                @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index aa1abea..e23fe19 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -72,6 +72,7 @@ import static 
org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
 import static 
org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertArrayEquals;
@@ -335,7 +336,45 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
        }
 
        @Test
-       public void testLocalFailureDoesNotResetToCheckpoint() throws Exception 
{
+       public void testGlobalFailoverDoesNotNotifyLocalRestore() throws 
Exception {
+               final DefaultScheduler scheduler = 
createSchedulerAndDeployTasks();
+               final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+
+               takeCompleteCheckpoint(scheduler, coordinator, new byte[0]);
+               failGlobalAndRestart(scheduler, new TestException());
+
+               assertThat(coordinator.getRestoredTasks(), empty());
+       }
+
+       @Test
+       public void testLocalFailoverResetsTask() throws Exception {
+               final DefaultScheduler scheduler = 
createSchedulerAndDeployTasks();
+               final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+
+               final long checkpointId = takeCompleteCheckpoint(scheduler, 
coordinator, new byte[0]);
+               failAndRestartTask(scheduler, 1);
+
+               assertEquals(1, coordinator.getRestoredTasks().size());
+               final TestingOperatorCoordinator.SubtaskAndCheckpoint 
restoredTask = coordinator.getRestoredTasks().get(0);
+               assertEquals(1, restoredTask.subtaskIndex);
+               assertEquals(checkpointId, restoredTask.checkpointId);
+       }
+
+       @Test
+       public void testLocalFailoverBeforeCheckpointResetsTask() throws 
Exception {
+               final DefaultScheduler scheduler = 
createSchedulerAndDeployTasks();
+               final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+
+               failAndRestartTask(scheduler, 1);
+
+               assertEquals(1, coordinator.getRestoredTasks().size());
+               final TestingOperatorCoordinator.SubtaskAndCheckpoint 
restoredTask = coordinator.getRestoredTasks().get(0);
+               assertEquals(1, restoredTask.subtaskIndex);
+               assertEquals(OperatorCoordinator.NO_CHECKPOINT, 
restoredTask.checkpointId);
+       }
+
+       @Test
+       public void testLocalFailoverDoesNotResetToCheckpoint() throws 
Exception {
                final DefaultScheduler scheduler = 
createSchedulerAndDeployTasks();
                final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
 
@@ -357,6 +396,55 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
        }
 
        // 
------------------------------------------------------------------------
+       //  tests for failover notifications in a batch setup (no checkpoints)
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testBatchGlobalFailureResetsToEmptyState() throws Exception 
{
+               final DefaultScheduler scheduler = 
createSchedulerWithoutCheckpointingAndDeployTasks();
+               final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+
+               failGlobalAndRestart(scheduler, new TestException());
+
+               assertSame("coordinator should have null restored state",
+                       TestingOperatorCoordinator.NULL_RESTORE_VALUE, 
coordinator.getLastRestoredCheckpointState());
+               assertEquals(OperatorCoordinator.NO_CHECKPOINT, 
coordinator.getLastRestoredCheckpointId());
+       }
+
+       @Test
+       public void testBatchGlobalFailoverDoesNotNotifyLocalRestore() throws 
Exception {
+               final DefaultScheduler scheduler = 
createSchedulerWithoutCheckpointingAndDeployTasks();
+               final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+
+               failGlobalAndRestart(scheduler, new TestException());
+
+               assertThat(coordinator.getRestoredTasks(), empty());
+       }
+
+       @Test
+       public void testBatchLocalFailoverResetsTask() throws Exception {
+               final DefaultScheduler scheduler = 
createSchedulerWithoutCheckpointingAndDeployTasks();
+               final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+
+               failAndRestartTask(scheduler, 1);
+
+               assertEquals(1, coordinator.getRestoredTasks().size());
+               final TestingOperatorCoordinator.SubtaskAndCheckpoint 
restoredTask = coordinator.getRestoredTasks().get(0);
+               assertEquals(1, restoredTask.subtaskIndex);
+               assertEquals(OperatorCoordinator.NO_CHECKPOINT, 
restoredTask.checkpointId);
+       }
+
+       @Test
+       public void testBatchLocalFailoverDoesNotResetToCheckpoint() throws 
Exception {
+               final DefaultScheduler scheduler = 
createSchedulerWithoutCheckpointingAndDeployTasks();
+               final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+
+               failAndRestartTask(scheduler, 0);
+
+               assertNull("coordinator should not have a restored checkpoint", 
coordinator.getLastRestoredCheckpointState());
+       }
+
+       // 
------------------------------------------------------------------------
        //  tests for REST request delivery
        // 
------------------------------------------------------------------------
 
@@ -435,6 +523,17 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
                return scheduler;
        }
 
+       private DefaultScheduler 
createSchedulerWithoutCheckpointingAndDeployTasks() throws Exception {
+               final Consumer<JobGraph> noCheckpoints = (jobGraph) -> 
jobGraph.setSnapshotSettings(null);
+               final DefaultScheduler scheduler = setupTestJobAndScheduler(new 
TestingOperatorCoordinator.Provider(testOperatorId), null, noCheckpoints, 
false);
+
+               // guard test assumptions: this must set up a scheduler without 
checkpoints
+               
assertNull(scheduler.getExecutionGraph().getCheckpointCoordinator());
+
+               scheduleAllTasksToRunning(scheduler);
+               return scheduler;
+       }
+
        private DefaultScheduler 
createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws 
Exception {
                final DefaultScheduler scheduler = 
setupTestJobAndScheduler(provider);
                scheduleAllTasksToRunning(scheduler);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 3011f58..9fbd312 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -40,6 +40,7 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
        private final OperatorCoordinator.Context context;
 
        private final ArrayList<Integer> failedTasks = new ArrayList<>();
+       private final ArrayList<SubtaskAndCheckpoint> restoredTasks = new 
ArrayList<>();
 
        private final CountDownLatch blockOnCloseLatch;
 
@@ -96,6 +97,11 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
+       public void subtaskReset(int subtask, long checkpointId) {
+               restoredTasks.add(new SubtaskAndCheckpoint(subtask, 
checkpointId));
+       }
+
+       @Override
        public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> result) {
                boolean added = triggeredCheckpoints.offer(result);
                assert added; // guard the test assumptions
@@ -132,6 +138,10 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
                return failedTasks;
        }
 
+       public List<SubtaskAndCheckpoint> getRestoredTasks() {
+               return restoredTasks;
+       }
+
        @Nullable
        public byte[] getLastRestoredCheckpointState() {
                return lastRestoredCheckpointState;
@@ -163,6 +173,19 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        // 
------------------------------------------------------------------------
+
+       public static final class SubtaskAndCheckpoint {
+
+               public final int subtaskIndex;
+               public final long checkpointId;
+
+               public SubtaskAndCheckpoint(int subtaskIndex, long 
checkpointId) {
+                       this.subtaskIndex = subtaskIndex;
+                       this.checkpointId = checkpointId;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
        //  The provider for this coordinator implementation
        // 
------------------------------------------------------------------------
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index bdce876..3c96c63 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -184,6 +184,11 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor
        }
 
        @Override
+       public void subtaskReset(int subtask, long checkpointId) {
+               // nothing to do here, connections are re-created lazily
+       }
+
+       @Override
        public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> result) throws Exception {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(baos);

Reply via email to