Repository: flink
Updated Branches:
  refs/heads/release-1.1 e4ca3a587 -> a5065e316
[FLINK-5248] [tests] Catch restore failures in SavepointITCase

- Minor test clean up and reduced checkpointing interval for speed up
- The test did not catch a task restore failure since only the TDDs
  were tested. Now, we test that restore is actually called and some
  checkpoints complete after restoring from a savepoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5065e31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5065e31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5065e31

Branch: refs/heads/release-1.1
Commit: a5065e316ab05db4801e56283dc8def800905d50
Parents: e4ca3a5
Author: Ufuk Celebi <[email protected]>
Authored: Mon Dec 5 11:50:38 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Dec 5 11:50:39 2016 +0100

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java | 112 +++++++++++++++----
 1 file changed, 90 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a5065e31/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2878e74..3186b6e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -89,6 +89,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
@@ -134,7 +135,8 @@ public class SavepointITCase extends TestLogger {
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
 		// The number of checkpoints to complete before triggering the savepoint
-		final int numberOfCompletedCheckpoints = 10;
+		final int numberOfCompletedCheckpoints = 2;
+		final int checkpointingInterval = 100;
 
 		// Temporary directory for file state backend
 		final File tmpDir = CommonTestUtils.createTempDirectory();
@@ -184,13 +186,12 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("JobManager: " + jobManager + ".");
 
 			// Submit the job
-			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000);
+			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, checkpointingInterval);
 			final JobID jobId = jobGraph.getJobID();
 
 			// Wait for the source to be notified about the expected number
 			// of completed checkpoints
-			InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(
-					numberOfCompletedCheckpoints);
+			StatefulCounter.resetForTest();
 
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
@@ -200,7 +201,7 @@
 					" checkpoint complete notifications.");
 
 			// Wait...
-			InfiniteTestSource.CheckpointCompleteLatch.await();
+			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
 
 			LOG.info("Received all " + numberOfCompletedCheckpoints +
 					" checkpoint complete notifications.");
@@ -232,24 +233,16 @@
 			// - Verification START -------------------------------------------
 
 			// Only one checkpoint of the savepoint should exist
-			String errMsg = "Checkpoints directory not cleaned up properly.";
+			// We currently have the following directory layout: checkpointDir/jobId/chk-ID
 			File[] files = checkpointDir.listFiles();
-			if (files != null) {
-				assertEquals(errMsg, 1, files.length);
-			}
-			else {
-				fail(errMsg);
-			}
+			assertNotNull("Checkpoint directory empty", files);
+			assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
+			assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
 
 			// Only one savepoint should exist
-			errMsg = "Savepoints directory cleaned up.";
 			files = savepointDir.listFiles();
-			if (files != null) {
-				assertEquals(errMsg, 1, files.length);
-			}
-			else {
-				fail(errMsg);
-			}
+			assertNotNull("Savepoint directory empty", files);
+			assertEquals("No savepoint found in savepoint directory", 1, files.length);
 
 			// - Verification END ---------------------------------------------
 
@@ -264,6 +257,10 @@
 					deadline.timeLeft());
 			LOG.info("JobManager: " + jobManager + ".");
 
+			// Reset for restore
+			StatefulCounter.resetForTest();
+
+			// Gather all task deployment descriptors
 			final Throwable[] error = new Throwable[1];
 			final ForkableFlinkMiniCluster finalFlink = flink;
 			final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
@@ -321,7 +318,7 @@
 
 			// - Verification START -------------------------------------------
 
-			errMsg = "Error during gathering of TaskDeploymentDescriptors";
+			String errMsg = "Error during gathering of TaskDeploymentDescriptors";
 			assertNull(errMsg, error[0]);
 
 			// Verify that all tasks, which are part of the savepoint
@@ -344,6 +341,12 @@
 				}
 			}
 
+			// Await state is restored
+			StatefulCounter.awaitStateRestoredFromCheckpoint(deadline.timeLeft().toMillis());
+
+			// Await some progress after restore
+			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+
 			// - Verification END ---------------------------------------------
 
 			LOG.info("Cancelling job " + jobId + ".");
@@ -932,8 +935,13 @@
 	}
 
 	private static class StatefulCounter
-		extends RichMapFunction<Integer, Integer>
-		implements Checkpointed<byte[]> {
+			extends RichMapFunction<Integer, Integer>
+			implements Checkpointed<byte[]>, CheckpointListener {
+
+		private static final Object checkpointLock = new Object();
+		private static int numCompleteCalls;
+		private static int numRestoreCalls;
+		private static boolean restoredFromCheckpoint;
 
 		private static final long serialVersionUID = 7317800376639115920L;
 		private byte[] data;
@@ -964,6 +972,66 @@
 		@Override
 		public void restoreState(byte[] data) throws Exception {
 			this.data = data;
+
+			synchronized (checkpointLock) {
+				if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {
+					restoredFromCheckpoint = true;
+					checkpointLock.notifyAll();
+				}
+			}
 		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			synchronized (checkpointLock) {
+				numCompleteCalls++;
+				checkpointLock.notifyAll();
+			}
+		}
+
+		// --------------------------------------------------------------------
+
+		static void resetForTest() {
+			synchronized (checkpointLock) {
+				numCompleteCalls = 0;
+				numRestoreCalls = 0;
+				restoredFromCheckpoint = false;
+			}
+		}
+
+		static void awaitCompletedCheckpoints(
+				int parallelism,
+				int expectedNumberOfCompletedCheckpoints,
+				long timeoutMillis) throws InterruptedException, TimeoutException {
+
+			long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+
+			synchronized (checkpointLock) {
+				// One completion notification per parallel subtask
+				int expectedNumber = parallelism * expectedNumberOfCompletedCheckpoints;
+				while (numCompleteCalls < expectedNumber && System.nanoTime() <= deadline) {
+					checkpointLock.wait();
+				}
+
+				if (numCompleteCalls < expectedNumber) {
+					throw new TimeoutException("Did not complete " + expectedNumberOfCompletedCheckpoints +
+						" within timeout of " + timeoutMillis + " millis.");
+				}
+			}
+		}
+
+		static void awaitStateRestoredFromCheckpoint(long timeoutMillis) throws InterruptedException, TimeoutException {
+			long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+
+			synchronized (checkpointLock) {
+				while (!restoredFromCheckpoint && System.nanoTime() <= deadline) {
+					checkpointLock.wait();
+				}
+
+				if (!restoredFromCheckpoint) {
+					throw new TimeoutException("Did not restore from checkpoint within timeout of " + timeoutMillis + " millis.");
+				}
+			}
+		}
 	}
 }
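
For readers unfamiliar with the coordination idiom the new StatefulCounter helpers rely on, the standalone sketch below distills it: a static lock guards shared counters, each parallel task notifies under the lock, and the waiting test thread re-checks its condition in a loop against a deadline. The class and method names here are illustrative only and not part of the commit, and the sketch waits with a bounded lock.wait(timeout) as a slight variation on the helpers above.

import java.util.concurrent.TimeoutException;

public class CheckpointNotificationBarrier {

	private static final Object lock = new Object();
	private static int completions;

	// Called by each (simulated) parallel subtask when a checkpoint completes.
	static void reportCompleted() {
		synchronized (lock) {
			completions++;
			lock.notifyAll();
		}
	}

	// Called by the test thread; blocks until `expected` completions were
	// reported or the timeout elapses.
	static void awaitCompletions(int expected, long timeoutMillis)
			throws InterruptedException, TimeoutException {

		long deadline = System.nanoTime() + timeoutMillis * 1_000_000;

		synchronized (lock) {
			while (completions < expected) {
				long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
				if (remainingMillis <= 0) {
					throw new TimeoutException("Only " + completions + " of " + expected + " completions.");
				}
				lock.wait(remainingMillis);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		int parallelism = 4;
		for (int i = 0; i < parallelism; i++) {
			new Thread(CheckpointNotificationBarrier::reportCompleted).start();
		}
		awaitCompletions(parallelism, 5_000);
		System.out.println("All subtasks reported a completed checkpoint.");
	}
}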
