Repository: flink
Updated Branches:
  refs/heads/master e288617f9 -> 4dee8feaf


[FLINK-5248] [tests] Catch restore failures in SavepointITCase

- Minor test cleanup
- The test did not catch task restore failures, because only the
  TaskDeploymentDescriptors (TDDs) were checked. Now we verify that
  restore is actually called and that some checkpoints complete
  after restoring from a savepoint.
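
The verification pattern behind the fix is a small static synchronization
harness: each parallel subtask reports restore and checkpoint-complete events
through shared static state, and the test thread blocks on a common monitor
until the expected count arrives. A minimal, self-contained sketch of the
pattern (class and method names here are illustrative; they mirror the
StatefulCounter helpers in the diff below, and the bounded wait is an
assumption of this sketch, not part of the commit):

    import java.util.concurrent.TimeoutException;

    final class AwaitPatternSketch {

        private static final Object lock = new Object();
        private static int numCompleteCalls;

        // Each subtask calls this from notifyCheckpointComplete().
        static void reportCheckpointComplete() {
            synchronized (lock) {
                numCompleteCalls++;
                lock.notifyAll();
            }
        }

        // The test thread blocks here; one notification is expected per
        // parallel subtask and completed checkpoint.
        static void awaitCompletedCheckpoints(int parallelism, int checkpoints, long timeoutMillis)
                throws InterruptedException, TimeoutException {
            long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            int expected = parallelism * checkpoints;
            synchronized (lock) {
                while (numCompleteCalls < expected && System.nanoTime() <= deadline) {
                    lock.wait(timeoutMillis); // bounded wait, so a missed notify cannot hang the test
                }
                if (numCompleteCalls < expected) {
                    throw new TimeoutException("Saw " + numCompleteCalls + " of " + expected + " notifications.");
                }
            }
        }
    }

The diff applies this idea twice after resubmitting from the savepoint: once to
prove that restoreState() actually ran on every subtask, and once to prove that
checkpoints complete again afterwards.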


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4dee8fea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4dee8fea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4dee8fea

Branch: refs/heads/master
Commit: 4dee8feaf7638bb043d7f3334cc8974456893a3d
Parents: e288617
Author: Ufuk Celebi <[email protected]>
Authored: Mon Dec 5 11:22:19 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Dec 5 11:53:56 2016 +0100

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 247 +++++++++----------
 1 file changed, 119 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4dee8fea/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index ab9c1fa..d52f115 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -62,7 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -76,14 +75,13 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
@@ -101,10 +99,7 @@ public class SavepointITCase extends TestLogger {
        private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
 
        @Rule
-       public RetryRule retryRule = new RetryRule();
-
-       @Rule
-       public TemporaryFolder folder= new TemporaryFolder();
+       public TemporaryFolder folder = new TemporaryFolder();
 
        /**
         * Tests that it is possible to submit a job, trigger a savepoint, and
@@ -114,8 +109,9 @@ public class SavepointITCase extends TestLogger {
         * <ol>
         * <li>Submit job, wait for some checkpoints to complete</li>
         * <li>Trigger savepoint and verify that savepoint has been created</li>
-        * <li>Shut down the cluster, re-submit the job from the savepoint, and
-        * verify that the initial state has been reset</li>
+        * <li>Shut down the cluster, re-submit the job from the savepoint,
+        * verify that the initial state has been reset, and
+        * all tasks are running again</li>
         * <li>Cancel job, dispose the savepoint, and verify that everything
         * has been cleaned up</li>
         * </ol>
@@ -131,10 +127,11 @@ public class SavepointITCase extends TestLogger {
                final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
                // The number of checkpoints to complete before triggering the savepoint
-               final int numberOfCompletedCheckpoints = 10;
+               final int numberOfCompletedCheckpoints = 2;
+               final int checkpointingInterval = 100;
 
                // Temporary directory for file state backend
-               final File tmpDir = CommonTestUtils.createTempDirectory();
+               final File tmpDir = folder.newFolder();
 
                LOG.info("Created temporary directory: " + tmpDir + ".");
 
@@ -161,55 +158,50 @@ public class SavepointITCase extends TestLogger {
 
                        config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
                        config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
-                                       checkpointDir.toURI().toString());
+                               checkpointDir.toURI().toString());
                        config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
                        config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-                                       savepointDir.toURI().toString());
+                               savepointDir.toURI().toString());
 
                        LOG.info("Flink configuration: " + config + ".");
 
                        // Start Flink
                        flink = new TestingCluster(config);
-                       LOG.info("Starting Flink cluster.");
                        flink.start();
 
                        // Retrieve the job manager
-                       LOG.info("Retrieving JobManager.");
                        ActorGateway jobManager = Await.result(
-                                       flink.leaderGateway().future(),
-                                       deadline.timeLeft());
-                       LOG.info("JobManager: " + jobManager + ".");
+                               flink.leaderGateway().future(),
+                               deadline.timeLeft());
 
                        // Submit the job
-                       final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000);
+                       final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, checkpointingInterval);
                        final JobID jobId = jobGraph.getJobID();
 
                        // Wait for the source to be notified about the expected number
                        // of completed checkpoints
-                       InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(
-                                       numberOfCompletedCheckpoints);
+                       StatefulCounter.resetForTest();
 
                        LOG.info("Submitting job " + jobGraph.getJobID() + " in 
detached mode.");
 
                        flink.submitJobDetached(jobGraph);
 
-                       LOG.info("Waiting for " + numberOfCompletedCheckpoints +
-                                       " checkpoint complete notifications.");
+                       LOG.info("Waiting for " + numberOfCompletedCheckpoints + " checkpoint complete notifications.");
 
                        // Wait...
-                       InfiniteTestSource.CheckpointCompleteLatch.await();
+                       StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
 
                        LOG.info("Received all " + numberOfCompletedCheckpoints 
+
-                                       " checkpoint complete notifications.");
+                               " checkpoint complete notifications.");
 
                        // ...and then trigger the savepoint
                        LOG.info("Triggering a savepoint.");
 
                        Future<Object> savepointPathFuture = jobManager.ask(
-                                       new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+                               new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
 
                        final String savepointPath = ((TriggerSavepointSuccess) Await
-                                       .result(savepointPathFuture, deadline.timeLeft())).savepointPath();
+                               .result(savepointPathFuture, deadline.timeLeft())).savepointPath();
                        LOG.info("Retrieved savepoint path: " + savepointPath + ".");
 
                        // Only one savepoint should exist
@@ -222,12 +214,10 @@ public class SavepointITCase extends TestLogger {
 
                        // Retrieve the savepoint from the testing job manager
                        LOG.info("Requesting the savepoint.");
-                       Future<Object> savepointFuture = jobManager.ask(
-                                       new RequestSavepoint(savepointPath),
-                                       deadline.timeLeft());
+                       Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
 
                        SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
-                                       savepointFuture, deadline.timeLeft())).savepoint();
+                               savepointFuture, deadline.timeLeft())).savepoint();
                        LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
                        // Shut down the Flink cluster (thereby canceling the job)
@@ -238,24 +228,16 @@ public class SavepointITCase extends TestLogger {
                        // - Verification START -------------------------------------------
 
                        // Only one checkpoint of the savepoint should exist
-                       String errMsg = "Checkpoints directory not cleaned up properly.";
+                       // We currently have the following directory layout: checkpointDir/jobId/chk-ID
                        files = checkpointDir.listFiles();
-                       if (files != null) {
-                               assertEquals(errMsg, 1, files.length);
-                       }
-                       else {
-                               fail(errMsg);
-                       }
+                       assertNotNull("Checkpoint directory empty", files);
+                       assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
+                       assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
 
                        // Only one savepoint should exist
-                       errMsg = "Savepoints directory cleaned up.";
                        files = savepointDir.listFiles();
-                       if (files != null) {
-                               assertEquals(errMsg, 1, files.length);
-                       }
-                       else {
-                               fail(errMsg);
-                       }
+                       assertNotNull("Savepoint directory empty", files);
+                       assertEquals("No savepoint found in savepoint directory", 1, files.length);
 
                        // - Verification END ---------------------------------------------
 
@@ -265,11 +247,13 @@ public class SavepointITCase extends TestLogger {
 
                        // Retrieve the job manager
                        LOG.info("Retrieving JobManager.");
-                       jobManager = Await.result(
-                                       flink.leaderGateway().future(),
-                                       deadline.timeLeft());
+                       jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
                        LOG.info("JobManager: " + jobManager + ".");
 
+                       // Reset for restore
+                       StatefulCounter.resetForTest();
+
+                       // Gather all task deployment descriptors
                        final Throwable[] error = new Throwable[1];
                        final TestingCluster finalFlink = flink;
                        final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
@@ -282,15 +266,16 @@ public class SavepointITCase extends TestLogger {
                                                        // Register to all submit task messages for job
                                                        for (ActorRef taskManager : finalFlink.getTaskManagersAsJava()) {
                                                                taskManager.tell(new TestingTaskManagerMessages
-                                                                               .RegisterSubmitTaskListener(jobId), getTestActor());
+                                                                       .RegisterSubmitTaskListener(jobId), getTestActor());
                                                        }
 
                                                        // Set the savepoint path
                                                        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
                                                        LOG.info("Resubmitting 
job " + jobGraph.getJobID() + " with " +
-                                                                       
"savepoint path " + savepointPath + " in detached mode.");
+                                                               "savepoint path 
" + savepointPath + " in detached mode.");
 
+                                                       // Submit the job
                                                        finalFlink.submitJobDetached(jobGraph);
 
                                                        int numTasks = 0;
@@ -300,12 +285,12 @@ public class SavepointITCase extends TestLogger {
 
                                                        // Gather the task deployment descriptors
                                                        LOG.info("Gathering " + numTasks + " submitted " +
-                                                                       "TaskDeploymentDescriptor instances.");
+                                                               "TaskDeploymentDescriptor instances.");
 
                                                        for (int i = 0; i < numTasks; i++) {
                                                                ResponseSubmitTaskListener resp = (ResponseSubmitTaskListener)
-                                                                               expectMsgAnyClassOf(getRemainingTime(),
-                                                                                               ResponseSubmitTaskListener.class);
+                                                                       expectMsgAnyClassOf(getRemainingTime(),
+                                                                               ResponseSubmitTaskListener.class);

                                                                TaskDeploymentDescriptor tdd = resp.tdd();
 
@@ -317,8 +302,7 @@ public class SavepointITCase extends TestLogger {

                                                                tdds.put(taskInformation.getJobVertexId(), tdd);
                                                        }
-                                               }
-                                               catch (Throwable t) {
+                                               } catch (Throwable t) {
                                                        error[0] = t;
                                                }
                                        }
@@ -327,7 +311,7 @@ public class SavepointITCase extends TestLogger {
 
                        // - Verification START -------------------------------------------

-                       errMsg = "Error during gathering of TaskDeploymentDescriptors";
+                       String errMsg = "Error during gathering of TaskDeploymentDescriptors";
                        assertNull(errMsg, error[0]);
 
                        // Verify that all tasks, which are part of the savepoint
@@ -336,7 +320,7 @@ public class SavepointITCase extends TestLogger {
                                Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(taskState.getJobVertexID());
 
                                errMsg = "Missing task for savepoint state for 
operator "
-                                               + taskState.getJobVertexID() + 
".";
+                                       + taskState.getJobVertexID() + ".";
                                assertTrue(errMsg, taskTdds.size() > 0);

                                assertEquals(taskState.getNumberCollectedStates(), taskTdds.size());
@@ -348,24 +332,27 @@ public class SavepointITCase extends TestLogger {
 
                                        errMsg = "Initial operator state 
mismatch.";
                                        assertEquals(errMsg, 
subtaskState.getLegacyOperatorState(),
-                                                       
tdd.getTaskStateHandles().getLegacyOperatorState());
+                                               
tdd.getTaskStateHandles().getLegacyOperatorState());
                                }
                        }
 
+                       // Await state is restored
+                       StatefulCounter.awaitStateRestoredFromCheckpoint(deadline.timeLeft().toMillis());
+
+                       // Await some progress after restore
+                       StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+
                        // - Verification END ---------------------------------------------
 
                        LOG.info("Cancelling job " + jobId + ".");
                        jobManager.tell(new CancelJob(jobId));
 
                        LOG.info("Disposing savepoint " + savepointPath + ".");
-                       Future<Object> disposeFuture = jobManager.ask(
-                                       new DisposeSavepoint(savepointPath),
-                                       deadline.timeLeft());
+                       Future<Object> disposeFuture = jobManager.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
 
                        errMsg = "Failed to dispose savepoint " + savepointPath 
+ ".";
                        Object resp = Await.result(disposeFuture, 
deadline.timeLeft());
-                       assertTrue(errMsg, resp.getClass() ==
-                                       
getDisposeSavepointSuccess().getClass());
+                       assertTrue(errMsg, resp.getClass() == 
getDisposeSavepointSuccess().getClass());
 
                        // - Verification START -------------------------------------------
 
@@ -399,12 +386,11 @@ public class SavepointITCase extends TestLogger {
 
                        // All savepoints should have been cleaned up
                        errMsg = "Savepoints directory not cleaned up properly: 
" +
-                                       
Arrays.toString(savepointDir.listFiles()) + ".";
+                               Arrays.toString(savepointDir.listFiles()) + ".";
                        assertEquals(errMsg, 0, 
savepointDir.listFiles().length);
 
                        // - Verification END ---------------------------------------------
-               }
-               finally {
+               } finally {
                        if (flink != null) {
                                flink.shutdown();
                        }
@@ -436,7 +422,7 @@ public class SavepointITCase extends TestLogger {
                        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
                        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
                        config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-                                       savepointDir.toURI().toString());
+                               savepointDir.toURI().toString());
 
                        LOG.info("Flink configuration: " + config + ".");
 
@@ -448,8 +434,8 @@ public class SavepointITCase extends TestLogger {
                        // Retrieve the job manager
                        LOG.info("Retrieving JobManager.");
                        ActorGateway jobManager = Await.result(
-                                       flink.leaderGateway().future(),
-                                       deadline.timeLeft());
+                               flink.leaderGateway().future(),
+                               deadline.timeLeft());
                        LOG.info("JobManager: " + jobManager + ".");
 
                        // High value to ensure timeouts if restarted.
@@ -467,13 +453,11 @@ public class SavepointITCase extends TestLogger {
 
                        try {
                                flink.submitJobAndWait(jobGraph, false);
-                       }
-                       catch (Exception e) {
+                       } catch (Exception e) {
                                assertEquals(JobExecutionException.class, e.getClass());
                                assertEquals(IllegalArgumentException.class, e.getCause().getClass());
                        }
-               }
-               finally {
+               } finally {
                        if (flink != null) {
                                flink.shutdown();
                        }
@@ -488,10 +472,10 @@ public class SavepointITCase extends TestLogger {
         * Creates a streaming JobGraph from the StreamEnvironment.
         */
        private JobGraph createJobGraph(
-                       int parallelism,
-                       int numberOfRetries,
-                       long restartDelay,
-                       int checkpointingInterval) {
+               int parallelism,
+               int numberOfRetries,
+               long restartDelay,
+               int checkpointingInterval) {
 
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
@@ -501,24 +485,20 @@ public class SavepointITCase extends TestLogger {
                env.getConfig().disableSysoutLogging();
 
                DataStream<Integer> stream = env
-                               .addSource(new InfiniteTestSource())
-                               .shuffle()
-                               .map(new StatefulCounter());
+                       .addSource(new InfiniteTestSource())
+                       .shuffle()
+                       .map(new StatefulCounter());
 
                stream.addSink(new DiscardingSink<Integer>());
 
                return env.getStreamGraph().getJobGraph();
        }
 
-       private static class InfiniteTestSource
-                       implements SourceFunction<Integer>, CheckpointListener {
+       private static class InfiniteTestSource implements SourceFunction<Integer> {
 
                private static final long serialVersionUID = 1L;
                private volatile boolean running = true;
 
-               // Test control
-               private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(1);
-
                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                        while (running) {
@@ -530,16 +510,16 @@ public class SavepointITCase extends TestLogger {
                public void cancel() {
                        running = false;
                }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) throws Exception {
-                       CheckpointCompleteLatch.countDown();
-               }
        }
 
        private static class StatefulCounter
-                       extends RichMapFunction<Integer, Integer>
-                       implements Checkpointed<byte[]> {
+               extends RichMapFunction<Integer, Integer>
+               implements Checkpointed<byte[]>, CheckpointListener {
+
+               private static final Object checkpointLock = new Object();
+               private static int numCompleteCalls;
+               private static int numRestoreCalls;
+               private static boolean restoredFromCheckpoint;
 
                private static final long serialVersionUID = 7317800376639115920L;
                private byte[] data;
@@ -570,55 +550,66 @@ public class SavepointITCase extends TestLogger {
                @Override
                public void restoreState(byte[] data) throws Exception {
                        this.data = data;
+
+                       synchronized (checkpointLock) {
+                               if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {
+                                       restoredFromCheckpoint = true;
+                                       checkpointLock.notifyAll();
+                               }
+                       }
                }
-       }
 
-       /**
-        * Test source that counts calls to restoreState and that can be configured
-        * to fail on restoreState calls.
-        */
-       private static class RestoreStateCountingAndFailingSource
-                       implements SourceFunction<Integer>, Checkpointed, CheckpointListener {
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws Exception {
+                       synchronized (checkpointLock) {
+                               numCompleteCalls++;
+                               checkpointLock.notifyAll();
+                       }
+               }
 
-               private static final long serialVersionUID = 1L;
+               // --------------------------------------------------------------------
 
-               private static volatile int numRestoreStateCalls = 0;
-               private static volatile boolean failOnRestoreStateCall = false;
-               private static volatile CountDownLatch checkpointCompleteLatch = new CountDownLatch(1);
-               private static volatile int emitted = 0;
+               static void resetForTest() {
+                       synchronized (checkpointLock) {
+                               numCompleteCalls = 0;
+                               numRestoreCalls = 0;
+                               restoredFromCheckpoint = false;
+                       }
+               }
 
-               private volatile boolean running = true;
+               static void awaitCompletedCheckpoints(
+                               int parallelism,
+                               int expectedNumberOfCompletedCheckpoints,
+                               long timeoutMillis) throws InterruptedException, TimeoutException {
 
-               @Override
-               public void run(SourceContext<Integer> ctx) throws Exception {
-                       while (running) {
-                               ctx.collect(1);
-                               emitted++;
+                       long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
 
-                               if (failOnRestoreStateCall) {
-                                       throw new RuntimeException("Restore test failure");
+                       synchronized (checkpointLock) {
+                               // One completion notification per parallel subtask
+                               int expectedNumber = parallelism * expectedNumberOfCompletedCheckpoints;
+                               while (numCompleteCalls < expectedNumber && System.nanoTime() <= deadline) {
+                                       checkpointLock.wait();
                                }
-                       }
-               }
 
-               @Override
-               public void cancel() {
-                       running = false;
+                               if (numCompleteCalls < expectedNumber) {
+                                       throw new TimeoutException("Did not complete " + expectedNumberOfCompletedCheckpoints +
+                                               " checkpoints within timeout of " + timeoutMillis + " millis.");
+                               }
+                       }
                }
 
-               @Override
-               public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-                       return 1;
-               }
+               static void awaitStateRestoredFromCheckpoint(long timeoutMillis) throws InterruptedException, TimeoutException {
+                       long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
 
-               @Override
-               public void restoreState(Serializable state) throws Exception {
-                       numRestoreStateCalls++;
-               }
+                       synchronized (checkpointLock) {
+                               while (!restoredFromCheckpoint && System.nanoTime() <= deadline) {
+                                       checkpointLock.wait();
+                               }
 
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) throws Exception {
-                       checkpointCompleteLatch.countDown();
+                               if (!restoredFromCheckpoint) {
+                                       throw new TimeoutException("Did not restore from checkpoint within timeout of " + timeoutMillis + " millis.");
+                               }
+                       }
                }
        }
 
