Repository: flink
Updated Branches:
  refs/heads/master e288617f9 -> 4dee8feaf
[FLINK-5248] [tests] Catch restore failures in SavepointITCase

- Minor test clean up
- The test did not catch a task restore failure since only the TDDs were
  tested. Now, we test that restore is actually called and some checkpoints
  complete after restoring from a savepoint.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4dee8fea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4dee8fea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4dee8fea

Branch: refs/heads/master
Commit: 4dee8feaf7638bb043d7f3334cc8974456893a3d
Parents: e288617
Author: Ufuk Celebi <[email protected]>
Authored: Mon Dec 5 11:22:19 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Dec 5 11:53:56 2016 +0100

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java | 247 +++++++++----------
 1 file changed, 119 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4dee8fea/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index ab9c1fa..d52f115 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -62,7 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -76,14 +75,13 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
@@ -101,10 +99,7 @@ public class SavepointITCase extends TestLogger {
 	private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
 
 	@Rule
-	public RetryRule retryRule = new RetryRule();
-
-	@Rule
-	public TemporaryFolder folder= new TemporaryFolder();
+	public TemporaryFolder folder = new TemporaryFolder();
 
 	/**
 	 * Tests that it is possible to submit a job, trigger a savepoint, and
@@ -114,8 +109,9 @@ public class SavepointITCase extends TestLogger {
 	 * <ol>
 	 * <li>Submit job, wait for some checkpoints to complete</li>
 	 * <li>Trigger savepoint and verify that savepoint has been created</li>
-	 * <li>Shut down the cluster, re-submit the job from the savepoint, and
-	 * verify that the initial state has been reset</li>
+	 * <li>Shut down the cluster, re-submit the job from the savepoint,
+	 * verify that the initial state has been reset, and
+	 * all tasks are running again</li>
 	 * <li>Cancel job, dispose the savepoint, and verify that everything
 	 * has been cleaned up</li>
 	 * </ol>
@@ -131,10 +127,11 @@ public class SavepointITCase extends TestLogger {
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
 		// The number of checkpoints to complete before triggering the savepoint
-		final int numberOfCompletedCheckpoints = 10;
+		final int numberOfCompletedCheckpoints = 2;
+		final int checkpointingInterval = 100;
 
 		// Temporary directory for file state backend
-		final File tmpDir = CommonTestUtils.createTempDirectory();
+		final File tmpDir = folder.newFolder();
 
 		LOG.info("Created temporary directory: " + tmpDir + ".");
 
@@ -161,55 +158,50 @@ public class SavepointITCase extends TestLogger {
 
 			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
 			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
-					checkpointDir.toURI().toString());
+				checkpointDir.toURI().toString());
 			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
 			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-					savepointDir.toURI().toString());
+				savepointDir.toURI().toString());
 
 			LOG.info("Flink configuration: " + config + ".");
 
 			// Start Flink
 			flink = new TestingCluster(config);
-			LOG.info("Starting Flink cluster.");
 			flink.start();
 
 			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
 			ActorGateway jobManager = Await.result(
-					flink.leaderGateway().future(),
-					deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
+				flink.leaderGateway().future(),
+				deadline.timeLeft());
 
 			// Submit the job
-			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000);
+			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, checkpointingInterval);
 			final JobID jobId = jobGraph.getJobID();
 
 			// Wait for the source to be notified about the expected number
 			// of completed checkpoints
-			InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(
-					numberOfCompletedCheckpoints);
+			StatefulCounter.resetForTest();
 
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 			flink.submitJobDetached(jobGraph);
 
-			LOG.info("Waiting for " + numberOfCompletedCheckpoints +
-					" checkpoint complete notifications.");
+			LOG.info("Waiting for " + numberOfCompletedCheckpoints + " checkpoint complete notifications.");
 
 			// Wait...
-			InfiniteTestSource.CheckpointCompleteLatch.await();
+			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
 
 			LOG.info("Received all " + numberOfCompletedCheckpoints +
-					" checkpoint complete notifications.");
+				" checkpoint complete notifications.");
 
 			// ...and then trigger the savepoint
 			LOG.info("Triggering a savepoint.");
 			Future<Object> savepointPathFuture = jobManager.ask(
-					new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+				new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
 
 			final String savepointPath = ((TriggerSavepointSuccess) Await
-					.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
+				.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
 			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
 
 			// Only one savepoint should exist
@@ -222,12 +214,10 @@ public class SavepointITCase extends TestLogger {
 
 			// Retrieve the savepoint from the testing job manager
 			LOG.info("Requesting the savepoint.");
-			Future<Object> savepointFuture = jobManager.ask(
-					new RequestSavepoint(savepointPath),
-					deadline.timeLeft());
+			Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
 
 			SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
-					savepointFuture, deadline.timeLeft())).savepoint();
+				savepointFuture, deadline.timeLeft())).savepoint();
 			LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
 			// Shut down the Flink cluster (thereby canceling the job)
@@ -238,24 +228,16 @@ public class SavepointITCase extends TestLogger {
 
 			// - Verification START -------------------------------------------
 
 			// Only one checkpoint of the savepoint should exist
-			String errMsg = "Checkpoints directory not cleaned up properly.";
+			// We currently have the following directory layout: checkpointDir/jobId/chk-ID
 			files = checkpointDir.listFiles();
-			if (files != null) {
-				assertEquals(errMsg, 1, files.length);
-			}
-			else {
-				fail(errMsg);
-			}
+			assertNotNull("Checkpoint directory empty", files);
+			assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
+			assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
 
 			// Only one savepoint should exist
-			errMsg = "Savepoints directory cleaned up.";
 			files = savepointDir.listFiles();
-			if (files != null) {
-				assertEquals(errMsg, 1, files.length);
-			}
-			else {
-				fail(errMsg);
-			}
+			assertNotNull("Savepoint directory empty", files);
+			assertEquals("No savepoint found in savepoint directory", 1, files.length);
 
 			// - Verification END ---------------------------------------------
@@ -265,11 +247,13 @@ public class SavepointITCase extends TestLogger {
 
 			// Retrieve the job manager
 			LOG.info("Retrieving JobManager.");
-			jobManager = Await.result(
-					flink.leaderGateway().future(),
-					deadline.timeLeft());
+			jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
 			LOG.info("JobManager: " + jobManager + ".");
 
+			// Reset for restore
+			StatefulCounter.resetForTest();
+
+			// Gather all task deployment descriptors
 			final Throwable[] error = new Throwable[1];
 			final TestingCluster finalFlink = flink;
 			final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
@@ -282,15 +266,16 @@ public class SavepointITCase extends TestLogger {
 					// Register to all submit task messages for job
 					for (ActorRef taskManager : finalFlink.getTaskManagersAsJava()) {
 						taskManager.tell(new TestingTaskManagerMessages
-								.RegisterSubmitTaskListener(jobId), getTestActor());
+							.RegisterSubmitTaskListener(jobId), getTestActor());
 					}
 
 					// Set the savepoint path
 					jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 					LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
-							"savepoint path " + savepointPath + " in detached mode.");
+						"savepoint path " + savepointPath + " in detached mode.");
 
+					// Submit the job
 					finalFlink.submitJobDetached(jobGraph);
 
 					int numTasks = 0;
@@ -300,12 +285,12 @@ public class SavepointITCase extends TestLogger {
 
 					// Gather the task deployment descriptors
 					LOG.info("Gathering " + numTasks + " submitted " +
-							"TaskDeploymentDescriptor instances.");
+						"TaskDeploymentDescriptor instances.");
 
 					for (int i = 0; i < numTasks; i++) {
 						ResponseSubmitTaskListener resp = (ResponseSubmitTaskListener)
-								expectMsgAnyClassOf(getRemainingTime(),
-										ResponseSubmitTaskListener.class);
+							expectMsgAnyClassOf(getRemainingTime(),
+								ResponseSubmitTaskListener.class);
 
 						TaskDeploymentDescriptor tdd = resp.tdd();
@@ -317,8 +302,7 @@ public class SavepointITCase extends TestLogger {
 
 							tdds.put(taskInformation.getJobVertexId(), tdd);
 						}
-					}
-					catch (Throwable t) {
+					} catch (Throwable t) {
 						error[0] = t;
 					}
 				}
@@ -327,7 +311,7 @@ public class SavepointITCase extends TestLogger {
 
 			// - Verification START -------------------------------------------
 
-			errMsg = "Error during gathering of TaskDeploymentDescriptors";
+			String errMsg = "Error during gathering of TaskDeploymentDescriptors";
 			assertNull(errMsg, error[0]);
 
 			// Verify that all tasks, which are part of the savepoint
@@ -336,7 +320,7 @@ public class SavepointITCase extends TestLogger {
 				Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(taskState.getJobVertexID());
 
 				errMsg = "Missing task for savepoint state for operator "
-						+ taskState.getJobVertexID() + ".";
+					+ taskState.getJobVertexID() + ".";
 				assertTrue(errMsg, taskTdds.size() > 0);
 
 				assertEquals(taskState.getNumberCollectedStates(), taskTdds.size());
@@ -348,24 +332,27 @@ public class SavepointITCase extends TestLogger {
 
 					errMsg = "Initial operator state mismatch.";
 					assertEquals(errMsg, subtaskState.getLegacyOperatorState(),
-							tdd.getTaskStateHandles().getLegacyOperatorState());
+						tdd.getTaskStateHandles().getLegacyOperatorState());
 				}
 			}
 
+			// Await state is restored
+			StatefulCounter.awaitStateRestoredFromCheckpoint(deadline.timeLeft().toMillis());
+
+			// Await some progress after restore
+			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+
 			// - Verification END ---------------------------------------------
 
 			LOG.info("Cancelling job " + jobId + ".");
 			jobManager.tell(new CancelJob(jobId));
 
 			LOG.info("Disposing savepoint " + savepointPath + ".");
-			Future<Object> disposeFuture = jobManager.ask(
-					new DisposeSavepoint(savepointPath),
-					deadline.timeLeft());
+			Future<Object> disposeFuture = jobManager.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
 
 			errMsg = "Failed to dispose savepoint " + savepointPath + ".";
 			Object resp = Await.result(disposeFuture, deadline.timeLeft());
-			assertTrue(errMsg, resp.getClass() ==
-					getDisposeSavepointSuccess().getClass());
+			assertTrue(errMsg, resp.getClass() == getDisposeSavepointSuccess().getClass());
 
 			// - Verification START -------------------------------------------
@@ -399,12 +386,11 @@ public class SavepointITCase extends TestLogger {
 
 			// All savepoints should have been cleaned up
 			errMsg = "Savepoints directory not cleaned up properly: " +
-					Arrays.toString(savepointDir.listFiles()) + ".";
+				Arrays.toString(savepointDir.listFiles()) + ".";
 			assertEquals(errMsg, 0, savepointDir.listFiles().length);
 
 			// - Verification END ---------------------------------------------
-		}
-		finally {
+		} finally {
 			if (flink != null) {
 				flink.shutdown();
 			}
@@ -436,7 +422,7 @@ public class SavepointITCase extends TestLogger {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-				savepointDir.toURI().toString());
+			savepointDir.toURI().toString());
 
 		LOG.info("Flink configuration: " + config + ".");
 
@@ -448,8 +434,8 @@ public class SavepointITCase extends TestLogger {
 			// Retrieve the job manager
 			LOG.info("Retrieving JobManager.");
 			ActorGateway jobManager = Await.result(
-					flink.leaderGateway().future(),
-					deadline.timeLeft());
+				flink.leaderGateway().future(),
+				deadline.timeLeft());
 			LOG.info("JobManager: " + jobManager + ".");
 
 			// High value to ensure timeouts if restarted.
@@ -467,13 +453,11 @@ public class SavepointITCase extends TestLogger {
 
 			try {
 				flink.submitJobAndWait(jobGraph, false);
-			}
-			catch (Exception e) {
+			} catch (Exception e) {
 				assertEquals(JobExecutionException.class, e.getClass());
 				assertEquals(IllegalArgumentException.class, e.getCause().getClass());
 			}
-		}
-		finally {
+		} finally {
 			if (flink != null) {
 				flink.shutdown();
 			}
@@ -488,10 +472,10 @@ public class SavepointITCase extends TestLogger {
 	 * Creates a streaming JobGraph from the StreamEnvironment.
 	 */
 	private JobGraph createJobGraph(
-			int parallelism,
-			int numberOfRetries,
-			long restartDelay,
-			int checkpointingInterval) {
+		int parallelism,
+		int numberOfRetries,
+		long restartDelay,
+		int checkpointingInterval) {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
@@ -501,24 +485,20 @@ public class SavepointITCase extends TestLogger {
 		env.getConfig().disableSysoutLogging();
 
 		DataStream<Integer> stream = env
-				.addSource(new InfiniteTestSource())
-				.shuffle()
-				.map(new StatefulCounter());
+			.addSource(new InfiniteTestSource())
+			.shuffle()
+			.map(new StatefulCounter());
 
 		stream.addSink(new DiscardingSink<Integer>());
 
 		return env.getStreamGraph().getJobGraph();
 	}
 
-	private static class InfiniteTestSource
-			implements SourceFunction<Integer>, CheckpointListener {
+	private static class InfiniteTestSource implements SourceFunction<Integer> {
 
 		private static final long serialVersionUID = 1L;
 		private volatile boolean running = true;
 
-		// Test control
-		private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(1);
-
 		@Override
 		public void run(SourceContext<Integer> ctx) throws Exception {
 			while (running) {
@@ -530,16 +510,16 @@ public class SavepointITCase extends TestLogger {
 		public void cancel() {
 			running = false;
 		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) throws Exception {
-			CheckpointCompleteLatch.countDown();
-		}
 	}
 
 	private static class StatefulCounter
-			extends RichMapFunction<Integer, Integer>
-			implements Checkpointed<byte[]> {
+		extends RichMapFunction<Integer, Integer>
+		implements Checkpointed<byte[]>, CheckpointListener {
+
+		private static final Object checkpointLock = new Object();
+		private static int numCompleteCalls;
+		private static int numRestoreCalls;
+		private static boolean restoredFromCheckpoint;
 
 		private static final long serialVersionUID = 7317800376639115920L;
 		private byte[] data;
@@ -570,55 +550,66 @@ public class SavepointITCase extends TestLogger {
 		@Override
 		public void restoreState(byte[] data) throws Exception {
 			this.data = data;
+
+			synchronized (checkpointLock) {
+				if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {
+					restoredFromCheckpoint = true;
+					checkpointLock.notifyAll();
+				}
+			}
 		}
-	}
 
-	/**
-	 * Test source that counts calls to restoreState and that can be configured
-	 * to fail on restoreState calls.
-	 */
-	private static class RestoreStateCountingAndFailingSource
-			implements SourceFunction<Integer>, Checkpointed, CheckpointListener {
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			synchronized (checkpointLock) {
+				numCompleteCalls++;
+				checkpointLock.notifyAll();
+			}
+		}
 
-		private static final long serialVersionUID = 1L;
+		// --------------------------------------------------------------------
 
-		private static volatile int numRestoreStateCalls = 0;
-		private static volatile boolean failOnRestoreStateCall = false;
-		private static volatile CountDownLatch checkpointCompleteLatch = new CountDownLatch(1);
-		private static volatile int emitted = 0;
+		static void resetForTest() {
+			synchronized (checkpointLock) {
+				numCompleteCalls = 0;
+				numRestoreCalls = 0;
+				restoredFromCheckpoint = false;
+			}
+		}

-		private volatile boolean running = true;
+		static void awaitCompletedCheckpoints(
+				int parallelism,
+				int expectedNumberOfCompletedCheckpoints,
+				long timeoutMillis) throws InterruptedException, TimeoutException {
 
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			while (running) {
-				ctx.collect(1);
-				emitted++;
+			long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
 
-				if (failOnRestoreStateCall) {
-					throw new RuntimeException("Restore test failure");
+			synchronized (checkpointLock) {
+				// One completion notification per parallel subtask
+				int expectedNumber = parallelism * expectedNumberOfCompletedCheckpoints;
+				while (numCompleteCalls < expectedNumber && System.nanoTime() <= deadline) {
+					checkpointLock.wait();
 				}
-			}
-		}
 
-		@Override
-		public void cancel() {
-			running = false;
+				if (numCompleteCalls < expectedNumber) {
+					throw new TimeoutException("Did not complete " + expectedNumberOfCompletedCheckpoints +
+							" within timeout of " + timeoutMillis + " millis.");
+				}
+			}
 		}
 
-		@Override
-		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return 1;
-		}
+		static void awaitStateRestoredFromCheckpoint(long timeoutMillis) throws InterruptedException, TimeoutException {
+			long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
 
-		@Override
-		public void restoreState(Serializable state) throws Exception {
-			numRestoreStateCalls++;
-		}
+			synchronized (checkpointLock) {
+				while (!restoredFromCheckpoint && System.currentTimeMillis() <= deadline) {
+					checkpointLock.wait();
 				}
 
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) throws Exception {
-			checkpointCompleteLatch.countDown();
+				if (!restoredFromCheckpoint) {
+					throw new TimeoutException("Did not restore from checkpoint within timeout of " + timeoutMillis + " millis.");
+				}
+			}
 		}
 	}
 }
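
For context on the approach taken in the diff: the reworked StatefulCounter coordinates the test thread with the parallel subtasks through static state guarded by a shared lock, counting notifyCheckpointComplete() and restoreState() calls and waking up waiters. The standalone sketch below distills that wait/notify-with-deadline pattern; the class and method names (CheckpointProgress, report, await, reset) are hypothetical and not part of this commit.

import java.util.concurrent.TimeoutException;

/** Minimal sketch of the notification-counting pattern used by the test (hypothetical class, not from the commit). */
final class CheckpointProgress {

	private static final Object lock = new Object();
	private static int notifications;

	/** Called once per subtask and checkpoint, e.g. from notifyCheckpointComplete(). */
	static void report() {
		synchronized (lock) {
			notifications++;
			lock.notifyAll();
		}
	}

	/** Blocks the caller until the expected number of notifications arrived or the timeout expires. */
	static void await(int expected, long timeoutMillis) throws InterruptedException, TimeoutException {
		long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
		synchronized (lock) {
			while (notifications < expected) {
				long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
				if (remainingMillis <= 0) {
					throw new TimeoutException("Received only " + notifications + " of " + expected + " notifications.");
				}
				lock.wait(remainingMillis);
			}
		}
	}

	/** Clears the shared counter between job submissions (the role resetForTest() plays in the diff). */
	static void reset() {
		synchronized (lock) {
			notifications = 0;
		}
	}
}

Waiting with a bounded lock.wait(remainingMillis) inside the loop keeps the waiter robust against spurious wake-ups while still honoring the overall deadline.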
