This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 03b2e704e3417cefadbc62a61a81efa685e24bea Author: Stephan Ewen <[email protected]> AuthorDate: Mon Nov 23 17:54:34 2020 +0100 [FLINK-20222][checkpointing] Operator Coordinators are reset with null state when no checkpoint or state is available. This includes the following cases: - JobManager/Scheduler/Coordinator failover happens before the first checkpoint has completed. - Coordinator has not stored any state in the checkpoint. In both cases, this is useful to signal to the coordinators that a reset-to-checkpoint happened, even if they do not have checkpointed state to restore. --- .../runtime/checkpoint/CheckpointCoordinator.java | 82 +++++++++++++++++----- .../OperatorCoordinatorCheckpointContext.java | 15 +++- .../coordination/OperatorCoordinator.java | 10 ++- .../coordination/OperatorCoordinatorHolder.java | 2 +- .../RecreateOnResetOperatorCoordinator.java | 4 +- .../flink/runtime/scheduler/SchedulerBase.java | 5 +- .../source/coordinator/SourceCoordinator.java | 9 ++- .../OperatorCoordinatorSchedulerTest.java | 12 ++++ .../coordination/TestingOperatorCoordinator.java | 8 ++- .../collect/CollectSinkOperatorCoordinator.java | 13 ++-- 10 files changed, 129 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 0e61a11..61d6f22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1226,7 +1226,11 @@ public class CheckpointCoordinator { boolean errorIfNoCheckpoint, boolean allowNonRestoredState) throws Exception { - return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState); + return restoreLatestCheckpointedStateInternal( + new HashSet<>(tasks.values()), + 
OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET, + errorIfNoCheckpoint, + allowNonRestoredState); } /** @@ -1257,7 +1261,11 @@ public class CheckpointCoordinator { // of the restarted region), meaning there will be unmatched state by design. // - because what we might end up restoring from an original savepoint with unmatched // state, if there is was no checkpoint yet. - return restoreLatestCheckpointedStateInternal(tasks, false, false, true); + return restoreLatestCheckpointedStateInternal( + tasks, + OperatorCoordinatorRestoreBehavior.SKIP, // local/regional recovery does not reset coordinators + false, // recovery might come before first successful checkpoint + true); // see explanation above } /** @@ -1287,12 +1295,34 @@ public class CheckpointCoordinator { final Set<ExecutionJobVertex> tasks, final boolean allowNonRestoredState) throws Exception { - return restoreLatestCheckpointedStateInternal(tasks, true, false, allowNonRestoredState); + return restoreLatestCheckpointedStateInternal( + tasks, + OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET, // global recovery restores coordinators, or resets them to empty + false, // recovery might come before first successful checkpoint + allowNonRestoredState); + } + + /** + * Restores the latest checkpointed at the beginning of the job execution. + * If there is a checkpoint, this method acts like a "global restore"-style + * operation where all stateful tasks and coordinators from the given + * set of Job Vertices are restored. + * + * @param tasks Set of job vertices to restore. State for these vertices is + * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}. + * @return True, if a checkpoint was found and its state was restored, false otherwise. 
+ */ + public boolean restoreInitialCheckpointIfPresent(final Set<ExecutionJobVertex> tasks) throws Exception { + return restoreLatestCheckpointedStateInternal( + tasks, + OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT, + false, // initial checkpoints exist only on JobManager failover. ok if not present. + false); // JobManager failover means JobGraphs match exactly. } private boolean restoreLatestCheckpointedStateInternal( final Set<ExecutionJobVertex> tasks, - final boolean restoreCoordinators, + final OperatorCoordinatorRestoreBehavior operatorCoordinatorRestoreBehavior, final boolean errorIfNoCheckpoint, final boolean allowNonRestoredState) throws Exception { @@ -1321,14 +1351,23 @@ public class CheckpointCoordinator { CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery); if (latest == null) { + LOG.info("No checkpoint found during restore."); + if (errorIfNoCheckpoint) { throw new IllegalStateException("No completed checkpoint available"); - } else { + } + + if (operatorCoordinatorRestoreBehavior == OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET) { + // we let the JobManager-side components know that there was a recovery, + // even if there was no checkpoint to recover from, yet LOG.debug("Resetting the master hooks."); MasterHooks.reset(masterHooks.values(), LOG); - return false; + LOG.info("Resetting the Coordinators to an empty state."); + restoreStateToCoordinators(Collections.emptyMap()); } + + return false; } LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest); @@ -1352,7 +1391,7 @@ public class CheckpointCoordinator { allowNonRestoredState, LOG); - if (restoreCoordinators) { + if (operatorCoordinatorRestoreBehavior != OperatorCoordinatorRestoreBehavior.SKIP) { restoreStateToCoordinators(operatorStates); } @@ -1410,7 +1449,11 @@ public class CheckpointCoordinator { LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId); - return 
restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, true, allowNonRestored); + return restoreLatestCheckpointedStateInternal( + new HashSet<>(tasks.values()), + OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT, + true, + allowNonRestored); } // ------------------------------------------------------------------------ @@ -1567,14 +1610,9 @@ public class CheckpointCoordinator { private void restoreStateToCoordinators(final Map<OperatorID, OperatorState> operatorStates) throws Exception { for (OperatorCoordinatorCheckpointContext coordContext : coordinatorsToCheckpoint) { final OperatorState state = operatorStates.get(coordContext.operatorId()); - if (state == null) { - continue; - } - - final ByteStreamStateHandle coordinatorState = state.getCoordinatorState(); - if (coordinatorState != null) { - coordContext.resetToCheckpoint(coordinatorState.getData()); - } + final ByteStreamStateHandle coordinatorState = state == null ? null : state.getCoordinatorState(); + final byte[] bytes = coordinatorState == null ? null : coordinatorState.getData(); + coordContext.resetToCheckpoint(bytes); } } @@ -1848,4 +1886,16 @@ public class CheckpointCoordinator { return props.forceCheckpoint(); } } + + private enum OperatorCoordinatorRestoreBehavior { + + /** Coordinators are always restored. If there is no checkpoint, they are restored empty. */ + RESTORE_OR_RESET, + + /** Coordinators are restored if there was a checkpoint. */ + RESTORE_IF_CHECKPOINT_PRESENT, + + /** Coordinators are not restored during this checkpoint restore. 
*/ + SKIP; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java index f5f72d8..2f14e58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java @@ -22,6 +22,8 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorInfo; import org.apache.flink.runtime.state.CheckpointListener; +import javax.annotation.Nullable; + import java.util.concurrent.CompletableFuture; /** @@ -52,5 +54,16 @@ public interface OperatorCoordinatorCheckpointContext extends OperatorInfo, Chec @Override default void notifyCheckpointAborted(long checkpointId) {} - void resetToCheckpoint(byte[] checkpointData) throws Exception; + /** + * Resets the coordinator to the checkpoint with the given state. + * + * <p>This method is called with a null state argument in the following situations: + * <ul> + * <li>There is a recovery and there was no completed checkpoint yet.</li> + * <li>There is a recovery from a completed checkpoint/savepoint but it contained no state + * for the coordinator.</li> + * </ul> + * In both cases, the coordinator should reset to an empty (new) state. 
+ */ + void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java index 21b67f2..1a6a640 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java @@ -127,6 +127,14 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { * to {@code Context} methods. For example, Events being sent by the Coordinator after this method * returns are assumed to take place after the checkpoint that was restored. * + * <p>This method is called with a null state argument in the following situations: + * <ul> + * <li>There is a recovery and there was no completed checkpoint yet.</li> + * <li>There is a recovery from a completed checkpoint/savepoint but it contained no state + * for the coordinator.</li> + * </ul> + * In both cases, the coordinator should reset to an empty (new) state. + * * <h2>Restoring implicitly notifies of Checkpoint Completion</h2> * * <p>Restoring to a checkpoint is a way of confirming that the checkpoint is complete. @@ -137,7 +145,7 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { * complete (for example when a system failure happened directly after committing the checkpoint, * before calling the {@link #notifyCheckpointComplete(long)} method). 
*/ - void resetToCheckpoint(byte[] checkpointData) throws Exception; + void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception; // ------------------------------------------------------------------------ // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index e5844ef..4c7f252 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -229,7 +229,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC } @Override - public void resetToCheckpoint(byte[] checkpointData) throws Exception { + public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception { // ideally we would like to check this here, however this method is called early during // execution graph construction, before the main thread executor is set diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index ae67868..52b30ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -105,7 +105,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { } @Override - public void resetToCheckpoint(byte[] checkpointData) throws Exception { + public void resetToCheckpoint(@Nullable byte[] checkpointData) { // First 
bump up the coordinator epoch to fence out the active coordinator. LOG.info("Resetting coordinator to checkpoint."); // Replace the coordinator variable with a new DeferrableCoordinator instance. @@ -366,7 +366,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { internalCoordinator.start(); } - void resetAndStart(byte[] checkpointData, boolean started) { + void resetAndStart(@Nullable byte[] checkpointData, boolean started) { if (failed || closed || internalCoordinator == null) { return; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 42d3e02..f65f3c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -245,9 +245,8 @@ public abstract class SchedulerBase implements SchedulerNG { if (checkpointCoordinator != null) { // check whether we find a valid checkpoint - if (!checkpointCoordinator.restoreLatestCheckpointedStateToAll( - new HashSet<>(newExecutionGraph.getAllVertices().values()), - false)) { + if (!checkpointCoordinator.restoreInitialCheckpointIfPresent( + new HashSet<>(newExecutionGraph.getAllVertices().values()))) { // check whether we can restore from a savepoint tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 07ebbdb..3dea157 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -223,10 +223,17 @@ public class SourceCoordinator<SplitT extends 
SourceSplit, EnumChkT> implements } @Override - public void resetToCheckpoint(byte[] checkpointData) throws Exception { + public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception { checkState(!started, "The coordinator can only be reset if it was not yet started"); assert enumerator == null; + // the checkpoint data is null if there was no completed checkpoint before + // in that case we don't restore here, but let a fresh SplitEnumerator be created + // when "start()" is called. + if (checkpointData == null) { + return; + } + LOG.info("Restoring SplitEnumerator of source {} from checkpoint.", operatorName); final ClassLoader userCodeClassLoader = context.getCoordinatorContext().getUserCodeClassloader(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java index 353b6b9..024cd46 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java @@ -78,6 +78,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -322,6 +323,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { } @Test + public void testGlobalFailureBeforeCheckpointResetsToEmptyState() throws Exception { + final DefaultScheduler scheduler = createSchedulerAndDeployTasks(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + + failGlobalAndRestart(scheduler, new 
TestException()); + + assertSame("coordinator should have null restored state", + TestingOperatorCoordinator.NULL_RESTORE_VALUE, coordinator.getLastRestoredCheckpointState()); + } + + @Test public void testLocalFailureDoesNotResetToCheckpoint() throws Exception { final DefaultScheduler scheduler = createSchedulerAndDeployTasks(); final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java index 5c5b91f..9572601 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java @@ -35,6 +35,8 @@ import java.util.concurrent.LinkedBlockingQueue; */ class TestingOperatorCoordinator implements OperatorCoordinator { + public static final byte[] NULL_RESTORE_VALUE = new byte[0]; + private final OperatorCoordinator.Context context; private final ArrayList<Integer> failedTasks = new ArrayList<>(); @@ -104,8 +106,10 @@ class TestingOperatorCoordinator implements OperatorCoordinator { } @Override - public void resetToCheckpoint(byte[] checkpointData) { - lastRestoredCheckpointState = checkpointData; + public void resetToCheckpoint(@Nullable byte[] checkpointData) { + lastRestoredCheckpointState = checkpointData == null + ? 
NULL_RESTORE_VALUE + : checkpointData; } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java index 1423415..5454412 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java @@ -203,10 +203,15 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor } @Override - public void resetToCheckpoint(byte[] checkpointData) throws Exception { - ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData); - ObjectInputStream ois = new ObjectInputStream(bais); - address = (InetSocketAddress) ois.readObject(); + public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception { + if (checkpointData == null) { + // restore before any checkpoint completed + closeConnection(); + } else { + ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData); + ObjectInputStream ois = new ObjectInputStream(bais); + address = (InetSocketAddress) ois.readObject(); + } } /**
