[FLINK-3390] [runtime, tests] Restore savepoint path on ExecutionGraph restart

Temporary workaround, requested by a user, to restore the initial state when a
failure occurs during recovery. This will be superseded by FLINK-3397, which
improves the handling of checkpoint and savepoint restoring.

Previously, a failure during recovery caused the job to be restarted without its
savepoint state. This temporary workaround ensures that if the savepoint
coordinator has restored a savepoint and no checkpoint has completed since then,
the savepoint state is restored again when the job restarts.

This closes #1720.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df19a8bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df19a8bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df19a8bf

Branch: refs/heads/release-1.0
Commit: df19a8bf908a21fc35830c08cc61d8d0566813eb
Parents: 016644a
Author: Ufuk Celebi <[email protected]>
Authored: Fri Feb 26 12:46:07 2016 +0100
Committer: Robert Metzger <[email protected]>
Committed: Fri Feb 26 20:56:24 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   6 +-
 .../checkpoint/SavepointCoordinator.java        |  12 ++
 .../runtime/executiongraph/ExecutionGraph.java  |  12 +-
 .../test/checkpointing/SavepointITCase.java     | 169 ++++++++++++++++++-
 4 files changed, 187 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df19a8bf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b0e23d6..edeab6a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -741,7 +741,7 @@ public class CheckpointCoordinator {
        //  Checkpoint State Restoring
        // 
--------------------------------------------------------------------------------------------
 
-       public void restoreLatestCheckpointedState(
+       public boolean restoreLatestCheckpointedState(
                        Map<JobVertexID, ExecutionJobVertex> tasks,
                        boolean errorIfNoCheckpoint,
                        boolean allOrNothingState) throws Exception {
@@ -761,7 +761,7 @@ public class CheckpointCoordinator {
                                if (errorIfNoCheckpoint) {
                                        throw new IllegalStateException("No 
completed checkpoint available");
                                } else {
-                                       return;
+                                       return false;
                                }
                        }
 
@@ -799,6 +799,8 @@ public class CheckpointCoordinator {
                                        exec.setInitialState(state.getState(), 
recoveryTimestamp);
                                }
                        }
+
+                       return true;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df19a8bf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
index 5638e78..5008932 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
@@ -72,6 +72,10 @@ public class SavepointCoordinator extends 
CheckpointCoordinator {
        /** Mapping from checkpoint ID to promises for savepoints. */
        private final Map<Long, Promise<String>> savepointPromises;
 
+       // TODO(uce) Temporary work around to restore initial state on
+       // failure during recovery. Will be superseded by FLINK-3397.
+       private volatile String savepointRestorePath;
+
        public SavepointCoordinator(
                        JobID jobId,
                        long baseInterval,
@@ -102,6 +106,10 @@ public class SavepointCoordinator extends 
CheckpointCoordinator {
                this.savepointPromises = new ConcurrentHashMap<>();
        }
 
+       public String getSavepointRestorePath() {
+               return savepointRestorePath;
+       }
+
        // 
------------------------------------------------------------------------
        // Savepoint trigger and reset
        // 
------------------------------------------------------------------------
@@ -221,6 +229,10 @@ public class SavepointCoordinator extends 
CheckpointCoordinator {
                        checkpointIdCounter.start();
                        checkpointIdCounter.setCount(nextCheckpointId + 1);
                        LOG.info("Reset the checkpoint ID to {}", 
nextCheckpointId);
+
+                       if (savepointRestorePath == null) {
+                               savepointRestorePath = savepointPath;
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df19a8bf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9a6eb85..0d6de98 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -872,7 +872,17 @@ public class ExecutionGraph implements Serializable {
 
                                // if we have checkpointed state, reload it 
into the executions
                                if (checkpointCoordinator != null) {
-                                       
checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, 
false);
+                                       boolean restored = checkpointCoordinator
+                                                       
.restoreLatestCheckpointedState(getAllVertices(), false, false);
+
+                                       // TODO(uce) Temporary work around to 
restore initial state on
+                                       // failure during recovery. Will be 
superseded by FLINK-3397.
+                                       if (!restored && savepointCoordinator 
!= null) {
+                                               String savepointPath = 
savepointCoordinator.getSavepointRestorePath();
+                                               if (savepointPath != null) {
+                                                       
savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
+                                               }
+                                       }
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df19a8bf/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 49f4ee7..5386353 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -56,7 +56,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList;
@@ -71,6 +71,7 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -736,6 +737,113 @@ public class SavepointITCase extends TestLogger {
                }
        }
 
+       /**
+        * Tests that a restore failure is retried with the savepoint state.
+        */
+       @Test
+       public void testRestoreFailure() throws Exception {
+               // Config
+               int numTaskManagers = 1;
+               int numSlotsPerTaskManager = 1;
+               int numExecutionRetries = 2;
+               int retryDelay = 500;
+               int checkpointingInterval = 100000000;
+
+               // Test deadline
+               final Deadline deadline = new FiniteDuration(3, 
TimeUnit.MINUTES).fromNow();
+
+               ForkableFlinkMiniCluster flink = null;
+
+               try {
+                       // The job
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setParallelism(1);
+                       env.enableCheckpointing(checkpointingInterval);
+                       env.setNumberOfExecutionRetries(numExecutionRetries);
+                       env.getConfig().setExecutionRetryDelay(retryDelay);
+
+                       DataStream<Integer> stream = env
+                                       .addSource(new 
RestoreStateCountingAndFailingSource());
+
+                       // Source configuration
+                       
RestoreStateCountingAndFailingSource.failOnRestoreStateCall = false;
+                       
RestoreStateCountingAndFailingSource.numRestoreStateCalls = 0;
+                       
RestoreStateCountingAndFailingSource.checkpointCompleteLatch = new 
CountDownLatch(1);
+                       RestoreStateCountingAndFailingSource.emitted= 0;
+
+                       stream.addSink(new DiscardingSink<Integer>());
+
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+                       // Flink configuration
+                       final Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTaskManager);
+                       LOG.info("Flink configuration: " + config + ".");
+
+                       // Start Flink
+                       flink = new ForkableFlinkMiniCluster(config);
+                       LOG.info("Starting Flink cluster.");
+                       flink.start();
+
+                       // Retrieve the job manager
+                       LOG.info("Retrieving JobManager.");
+                       ActorGateway jobManager = 
flink.getLeaderGateway(deadline.timeLeft());
+                       LOG.info("JobManager: " + jobManager + ".");
+
+                       // Submit the job and wait for some checkpoints to 
complete
+                       flink.submitJobDetached(jobGraph);
+
+                       while (deadline.hasTimeLeft() && 
RestoreStateCountingAndFailingSource.emitted < 100) {
+                               Thread.sleep(100);
+                       }
+
+                       assertTrue("No progress", 
RestoreStateCountingAndFailingSource.emitted >= 100);
+
+                       // Trigger the savepoint
+                       Future<Object> savepointPathFuture = jobManager.ask(
+                                       new 
TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft());
+
+                       final String savepointPath = ((TriggerSavepointSuccess) 
Await
+                                       .result(savepointPathFuture, 
deadline.timeLeft())).savepointPath();
+                       LOG.info("Retrieved savepoint path: " + savepointPath + 
".");
+
+                       // Completed checkpoint
+                       
RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await();
+
+                       // Cancel the job
+                       Future<?> cancelFuture = jobManager.ask(new CancelJob(
+                                       jobGraph.getJobID()), 
deadline.timeLeft());
+                       Await.ready(cancelFuture, deadline.timeLeft());
+
+                       // Wait for the job to be removed
+                       Future<?> removedFuture = jobManager.ask(new 
NotifyWhenJobRemoved(
+                                       jobGraph.getJobID()), 
deadline.timeLeft());
+                       Await.ready(removedFuture, deadline.timeLeft());
+
+                       // Set source to fail on restore calls and try to 
recover from savepoint
+                       
RestoreStateCountingAndFailingSource.failOnRestoreStateCall = true;
+                       jobGraph.setSavepointPath(savepointPath);
+
+                       try {
+                               flink.submitJobAndWait(jobGraph, false, 
deadline.timeLeft());
+                               // If the savepoint state is not restored, we 
will wait here
+                               // until the deadline times out.
+                               fail("Did not throw expected Exception");
+                       } catch (Exception ignored) {
+                       } finally {
+                               // Expecting one restore for the initial 
submission from
+                               // savepoint and one for the execution retries
+                               assertEquals(1 + numExecutionRetries, 
RestoreStateCountingAndFailingSource.numRestoreStateCalls);
+                       }
+               }
+               finally {
+                       if (flink != null) {
+                               flink.shutdown();
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
        // Test program
        // 
------------------------------------------------------------------------
@@ -761,13 +869,7 @@ public class SavepointITCase extends TestLogger {
                                .shuffle()
                                .map(new StatefulCounter());
 
-               // Discard
-               stream.addSink(new SinkFunction<Integer>() {
-                       private static final long serialVersionUID = 
-8671189807690005893L;
-                       @Override
-                       public void invoke(Integer value) throws Exception {
-                       }
-               });
+               stream.addSink(new DiscardingSink<Integer>());
 
                return env.getStreamGraph().getJobGraph();
        }
@@ -779,7 +881,7 @@ public class SavepointITCase extends TestLogger {
                private volatile boolean running = true;
 
                // Test control
-               private static CountDownLatch CheckpointCompleteLatch = new 
CountDownLatch(0);
+               private static CountDownLatch CheckpointCompleteLatch = new 
CountDownLatch(1);
 
                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
@@ -837,4 +939,53 @@ public class SavepointITCase extends TestLogger {
                }
        }
 
+       /**
+        * Test source that counts calls to restoreState and that can be 
configured
+        * to fail on restoreState calls.
+        */
+       private static class RestoreStateCountingAndFailingSource
+                       implements SourceFunction<Integer>, Checkpointed, 
CheckpointListener {
+
+               private static final long serialVersionUID = 1L;
+
+               private static volatile int numRestoreStateCalls = 0;
+               private static volatile boolean failOnRestoreStateCall = false;
+               private static volatile CountDownLatch checkpointCompleteLatch 
= new CountDownLatch(1);
+               private static volatile int emitted = 0;
+
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<Integer> ctx) throws Exception {
+                       while (running) {
+                               ctx.collect(1);
+                               emitted++;
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+
+               @Override
+               public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       return 1;
+               }
+
+               @Override
+               public void restoreState(Serializable state) throws Exception {
+                       numRestoreStateCalls++;
+
+                       if (failOnRestoreStateCall) {
+                               throw new RuntimeException("Restore test 
failure");
+                       }
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       checkpointCompleteLatch.countDown();
+               }
+       }
+
 }

Reply via email to