[FLINK-3390] [runtime, tests] Restore savepoint path on ExecutionGraph restart
Temporary workaround to restore initial state on failure during recovery as required by a user. Will be superseded by FLINK-3397 with better handling of checkpoint and savepoint restoring. A failure during recovery resulted in restarting a job without its savepoint state. This temporary workaround makes sure that if the savepoint coordinator ever restored a savepoint and there was no checkpoint after the savepoint, the savepoint state will be restored again. This closes #1720. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df19a8bf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df19a8bf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df19a8bf Branch: refs/heads/release-1.0 Commit: df19a8bf908a21fc35830c08cc61d8d0566813eb Parents: 016644a Author: Ufuk Celebi <[email protected]> Authored: Fri Feb 26 12:46:07 2016 +0100 Committer: Robert Metzger <[email protected]> Committed: Fri Feb 26 20:56:24 2016 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 6 +- .../checkpoint/SavepointCoordinator.java | 12 ++ .../runtime/executiongraph/ExecutionGraph.java | 12 +- .../test/checkpointing/SavepointITCase.java | 169 ++++++++++++++++++- 4 files changed, 187 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/df19a8bf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index b0e23d6..edeab6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -741,7 +741,7 @@ public class CheckpointCoordinator { // Checkpoint State Restoring // -------------------------------------------------------------------------------------------- - public void restoreLatestCheckpointedState( + public boolean restoreLatestCheckpointedState( Map<JobVertexID, ExecutionJobVertex> tasks, boolean errorIfNoCheckpoint, boolean allOrNothingState) throws Exception { @@ -761,7 +761,7 @@ public class CheckpointCoordinator { if (errorIfNoCheckpoint) { throw new IllegalStateException("No completed checkpoint available"); } else { - return; + return false; } } @@ -799,6 +799,8 @@ public class CheckpointCoordinator { exec.setInitialState(state.getState(), recoveryTimestamp); } } + + return true; } } http://git-wip-us.apache.org/repos/asf/flink/blob/df19a8bf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java index 5638e78..5008932 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java @@ -72,6 +72,10 @@ public class SavepointCoordinator extends CheckpointCoordinator { /** Mapping from checkpoint ID to promises for savepoints. */ private final Map<Long, Promise<String>> savepointPromises; + // TODO(uce) Temporary work around to restore initial state on + // failure during recovery. Will be superseded by FLINK-3397. 
+ private volatile String savepointRestorePath; + public SavepointCoordinator( JobID jobId, long baseInterval, @@ -102,6 +106,10 @@ public class SavepointCoordinator extends CheckpointCoordinator { this.savepointPromises = new ConcurrentHashMap<>(); } + public String getSavepointRestorePath() { + return savepointRestorePath; + } + // ------------------------------------------------------------------------ // Savepoint trigger and reset // ------------------------------------------------------------------------ @@ -221,6 +229,10 @@ public class SavepointCoordinator extends CheckpointCoordinator { checkpointIdCounter.start(); checkpointIdCounter.setCount(nextCheckpointId + 1); LOG.info("Reset the checkpoint ID to {}", nextCheckpointId); + + if (savepointRestorePath == null) { + savepointRestorePath = savepointPath; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/df19a8bf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 9a6eb85..0d6de98 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -872,7 +872,17 @@ public class ExecutionGraph implements Serializable { // if we have checkpointed state, reload it into the executions if (checkpointCoordinator != null) { - checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false); + boolean restored = checkpointCoordinator + .restoreLatestCheckpointedState(getAllVertices(), false, false); + + // TODO(uce) Temporary work around to restore initial state on + // failure during recovery. Will be superseded by FLINK-3397. 
+ if (!restored && savepointCoordinator != null) { + String savepointPath = savepointCoordinator.getSavepointRestorePath(); + if (savepointPath != null) { + savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath); + } + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/df19a8bf/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 49f4ee7..5386353 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -56,7 +56,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList; @@ -71,6 +71,7 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import java.io.File; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -736,6 +737,113 @@ public class SavepointITCase extends TestLogger { } } + /** + * Tests that a restore failure is retried with the savepoint state. 
+ */ + @Test + public void testRestoreFailure() throws Exception { + // Config + int numTaskManagers = 1; + int numSlotsPerTaskManager = 1; + int numExecutionRetries = 2; + int retryDelay = 500; + int checkpointingInterval = 100000000; + + // Test deadline + final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow(); + + ForkableFlinkMiniCluster flink = null; + + try { + // The job + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(checkpointingInterval); + env.setNumberOfExecutionRetries(numExecutionRetries); + env.getConfig().setExecutionRetryDelay(retryDelay); + + DataStream<Integer> stream = env + .addSource(new RestoreStateCountingAndFailingSource()); + + // Source configuration + RestoreStateCountingAndFailingSource.failOnRestoreStateCall = false; + RestoreStateCountingAndFailingSource.numRestoreStateCalls = 0; + RestoreStateCountingAndFailingSource.checkpointCompleteLatch = new CountDownLatch(1); + RestoreStateCountingAndFailingSource.emitted= 0; + + stream.addSink(new DiscardingSink<Integer>()); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + + // Flink configuration + final Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); + LOG.info("Flink configuration: " + config + "."); + + // Start Flink + flink = new ForkableFlinkMiniCluster(config); + LOG.info("Starting Flink cluster."); + flink.start(); + + // Retrieve the job manager + LOG.info("Retrieving JobManager."); + ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft()); + LOG.info("JobManager: " + jobManager + "."); + + // Submit the job and wait for some checkpoints to complete + flink.submitJobDetached(jobGraph); + + while (deadline.hasTimeLeft() && RestoreStateCountingAndFailingSource.emitted < 100) { + 
Thread.sleep(100); + } + + assertTrue("No progress", RestoreStateCountingAndFailingSource.emitted >= 100); + + // Trigger the savepoint + Future<Object> savepointPathFuture = jobManager.ask( + new TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft()); + + final String savepointPath = ((TriggerSavepointSuccess) Await + .result(savepointPathFuture, deadline.timeLeft())).savepointPath(); + LOG.info("Retrieved savepoint path: " + savepointPath + "."); + + // Completed checkpoint + RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await(); + + // Cancel the job + Future<?> cancelFuture = jobManager.ask(new CancelJob( + jobGraph.getJobID()), deadline.timeLeft()); + Await.ready(cancelFuture, deadline.timeLeft()); + + // Wait for the job to be removed + Future<?> removedFuture = jobManager.ask(new NotifyWhenJobRemoved( + jobGraph.getJobID()), deadline.timeLeft()); + Await.ready(removedFuture, deadline.timeLeft()); + + // Set source to fail on restore calls and try to recover from savepoint + RestoreStateCountingAndFailingSource.failOnRestoreStateCall = true; + jobGraph.setSavepointPath(savepointPath); + + try { + flink.submitJobAndWait(jobGraph, false, deadline.timeLeft()); + // If the savepoint state is not restored, we will wait here + // until the deadline times out. 
+ fail("Did not throw expected Exception"); + } catch (Exception ignored) { + } finally { + // Expecting one restore for the initial submission from + // savepoint and one for the execution retries + assertEquals(1 + numExecutionRetries, RestoreStateCountingAndFailingSource.numRestoreStateCalls); + } + } + finally { + if (flink != null) { + flink.shutdown(); + } + } + } + // ------------------------------------------------------------------------ // Test program // ------------------------------------------------------------------------ @@ -761,13 +869,7 @@ public class SavepointITCase extends TestLogger { .shuffle() .map(new StatefulCounter()); - // Discard - stream.addSink(new SinkFunction<Integer>() { - private static final long serialVersionUID = -8671189807690005893L; - @Override - public void invoke(Integer value) throws Exception { - } - }); + stream.addSink(new DiscardingSink<Integer>()); return env.getStreamGraph().getJobGraph(); } @@ -779,7 +881,7 @@ public class SavepointITCase extends TestLogger { private volatile boolean running = true; // Test control - private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(0); + private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(1); @Override public void run(SourceContext<Integer> ctx) throws Exception { @@ -837,4 +939,53 @@ public class SavepointITCase extends TestLogger { } } + /** + * Test source that counts calls to restoreState and that can be configured + * to fail on restoreState calls. 
+ */ + private static class RestoreStateCountingAndFailingSource + implements SourceFunction<Integer>, Checkpointed, CheckpointListener { + + private static final long serialVersionUID = 1L; + + private static volatile int numRestoreStateCalls = 0; + private static volatile boolean failOnRestoreStateCall = false; + private static volatile CountDownLatch checkpointCompleteLatch = new CountDownLatch(1); + private static volatile int emitted = 0; + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + while (running) { + ctx.collect(1); + emitted++; + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return 1; + } + + @Override + public void restoreState(Serializable state) throws Exception { + numRestoreStateCalls++; + + if (failOnRestoreStateCall) { + throw new RuntimeException("Restore test failure"); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + checkpointCompleteLatch.countDown(); + } + } + }
