Repository: flink
Updated Branches:
  refs/heads/release-1.1 e4ca3a587 -> a5065e316


[FLINK-5248] [tests] Catch restore failures in SavepointITCase

- Minor test clean up and reduced checkpointing interval to speed the test up
- The test did not catch a task restore failure since only the
  TDDs were tested. Now, we test that restore is actually called
  and some checkpoints complete after restoring from a savepoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5065e31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5065e31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5065e31

Branch: refs/heads/release-1.1
Commit: a5065e316ab05db4801e56283dc8def800905d50
Parents: e4ca3a5
Author: Ufuk Celebi <[email protected]>
Authored: Mon Dec 5 11:50:38 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Dec 5 11:50:39 2016 +0100

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 112 +++++++++++++++----
 1 file changed, 90 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a5065e31/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2878e74..3186b6e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -89,6 +89,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import static 
org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
@@ -134,7 +135,8 @@ public class SavepointITCase extends TestLogger {
                final Deadline deadline = new FiniteDuration(5, 
TimeUnit.MINUTES).fromNow();
 
                // The number of checkpoints to complete before triggering the 
savepoint
-               final int numberOfCompletedCheckpoints = 10;
+               final int numberOfCompletedCheckpoints = 2;
+               final int checkpointingInterval = 100;
 
                // Temporary directory for file state backend
                final File tmpDir = CommonTestUtils.createTempDirectory();
@@ -184,13 +186,12 @@ public class SavepointITCase extends TestLogger {
                        LOG.info("JobManager: " + jobManager + ".");
 
                        // Submit the job
-                       final JobGraph jobGraph = createJobGraph(parallelism, 
0, 1000, 1000);
+                       final JobGraph jobGraph = createJobGraph(parallelism, 
0, 1000, checkpointingInterval);
                        final JobID jobId = jobGraph.getJobID();
 
                        // Wait for the source to be notified about the 
expected number
                        // of completed checkpoints
-                       InfiniteTestSource.CheckpointCompleteLatch = new 
CountDownLatch(
-                                       numberOfCompletedCheckpoints);
+                       StatefulCounter.resetForTest();
 
                        LOG.info("Submitting job " + jobGraph.getJobID() + " in 
detached mode.");
 
@@ -200,7 +201,7 @@ public class SavepointITCase extends TestLogger {
                                        " checkpoint complete notifications.");
 
                        // Wait...
-                       InfiniteTestSource.CheckpointCompleteLatch.await();
+                       StatefulCounter.awaitCompletedCheckpoints(parallelism, 
numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
 
                        LOG.info("Received all " + numberOfCompletedCheckpoints 
+
                                        " checkpoint complete notifications.");
@@ -232,24 +233,16 @@ public class SavepointITCase extends TestLogger {
                        // - Verification START 
-------------------------------------------
 
                        // Only one checkpoint of the savepoint should exist
-                       String errMsg = "Checkpoints directory not cleaned up 
properly.";
+                       // We currently have the following directory layout: 
checkpointDir/jobId/chk-ID
                        File[] files = checkpointDir.listFiles();
-                       if (files != null) {
-                               assertEquals(errMsg, 1, files.length);
-                       }
-                       else {
-                               fail(errMsg);
-                       }
+                       assertNotNull("Checkpoint directory empty", files);
+                       assertEquals("Checkpoints directory cleaned up, but 
needed for savepoint.", 1, files.length);
+                       assertEquals("No job-specific base directory", 
jobGraph.getJobID().toString(), files[0].getName());
 
                        // Only one savepoint should exist
-                       errMsg = "Savepoints directory cleaned up.";
                        files = savepointDir.listFiles();
-                       if (files != null) {
-                               assertEquals(errMsg, 1, files.length);
-                       }
-                       else {
-                               fail(errMsg);
-                       }
+                       assertNotNull("Savepoint directory empty", files);
+                       assertEquals("No savepoint found in savepoint 
directory", 1, files.length);
 
                        // - Verification END 
---------------------------------------------
 
@@ -264,6 +257,10 @@ public class SavepointITCase extends TestLogger {
                                        deadline.timeLeft());
                        LOG.info("JobManager: " + jobManager + ".");
 
+                       // Reset for restore
+                       StatefulCounter.resetForTest();
+
+                       // Gather all task deployment descriptors
                        final Throwable[] error = new Throwable[1];
                        final ForkableFlinkMiniCluster finalFlink = flink;
                        final Multimap<JobVertexID, TaskDeploymentDescriptor> 
tdds = HashMultimap.create();
@@ -321,7 +318,7 @@ public class SavepointITCase extends TestLogger {
 
                        // - Verification START 
-------------------------------------------
 
-                       errMsg = "Error during gathering of 
TaskDeploymentDescriptors";
+                       String errMsg = "Error during gathering of 
TaskDeploymentDescriptors";
                        assertNull(errMsg, error[0]);
 
                        // Verify that all tasks, which are part of the 
savepoint
@@ -344,6 +341,12 @@ public class SavepointITCase extends TestLogger {
                                }
                        }
 
+                       // Await state is restored
+                       
StatefulCounter.awaitStateRestoredFromCheckpoint(deadline.timeLeft().toMillis());
+
+                       // Await some progress after restore
+                       StatefulCounter.awaitCompletedCheckpoints(parallelism, 
numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+
                        // - Verification END 
---------------------------------------------
 
                        LOG.info("Cancelling job " + jobId + ".");
@@ -932,8 +935,13 @@ public class SavepointITCase extends TestLogger {
        }
 
        private static class StatefulCounter
-                       extends RichMapFunction<Integer, Integer>
-                       implements Checkpointed<byte[]> {
+               extends RichMapFunction<Integer, Integer>
+               implements Checkpointed<byte[]>, CheckpointListener {
+
+               private static final Object checkpointLock = new Object();
+               private static int numCompleteCalls;
+               private static int numRestoreCalls;
+               private static boolean restoredFromCheckpoint;
 
                private static final long serialVersionUID = 
7317800376639115920L;
                private byte[] data;
@@ -964,6 +972,66 @@ public class SavepointITCase extends TestLogger {
                @Override
                public void restoreState(byte[] data) throws Exception {
                        this.data = data;
+
+                       synchronized (checkpointLock) {
+                               if (++numRestoreCalls == 
getRuntimeContext().getNumberOfParallelSubtasks()) {
+                                       restoredFromCheckpoint = true;
+                                       checkpointLock.notifyAll();
+                               }
+                       }
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       synchronized (checkpointLock) {
+                               numCompleteCalls++;
+                               checkpointLock.notifyAll();
+                       }
+               }
+
+               // 
--------------------------------------------------------------------
+
+               static void resetForTest() {
+                       synchronized (checkpointLock) {
+                               numCompleteCalls = 0;
+                               numRestoreCalls = 0;
+                               restoredFromCheckpoint = false;
+                       }
+               }
+
+               static void awaitCompletedCheckpoints(
+                       int parallelism,
+                       int expectedNumberOfCompletedCheckpoints,
+                       long timeoutMillis) throws InterruptedException, 
TimeoutException {
+
+                       long deadline = System.nanoTime() + timeoutMillis * 
1_000_000;
+
+                       synchronized (checkpointLock) {
+                               // One completion notification per parallel 
subtask
+                               int expectedNumber = parallelism * 
expectedNumberOfCompletedCheckpoints;
+                               while (numCompleteCalls < expectedNumber && 
System.nanoTime() <= deadline) {
+                                       checkpointLock.wait();
+                               }
+
+                               if (numCompleteCalls < expectedNumber) {
+                                       throw new TimeoutException("Did not 
complete " + expectedNumberOfCompletedCheckpoints +
+                                               " within timeout of " + 
timeoutMillis + " millis.");
+                               }
+                       }
+               }
+
+               static void awaitStateRestoredFromCheckpoint(long 
timeoutMillis) throws InterruptedException, TimeoutException {
+                       long deadline = System.nanoTime() + timeoutMillis * 
1_000_000;
+
+                       synchronized (checkpointLock) {
+                               while (!restoredFromCheckpoint && 
System.nanoTime() <= deadline) {
+                                       checkpointLock.wait();
+                               }
+
+                               if (!restoredFromCheckpoint) {
+                                       throw new TimeoutException("Did not 
restore from checkpoint within timeout of " + timeoutMillis + " millis.");
+                               }
+                       }
                }
        }
 

Reply via email to