This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit d53e5b3b133a7ffb8fae4534c1aa198508cb56c9 Author: Stephan Ewen <[email protected]> AuthorDate: Fri Nov 27 16:38:24 2020 +0100 [FLINK-20397][checkpointing] Pass checkpointId to OperatorCoordinator.resetToCheckpoint() --- .../runtime/checkpoint/CheckpointCoordinator.java | 16 +++++++++++----- .../OperatorCoordinatorCheckpointContext.java | 2 +- .../operators/coordination/OperatorCoordinator.java | 21 ++++++++++++++++----- .../coordination/OperatorCoordinatorHolder.java | 4 ++-- .../RecreateOnResetOperatorCoordinator.java | 14 +++++++++----- .../source/coordinator/SourceCoordinator.java | 5 ++++- .../CheckpointCoordinatorTestingUtils.java | 4 +--- .../checkpoint/CheckpointIDCounterTestBase.java | 21 +++++++++++++++++++++ .../CoordinatorEventsExactlyOnceITCase.java | 4 +++- .../coordination/MockOperatorCoordinator.java | 2 +- .../coordination/OperatorCoordinatorHolderTest.java | 8 ++++---- .../OperatorCoordinatorSchedulerTest.java | 1 + .../RecreateOnResetOperatorCoordinatorTest.java | 8 ++++---- .../coordination/TestingOperatorCoordinator.java | 8 +++++++- .../coordinator/SourceCoordinatorProviderTest.java | 2 +- .../source/coordinator/SourceCoordinatorTest.java | 6 +++--- .../collect/CollectSinkOperatorCoordinator.java | 2 +- 17 files changed, 90 insertions(+), 38 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 566592c..9154989 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorInfo; import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; import org.apache.flink.runtime.state.CheckpointStorageLocation; @@ -1395,8 +1396,10 @@ public class CheckpointCoordinator { LOG.debug("Resetting the master hooks."); MasterHooks.reset(masterHooks.values(), LOG); - LOG.info("Resetting the Coordinators to an empty state."); - restoreStateToCoordinators(Collections.emptyMap()); + LOG.info("Resetting the Operator Coordinators to an empty state."); + restoreStateToCoordinators( + OperatorCoordinator.NO_CHECKPOINT, + Collections.emptyMap()); } return OptionalLong.empty(); @@ -1424,7 +1427,7 @@ public class CheckpointCoordinator { LOG); if (operatorCoordinatorRestoreBehavior != OperatorCoordinatorRestoreBehavior.SKIP) { - restoreStateToCoordinators(operatorStates); + restoreStateToCoordinators(latest.getCheckpointID(), operatorStates); } // update metrics @@ -1641,12 +1644,15 @@ public class CheckpointCoordinator { initDelay, baseInterval, TimeUnit.MILLISECONDS); } - private void restoreStateToCoordinators(final Map<OperatorID, OperatorState> operatorStates) throws Exception { + private void restoreStateToCoordinators( + final long checkpointId, + final Map<OperatorID, OperatorState> operatorStates) throws Exception { + for (OperatorCoordinatorCheckpointContext coordContext : coordinatorsToCheckpoint) { final OperatorState state = operatorStates.get(coordContext.operatorId()); final ByteStreamStateHandle coordinatorState = state == null ? null : state.getCoordinatorState(); final byte[] bytes = coordinatorState == null ? null : coordinatorState.getData(); - coordContext.resetToCheckpoint(bytes); + coordContext.resetToCheckpoint(checkpointId, bytes); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java index 8e639e9..8206d8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java @@ -65,5 +65,5 @@ public interface OperatorCoordinatorCheckpointContext extends OperatorInfo, Chec * </ul> * In both cases, the coordinator should reset to an empty (new) state. */ - void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception; + void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java index 2732bd5..a74aa9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java @@ -47,6 +47,15 @@ import java.util.concurrent.CompletableFuture; public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { /** + * The checkpoint ID passed to the restore methods when no completed checkpoint exists, yet. + * It indicates that the restore is to the "initial state" of the coordinator or the + * failed subtask. + */ + long NO_CHECKPOINT = -1L; + + // ------------------------------------------------------------------------ + + /** * Starts the coordinator. This method is called once at the beginning, before any other methods. * * @throws Exception Any exception thrown from this method causes a full job failure. @@ -116,6 +125,9 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { * When this method is called, the coordinator can discard all other in-flight working state. * All subtasks will also have been reset to the same checkpoint. * + * <p>This method is called in the case of a <i>global failover</i> of the system, which means + * a failover of the coordinator (JobManager). + * * <p>This method is expected to behave synchronously with respect to other method calls and calls * to {@code Context} methods. For example, Events being sent by the Coordinator after this method * returns are assumed to take place after the checkpoint that was restored. @@ -138,7 +150,7 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { * complete (for example when a system failure happened directly after committing the checkpoint, * before calling the {@link #notifyCheckpointComplete(long)} method). */ - void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception; + void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception; // ------------------------------------------------------------------------ @@ -150,10 +162,9 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { * by the scheduler's failover strategy (by default recovering a pipelined region). * The method is invoked for each subtask involved in that partial failover. * - * <p>In contrast to this method, the {@link #resetToCheckpoint(byte[])} method is called in - * the case of a global failover, which is the case when the coordinator (JobManager) fails - * or the scheduler invokes its safety net where the whole system is reset to the latest - * complete checkpoint. + * <p>In contrast to this method, the {@link #resetToCheckpoint(long, byte[])} method is called in + * the case of a global failover, which is the case when the coordinator (JobManager) is + * recovered. */ void subtaskFailed(int subtask, @Nullable Throwable reason); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 4c7f252..4f0caed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -229,7 +229,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC } @Override - public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception { + public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception { // ideally we would like to check this here, however this method is called early during // execution graph construction, before the main thread executor is set @@ -237,7 +237,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC if (context != null) { context.resetFailed(); } - coordinator.resetToCheckpoint(checkpointData); + coordinator.resetToCheckpoint(checkpointId, checkpointData); } private void checkpointCoordinatorInternal(final long checkpointId, final CompletableFuture<byte[]> result) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index 52b30ed..822ea5e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -105,7 +105,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { } @Override - public void resetToCheckpoint(@Nullable byte[] checkpointData) { + public void resetToCheckpoint(final long checkpointId, @Nullable final byte[] checkpointData) { // First bump up the coordinator epoch to fence out the active coordinator. LOG.info("Resetting coordinator to checkpoint."); // Replace the coordinator variable with a new DeferrableCoordinator instance. @@ -123,7 +123,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { if (!closed) { // The previous coordinator has closed. Create a new one. newCoordinator.createNewInternalCoordinator(context, provider); - newCoordinator.resetAndStart(checkpointData, started); + newCoordinator.resetAndStart(checkpointId, checkpointData, started); newCoordinator.processPendingCalls(); } }); @@ -248,7 +248,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { } /** - * A class that helps realize the fully async {@link #resetToCheckpoint(byte[])} behavior. + * A class that helps realize the fully async {@link #resetToCheckpoint(long, byte[])} behavior. * The class wraps an {@link OperatorCoordinator} instance. It is going to be accessed * by two different thread: the scheduler thread and the closing thread created in * {@link #closeAsync(long)}. A DeferrableCoordinator could be in three states: @@ -366,12 +366,16 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { internalCoordinator.start(); } - void resetAndStart(@Nullable byte[] checkpointData, boolean started) { + void resetAndStart( + final long checkpointId, + @Nullable final byte[] checkpointData, + final boolean started) { + if (failed || closed || internalCoordinator == null) { return; } try { - internalCoordinator.resetToCheckpoint(checkpointData); + internalCoordinator.resetToCheckpoint(checkpointId, checkpointData); // Start the new coordinator if this coordinator has been started before reset to the checkpoint. if (started) { internalCoordinator.start(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 3dea157..45cb4f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -223,7 +223,10 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements } @Override - public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception { + public void resetToCheckpoint( + final long checkpointId, + @Nullable final byte[] checkpointData) throws Exception { + checkState(!started, "The coordinator can only be reset if it was not yet started"); assert enumerator == null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index 1085558..627aa07 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -845,9 +845,7 @@ public class CheckpointCoordinatorTestingUtils { } @Override - public void resetToCheckpoint(byte[] checkpointData) throws Exception { - - } + public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {} @Override public OperatorID operatorId() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java index 3323e2c..4bd590b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobStatus; import org.apache.flink.util.TestLogger; +import org.hamcrest.Matchers; import org.junit.Test; import java.util.ArrayList; @@ -33,7 +34,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; /** * Test base class with common tests for the {@link CheckpointIDCounter} implementations. @@ -45,6 +48,24 @@ public abstract class CheckpointIDCounterTestBase extends TestLogger { // --------------------------------------------------------------------------------------------- /** + * This test guards an assumption made in the notifications in the + * {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator}. + * The coordinator is notified of a reset/restore and if no checkpoint yet exists (failure + * was before the first checkpoint), a negative ID is passed. + */ + @Test + public void testCounterIsNeverNegative() throws Exception { + final CheckpointIDCounter counter = createCheckpointIdCounter(); + + try { + counter.start(); + assertThat(counter.get(), greaterThanOrEqualTo(0L)); + } finally { + counter.shutdown(JobStatus.FINISHED); + } + } + + /** * Tests serial increment and get calls. */ @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java index 6ff3249..378b73c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java @@ -289,7 +289,9 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { } @Override - public void resetToCheckpoint(byte[] checkpointData) throws Exception { + public void resetToCheckpoint( + final long checkpointId, + @Nullable final byte[] checkpointData) throws Exception { executor.execute(() -> nextNumber = bytesToInt(checkpointData)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java index 8c2c6ff..e86615a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java @@ -59,7 +59,7 @@ public final class MockOperatorCoordinator implements OperatorCoordinator { } @Override - public void resetToCheckpoint(byte[] checkpointData) { + public void resetToCheckpoint(long checkpointId, byte[] checkpointData) { throw new UnsupportedOperationException(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index 3f02bb7..ce135c6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -172,7 +172,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger { final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new); triggerAndCompleteCheckpoint(holder, 1000L); - holder.resetToCheckpoint(new byte[0]); + holder.resetToCheckpoint(1L, new byte[0]); getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(999), 1); assertThat(sender.events, contains( @@ -188,7 +188,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger { triggerAndCompleteCheckpoint(holder, 1000L); getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0); getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1); - holder.resetToCheckpoint(new byte[0]); + holder.resetToCheckpoint(2L, new byte[0]); assertTrue(sender.events.isEmpty()); } @@ -302,7 +302,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger { + "should only take the first request from the coordinator to fail the job.", firstGlobalFailure, globalFailure); - holder.resetToCheckpoint(new byte[0]); + holder.resetToCheckpoint(0L, new byte[0]); holder.handleEventFromOperator(1, new TestOperatorEvent()); assertNotEquals("The new failures should be propagated after the coordinator " + "is reset.", firstGlobalFailure, globalFailure); @@ -566,7 +566,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger { public void notifyCheckpointComplete(long checkpointId) {} @Override - public void resetToCheckpoint(byte[] checkpointData) throws Exception {} + public void resetToCheckpoint(long checkpointId, byte[] checkpointData) throws Exception {} @Override public void run() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java index 6ba41cc..aa1abea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java @@ -331,6 +331,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { assertSame("coordinator should have null restored state", TestingOperatorCoordinator.NULL_RESTORE_VALUE, coordinator.getLastRestoredCheckpointState()); + assertEquals(OperatorCoordinator.NO_CHECKPOINT, coordinator.getLastRestoredCheckpointId()); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java index 3813059..d16892f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java @@ -84,7 +84,7 @@ public class RecreateOnResetOperatorCoordinatorTest { TestingOperatorCoordinator internalCoordinatorBeforeReset = getInternalCoordinator(coordinator); byte[] stateToRestore = new byte[0]; - coordinator.resetToCheckpoint(stateToRestore); + coordinator.resetToCheckpoint(1L, stateToRestore); // Use the checkpoint to ensure all the previous method invocation has succeeded. coordinator.waitForAllAsyncCallsFinish(); @@ -105,7 +105,7 @@ public class RecreateOnResetOperatorCoordinatorTest { RecreateOnResetOperatorCoordinator coordinator = (RecreateOnResetOperatorCoordinator) provider.create(context, closingTimeoutMs); - coordinator.resetToCheckpoint(new byte[0]); + coordinator.resetToCheckpoint(2L, new byte[0]); CommonTestUtils.waitUtil( context::isJobFailed, Duration.ofSeconds(5), @@ -130,7 +130,7 @@ public class RecreateOnResetOperatorCoordinatorTest { // Reset the coordinator which closes the current internal coordinator // and then create a new one. The closing of the current internal // coordinator will block until the blockOnCloseLatch is pulled. - coordinator.resetToCheckpoint(restoredState); + coordinator.resetToCheckpoint(2L, restoredState); // The following method calls should be applied to the new internal // coordinator asynchronously because the current coordinator has not @@ -189,7 +189,7 @@ public class RecreateOnResetOperatorCoordinatorTest { future.thenRun(() -> coordinator.notifyCheckpointComplete(loop)); // The reset bytes has a length of i+1 here because this will be reset to the // next internal coordinator. - coordinator.resetToCheckpoint(new byte[i + 1]); + coordinator.resetToCheckpoint(i, new byte[i + 1]); } coordinator.waitForAllAsyncCallsFinish(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java index 9572601..3011f58 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java @@ -45,6 +45,7 @@ class TestingOperatorCoordinator implements OperatorCoordinator { @Nullable private byte[] lastRestoredCheckpointState; + private long lastRestoredCheckpointId; private BlockingQueue<CompletableFuture<byte[]>> triggeredCheckpoints; @@ -106,7 +107,8 @@ class TestingOperatorCoordinator implements OperatorCoordinator { } @Override - public void resetToCheckpoint(@Nullable byte[] checkpointData) { + public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) { + lastRestoredCheckpointId = checkpointId; lastRestoredCheckpointState = checkpointData == null ? NULL_RESTORE_VALUE : checkpointData; @@ -135,6 +137,10 @@ class TestingOperatorCoordinator implements OperatorCoordinator { return lastRestoredCheckpointState; } + public long getLastRestoredCheckpointId() { + return lastRestoredCheckpointId; + } + public CompletableFuture<byte[]> getLastTriggeredCheckpoint() throws InterruptedException { return triggeredCheckpoints.take(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java index eae6d24..322f2f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java @@ -89,7 +89,7 @@ public class SourceCoordinatorProviderTest { } // reset the coordinator to the checkpoint which only contains reader 0. - coordinator.resetToCheckpoint(bytes); + coordinator.resetToCheckpoint(0L, bytes); final SourceCoordinator<?, ?> restoredSourceCoordinator = (SourceCoordinator<?, ?>) coordinator.getInternalCoordinator(); assertNotEquals("The restored source coordinator should be a different instance", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java index bdb8c23..aa1f079 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java @@ -89,7 +89,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { public void testRestCheckpointAfterCoordinatorStarted() throws Exception { // The following methods should only be invoked after the source coordinator has started. sourceCoordinator.start(); - verifyException(() -> sourceCoordinator.resetToCheckpoint(null), + verifyException(() -> sourceCoordinator.resetToCheckpoint(0L, null), "Reset to checkpoint should fail after the coordinator has started", "The coordinator can only be reset if it was not yet started"); } @@ -146,7 +146,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { // restore from the checkpoints. SourceCoordinator<?, ?> restoredCoordinator = getNewSourceCoordinator(); - restoredCoordinator.resetToCheckpoint(bytes); + restoredCoordinator.resetToCheckpoint(100L, bytes); MockSplitEnumerator restoredEnumerator = (MockSplitEnumerator) restoredCoordinator.getEnumerator(); SourceCoordinatorContext restoredContext = restoredCoordinator.getContext(); assertEquals("2 splits should have been assigned to reader 0", @@ -328,7 +328,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { "testOperator", context.getOperatorId(), source, 1); final OperatorCoordinator coordinator = provider.getCoordinator(context); - coordinator.resetToCheckpoint(createEmptyCheckpoint(1L)); + coordinator.resetToCheckpoint(1L, createEmptyCheckpoint(1L)); coordinator.start(); final ClassLoaderTestEnumerator enumerator = source.restoreEnumeratorFuture.get(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java index de8497c..bdce876 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java @@ -197,7 +197,7 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor } @Override - public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception { + public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception { if (checkpointData == null) { // restore before any checkpoint completed closeConnection();
