This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d53e5b3b133a7ffb8fae4534c1aa198508cb56c9
Author: Stephan Ewen <[email protected]>
AuthorDate: Fri Nov 27 16:38:24 2020 +0100

    [FLINK-20397][checkpointing] Pass checkpointId to 
OperatorCoordinator.resetToCheckpoint()
---
 .../runtime/checkpoint/CheckpointCoordinator.java   | 16 +++++++++++-----
 .../OperatorCoordinatorCheckpointContext.java       |  2 +-
 .../operators/coordination/OperatorCoordinator.java | 21 ++++++++++++++++-----
 .../coordination/OperatorCoordinatorHolder.java     |  4 ++--
 .../RecreateOnResetOperatorCoordinator.java         | 14 +++++++++-----
 .../source/coordinator/SourceCoordinator.java       |  5 ++++-
 .../CheckpointCoordinatorTestingUtils.java          |  4 +---
 .../checkpoint/CheckpointIDCounterTestBase.java     | 21 +++++++++++++++++++++
 .../CoordinatorEventsExactlyOnceITCase.java         |  4 +++-
 .../coordination/MockOperatorCoordinator.java       |  2 +-
 .../coordination/OperatorCoordinatorHolderTest.java |  8 ++++----
 .../OperatorCoordinatorSchedulerTest.java           |  1 +
 .../RecreateOnResetOperatorCoordinatorTest.java     |  8 ++++----
 .../coordination/TestingOperatorCoordinator.java    |  8 +++++++-
 .../coordinator/SourceCoordinatorProviderTest.java  |  2 +-
 .../source/coordinator/SourceCoordinatorTest.java   |  6 +++---
 .../collect/CollectSinkOperatorCoordinator.java     |  2 +-
 17 files changed, 90 insertions(+), 38 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 566592c..9154989 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -1395,8 +1396,10 @@ public class CheckpointCoordinator {
                                        LOG.debug("Resetting the master 
hooks.");
                                        MasterHooks.reset(masterHooks.values(), 
LOG);
 
-                                       LOG.info("Resetting the Coordinators to 
an empty state.");
-                                       
restoreStateToCoordinators(Collections.emptyMap());
+                                       LOG.info("Resetting the Operator 
Coordinators to an empty state.");
+                                       restoreStateToCoordinators(
+                                                       
OperatorCoordinator.NO_CHECKPOINT,
+                                                       Collections.emptyMap());
                                }
 
                                return OptionalLong.empty();
@@ -1424,7 +1427,7 @@ public class CheckpointCoordinator {
                                        LOG);
 
                        if (operatorCoordinatorRestoreBehavior != 
OperatorCoordinatorRestoreBehavior.SKIP) {
-                               restoreStateToCoordinators(operatorStates);
+                               
restoreStateToCoordinators(latest.getCheckpointID(), operatorStates);
                        }
 
                        // update metrics
@@ -1641,12 +1644,15 @@ public class CheckpointCoordinator {
                        initDelay, baseInterval, TimeUnit.MILLISECONDS);
        }
 
-       private void restoreStateToCoordinators(final Map<OperatorID, 
OperatorState> operatorStates) throws Exception {
+       private void restoreStateToCoordinators(
+                       final long checkpointId,
+                       final Map<OperatorID, OperatorState> operatorStates) 
throws Exception {
+
                for (OperatorCoordinatorCheckpointContext coordContext : 
coordinatorsToCheckpoint) {
                        final OperatorState state = 
operatorStates.get(coordContext.operatorId());
                        final ByteStreamStateHandle coordinatorState = state == 
null ? null : state.getCoordinatorState();
                        final byte[] bytes = coordinatorState == null ? null : 
coordinatorState.getData();
-                       coordContext.resetToCheckpoint(bytes);
+                       coordContext.resetToCheckpoint(checkpointId, bytes);
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 8e639e9..8206d8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -65,5 +65,5 @@ public interface OperatorCoordinatorCheckpointContext extends 
OperatorInfo, Chec
         * </ul>
         * In both cases, the coordinator should reset to an empty (new) state.
         */
-       void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception;
+       void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData) throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index 2732bd5..a74aa9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -47,6 +47,15 @@ import java.util.concurrent.CompletableFuture;
 public interface OperatorCoordinator extends CheckpointListener, AutoCloseable 
{
 
        /**
+        * The checkpoint ID passed to the restore methods when no completed 
checkpoint exists yet.
+        * It indicates that the restore is to the "initial state" of the 
coordinator or the
+        * failed subtask.
+        */
+       long NO_CHECKPOINT = -1L;
+
+       // 
------------------------------------------------------------------------
+
+       /**
         * Starts the coordinator. This method is called once at the beginning, 
before any other methods.
         *
         * @throws Exception Any exception thrown from this method causes a 
full job failure.
@@ -116,6 +125,9 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
         * When this method is called, the coordinator can discard all other 
in-flight working state.
         * All subtasks will also have been reset to the same checkpoint.
         *
+        * <p>This method is called in the case of a <i>global failover</i> of 
the system, which means
+        * a failover of the coordinator (JobManager).
+        *
         * <p>This method is expected to behave synchronously with respect to 
other method calls and calls
         * to {@code Context} methods. For example, Events being sent by the 
Coordinator after this method
         * returns are assumed to take place after the checkpoint that was 
restored.
@@ -138,7 +150,7 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
         * complete (for example when a system failure happened directly after 
committing the checkpoint,
         * before calling the {@link #notifyCheckpointComplete(long)} method).
         */
-       void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception;
+       void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData) throws Exception;
 
        // 
------------------------------------------------------------------------
 
@@ -150,10 +162,9 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
         * by the scheduler's failover strategy (by default recovering a 
pipelined region).
         * The method is invoked for each subtask involved in that partial 
failover.
         *
-        * <p>In contrast to this method, the {@link 
#resetToCheckpoint(byte[])} method is called in
-        * the case of a global failover, which is the case when the 
coordinator (JobManager) fails
-        * or the scheduler invokes its safety net where the whole system is 
reset to the latest
-        * complete checkpoint.
+        * <p>In contrast to this method, the {@link #resetToCheckpoint(long, 
byte[])} method is called in
+        * the case of a global failover, which is the case when the 
coordinator (JobManager) is
+        * recovered.
         */
        void subtaskFailed(int subtask, @Nullable Throwable reason);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 4c7f252..4f0caed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -229,7 +229,7 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
        }
 
        @Override
-       public void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception {
+       public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData) throws Exception {
                // ideally we would like to check this here, however this 
method is called early during
                // execution graph construction, before the main thread 
executor is set
 
@@ -237,7 +237,7 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                if (context != null) {
                        context.resetFailed();
                }
-               coordinator.resetToCheckpoint(checkpointData);
+               coordinator.resetToCheckpoint(checkpointId, checkpointData);
        }
 
        private void checkpointCoordinatorInternal(final long checkpointId, 
final CompletableFuture<byte[]> result) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 52b30ed..822ea5e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -105,7 +105,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
-       public void resetToCheckpoint(@Nullable byte[] checkpointData) {
+       public void resetToCheckpoint(final long checkpointId, @Nullable final 
byte[] checkpointData) {
                // First bump up the coordinator epoch to fence out the active 
coordinator.
                LOG.info("Resetting coordinator to checkpoint.");
                // Replace the coordinator variable with a new 
DeferrableCoordinator instance.
@@ -123,7 +123,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
                        if (!closed) {
                                // The previous coordinator has closed. Create 
a new one.
                                
newCoordinator.createNewInternalCoordinator(context, provider);
-                               newCoordinator.resetAndStart(checkpointData, 
started);
+                               newCoordinator.resetAndStart(checkpointId, 
checkpointData, started);
                                newCoordinator.processPendingCalls();
                        }
                });
@@ -248,7 +248,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        /**
-        * A class that helps realize the fully async {@link 
#resetToCheckpoint(byte[])} behavior.
+        * A class that helps realize the fully async {@link 
#resetToCheckpoint(long, byte[])} behavior.
         * The class wraps an {@link OperatorCoordinator} instance. It is going 
to be accessed
         * by two different threads: the scheduler thread and the closing thread 
created in
         * {@link #closeAsync(long)}. A DeferrableCoordinator could be in three 
states:
@@ -366,12 +366,16 @@ public class RecreateOnResetOperatorCoordinator 
implements OperatorCoordinator {
                        internalCoordinator.start();
                }
 
-               void resetAndStart(@Nullable byte[] checkpointData, boolean 
started) {
+               void resetAndStart(
+                               final long checkpointId,
+                               @Nullable final byte[] checkpointData,
+                               final boolean started) {
+
                        if (failed || closed || internalCoordinator == null) {
                                return;
                        }
                        try {
-                               
internalCoordinator.resetToCheckpoint(checkpointData);
+                               
internalCoordinator.resetToCheckpoint(checkpointId, checkpointData);
                                // Start the new coordinator if this 
coordinator has been started before reset to the checkpoint.
                                if (started) {
                                        internalCoordinator.start();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 3dea157..45cb4f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -223,7 +223,10 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT> implements
        }
 
        @Override
-       public void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception {
+       public void resetToCheckpoint(
+                       final long checkpointId,
+                       @Nullable final byte[] checkpointData) throws Exception 
{
+
                checkState(!started, "The coordinator can only be reset if it 
was not yet started");
                assert enumerator == null;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 1085558..627aa07 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -845,9 +845,7 @@ public class CheckpointCoordinatorTestingUtils {
                }
 
                @Override
-               public void resetToCheckpoint(byte[] checkpointData) throws 
Exception {
-
-               }
+               public void resetToCheckpoint(long checkpointId, @Nullable 
byte[] checkpointData) throws Exception {}
 
                @Override
                public OperatorID operatorId() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
index 3323e2c..4bd590b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -33,7 +34,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 /**
  * Test base class with common tests for the {@link CheckpointIDCounter} 
implementations.
@@ -45,6 +48,24 @@ public abstract class CheckpointIDCounterTestBase extends 
TestLogger {
        // 
---------------------------------------------------------------------------------------------
 
        /**
+        * This test guards an assumption made in the notifications in the
+        * {@link 
org.apache.flink.runtime.operators.coordination.OperatorCoordinator}.
+        * The coordinator is notified of a reset/restore and if no checkpoint 
yet exists (failure
+        * was before the first checkpoint), a negative ID is passed.
+        */
+       @Test
+       public void testCounterIsNeverNegative() throws Exception {
+               final CheckpointIDCounter counter = createCheckpointIdCounter();
+
+               try {
+                       counter.start();
+                       assertThat(counter.get(), greaterThanOrEqualTo(0L));
+               } finally {
+                       counter.shutdown(JobStatus.FINISHED);
+               }
+       }
+
+       /**
         * Tests serial increment and get calls.
         */
        @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 6ff3249..378b73c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -289,7 +289,9 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
                }
 
                @Override
-               public void resetToCheckpoint(byte[] checkpointData) throws 
Exception {
+               public void resetToCheckpoint(
+                               final long checkpointId,
+                               @Nullable final byte[] checkpointData) throws 
Exception {
                        executor.execute(() -> nextNumber = 
bytesToInt(checkpointData));
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
index 8c2c6ff..e86615a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
@@ -59,7 +59,7 @@ public final class MockOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
-       public void resetToCheckpoint(byte[] checkpointData) {
+       public void resetToCheckpoint(long checkpointId, byte[] checkpointData) 
{
                throw new UnsupportedOperationException();
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index 3f02bb7..ce135c6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -172,7 +172,7 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
                final OperatorCoordinatorHolder holder = 
createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
 
                triggerAndCompleteCheckpoint(holder, 1000L);
-               holder.resetToCheckpoint(new byte[0]);
+               holder.resetToCheckpoint(1L, new byte[0]);
                getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(999), 1);
 
                assertThat(sender.events, contains(
@@ -188,7 +188,7 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
                triggerAndCompleteCheckpoint(holder, 1000L);
                getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(0), 0);
                getCoordinator(holder).getContext().sendEvent(new 
TestOperatorEvent(1), 1);
-               holder.resetToCheckpoint(new byte[0]);
+               holder.resetToCheckpoint(2L, new byte[0]);
 
                assertTrue(sender.events.isEmpty());
        }
@@ -302,7 +302,7 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
                                                + "should only take the first 
request from the coordinator to fail the job.",
                                firstGlobalFailure, globalFailure);
 
-               holder.resetToCheckpoint(new byte[0]);
+               holder.resetToCheckpoint(0L, new byte[0]);
                holder.handleEventFromOperator(1, new TestOperatorEvent());
                assertNotEquals("The new failures should be propagated after 
the coordinator "
                                                        + "is reset.", 
firstGlobalFailure, globalFailure);
@@ -566,7 +566,7 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
                public void notifyCheckpointComplete(long checkpointId) {}
 
                @Override
-               public void resetToCheckpoint(byte[] checkpointData) throws 
Exception {}
+               public void resetToCheckpoint(long checkpointId, byte[] 
checkpointData) throws Exception {}
 
                @Override
                public void run() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 6ba41cc..aa1abea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -331,6 +331,7 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
 
                assertSame("coordinator should have null restored state",
                        TestingOperatorCoordinator.NULL_RESTORE_VALUE, 
coordinator.getLastRestoredCheckpointState());
+               assertEquals(OperatorCoordinator.NO_CHECKPOINT, 
coordinator.getLastRestoredCheckpointId());
        }
 
        @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
index 3813059..d16892f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -84,7 +84,7 @@ public class RecreateOnResetOperatorCoordinatorTest {
                TestingOperatorCoordinator internalCoordinatorBeforeReset = 
getInternalCoordinator(coordinator);
 
                byte[] stateToRestore = new byte[0];
-               coordinator.resetToCheckpoint(stateToRestore);
+               coordinator.resetToCheckpoint(1L, stateToRestore);
 
                // Use the checkpoint to ensure all the previous method 
invocations have succeeded.
                coordinator.waitForAllAsyncCallsFinish();
@@ -105,7 +105,7 @@ public class RecreateOnResetOperatorCoordinatorTest {
                RecreateOnResetOperatorCoordinator coordinator =
                                (RecreateOnResetOperatorCoordinator) 
provider.create(context, closingTimeoutMs);
 
-               coordinator.resetToCheckpoint(new byte[0]);
+               coordinator.resetToCheckpoint(2L, new byte[0]);
                CommonTestUtils.waitUtil(
                        context::isJobFailed,
                        Duration.ofSeconds(5),
@@ -130,7 +130,7 @@ public class RecreateOnResetOperatorCoordinatorTest {
                // Reset the coordinator, which closes the current internal 
coordinator
                // and then creates a new one. The closing of the current 
internal
                // coordinator will block until the blockOnCloseLatch is pulled.
-               coordinator.resetToCheckpoint(restoredState);
+               coordinator.resetToCheckpoint(2L, restoredState);
 
                // The following method calls should be applied to the new 
internal
                // coordinator asynchronously because the current coordinator 
has not
@@ -189,7 +189,7 @@ public class RecreateOnResetOperatorCoordinatorTest {
                        future.thenRun(() -> 
coordinator.notifyCheckpointComplete(loop));
                        // The reset bytes has a length of i+1 here because 
this will be reset to the
                        // next internal coordinator.
-                       coordinator.resetToCheckpoint(new byte[i + 1]);
+                       coordinator.resetToCheckpoint(i, new byte[i + 1]);
                }
 
                coordinator.waitForAllAsyncCallsFinish();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 9572601..3011f58 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -45,6 +45,7 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
 
        @Nullable
        private byte[] lastRestoredCheckpointState;
+       private long lastRestoredCheckpointId;
 
        private BlockingQueue<CompletableFuture<byte[]>> triggeredCheckpoints;
 
@@ -106,7 +107,8 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
-       public void resetToCheckpoint(@Nullable byte[] checkpointData) {
+       public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData) {
+               lastRestoredCheckpointId = checkpointId;
                lastRestoredCheckpointState = checkpointData == null
                                ? NULL_RESTORE_VALUE
                                : checkpointData;
@@ -135,6 +137,10 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
                return lastRestoredCheckpointState;
        }
 
+       public long getLastRestoredCheckpointId() {
+               return lastRestoredCheckpointId;
+       }
+
        public CompletableFuture<byte[]> getLastTriggeredCheckpoint() throws 
InterruptedException {
                return triggeredCheckpoints.take();
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index eae6d24..322f2f8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -89,7 +89,7 @@ public class SourceCoordinatorProviderTest {
                }
 
                // reset the coordinator to the checkpoint which only contains 
reader 0.
-               coordinator.resetToCheckpoint(bytes);
+               coordinator.resetToCheckpoint(0L, bytes);
                final SourceCoordinator<?, ?> restoredSourceCoordinator =
                                (SourceCoordinator<?, ?>) 
coordinator.getInternalCoordinator();
                assertNotEquals("The restored source coordinator should be a 
different instance",
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index bdb8c23..aa1f079 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -89,7 +89,7 @@ public class SourceCoordinatorTest extends 
SourceCoordinatorTestBase {
        public void testRestCheckpointAfterCoordinatorStarted() throws 
Exception {
                // The following methods should only be invoked after the 
source coordinator has started.
                sourceCoordinator.start();
-               verifyException(() -> sourceCoordinator.resetToCheckpoint(null),
+               verifyException(() -> sourceCoordinator.resetToCheckpoint(0L, 
null),
                                "Reset to checkpoint should fail after the 
coordinator has started",
                                "The coordinator can only be reset if it was 
not yet started");
        }
@@ -146,7 +146,7 @@ public class SourceCoordinatorTest extends 
SourceCoordinatorTestBase {
 
                // restore from the checkpoints.
                SourceCoordinator<?, ?> restoredCoordinator = 
getNewSourceCoordinator();
-               restoredCoordinator.resetToCheckpoint(bytes);
+               restoredCoordinator.resetToCheckpoint(100L, bytes);
                MockSplitEnumerator restoredEnumerator = (MockSplitEnumerator) 
restoredCoordinator.getEnumerator();
                SourceCoordinatorContext restoredContext = 
restoredCoordinator.getContext();
                assertEquals("2 splits should have been assigned to reader 0",
@@ -328,7 +328,7 @@ public class SourceCoordinatorTest extends 
SourceCoordinatorTestBase {
                                "testOperator", context.getOperatorId(), 
source, 1);
 
                final OperatorCoordinator coordinator = 
provider.getCoordinator(context);
-               coordinator.resetToCheckpoint(createEmptyCheckpoint(1L));
+               coordinator.resetToCheckpoint(1L, createEmptyCheckpoint(1L));
                coordinator.start();
 
                final ClassLoaderTestEnumerator enumerator = 
source.restoreEnumeratorFuture.get();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index de8497c..bdce876 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -197,7 +197,7 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor
        }
 
        @Override
-       public void resetToCheckpoint(@Nullable byte[] checkpointData) throws 
Exception {
+       public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData) throws Exception {
                if (checkpointData == null) {
                        // restore before any checkpoint completed
                        closeConnection();

Reply via email to