[FLINK-5929] [tests] Fix SavepointITCase instability

When shutting down the testing cluster, it can happen that checkpoint
files (checkpoints independent of the savepoint) linger around.

This commit deactivates checkpointing for the test and uses countdown
latches to track progress, which also reduces the test time.
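
For context, the latch-based tracking boils down to the minimal,
self-contained sketch below (class and method names are illustrative,
not the exact test code): each parallel subtask counts a shared
CountDownLatch down once, and the test thread awaits it with a timeout.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    class LatchProgressSketch {

        // One permit per parallel subtask; replaced before each run.
        private static volatile CountDownLatch progressLatch = new CountDownLatch(0);

        static void resetForTest(int parallelism) {
            progressLatch = new CountDownLatch(parallelism);
        }

        // Each subtask calls this once it has made enough progress.
        static void reportProgress() {
            progressLatch.countDown();
        }

        public static void main(String[] args) throws InterruptedException {
            final int parallelism = 4;
            resetForTest(parallelism);

            // Simulate parallel subtasks reporting progress.
            for (int i = 0; i < parallelism; i++) {
                new Thread(LatchProgressSketch::reportProgress).start();
            }

            // The test thread blocks until all subtasks reported, or times out.
            boolean allReported = progressLatch.await(30, TimeUnit.SECONDS);
            System.out.println("All subtasks reported progress: " + allReported);
        }
    }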

This closes #3427


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c24c7ec3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c24c7ec3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c24c7ec3

Branch: refs/heads/master
Commit: c24c7ec3332d0eb6ebb24eb70c9aabd055cc129f
Parents: 7f244b8
Author: Ufuk Celebi <[email protected]>
Authored: Tue Feb 28 11:13:28 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Feb 28 18:59:10 2017 +0100

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 200 ++++++-------------
 1 file changed, 62 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c24c7ec3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index ac37009..ee371dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -24,7 +24,7 @@ import akka.testkit.JavaTestKit;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import java.io.FileNotFoundException;
-import org.apache.commons.io.FileUtils;
+import java.util.concurrent.CountDownLatch;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -95,7 +94,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
@@ -116,12 +114,11 @@ public class SavepointITCase extends TestLogger {
        public TemporaryFolder folder = new TemporaryFolder();
 
        /**
-        * Tests that it is possible to submit a job, trigger a savepoint, and
-        * later restart the job on a new cluster. The savepoint is written to
-        * a file.
+        * Triggers a savepoint for a job that uses the FsStateBackend. We expect
+        * that all checkpoint files are written to a new savepoint directory.
         *
         * <ol>
-        * <li>Submit job, wait for some checkpoints to complete</li>
+        * <li>Submit job, wait for some progress</li>
         * <li>Trigger savepoint and verify that savepoint has been created</li>
         * <li>Shut down the cluster, re-submit the job from the savepoint,
         * verify that the initial state has been reset, and
@@ -131,23 +128,13 @@ public class SavepointITCase extends TestLogger {
         * </ol>
         */
        @Test
-       public void testTriggerSavepointAndResume() throws Exception {
+       public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception {
                // Config
-               int numTaskManagers = 2;
-               int numSlotsPerTaskManager = 2;
-               int parallelism = numTaskManagers * numSlotsPerTaskManager;
-
-               // Test deadline
+               final int numTaskManagers = 2;
+               final int numSlotsPerTaskManager = 2;
+               final int parallelism = numTaskManagers * numSlotsPerTaskManager;
                final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-
-               // The number of checkpoints to complete before triggering the savepoint
-               final int numberOfCompletedCheckpoints = 2;
-               final int checkpointingInterval = 100;
-
-               // Temporary directory for file state backend
-               final File tmpDir = folder.newFolder();
-
-               LOG.info("Created temporary directory: " + tmpDir + ".");
+               final File testRoot = folder.newFolder();
 
                TestingCluster flink = null;
 
@@ -160,70 +147,51 @@ public class SavepointITCase extends TestLogger {
                        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
                        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 
-                       final File checkpointDir = new File(tmpDir, "checkpoints");
-                       final File savepointRootDir = new File(tmpDir, "savepoints");
+                       final File checkpointDir = new File(testRoot, "checkpoints");
+                       final File savepointRootDir = new File(testRoot, "savepoints");
 
                        if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
                                fail("Test setup failed: failed to create temporary directories.");
                        }
 
-                       LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
-                       LOG.info("Created temporary savepoint directory: " + savepointRootDir + ".");
-
+                       // Use file based checkpoints
                        config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-                       config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
-                               checkpointDir.toURI().toString());
+                       config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
                        config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
-                       config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-                               savepointRootDir.toURI().toString());
-
-                       LOG.info("Flink configuration: " + config + ".");
+                       config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointRootDir.toURI().toString());
 
                        // Start Flink
                        flink = new TestingCluster(config);
-                       flink.start();
-
-                       // Retrieve the job manager
-                       ActorGateway jobManager = Await.result(
-                               flink.leaderGateway().future(),
-                               deadline.timeLeft());
+                       flink.start(true);
 
                        // Submit the job
-                       final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, checkpointingInterval);
+                       final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
                        final JobID jobId = jobGraph.getJobID();
 
-                       // Wait for the source to be notified about the expected number
-                       // of completed checkpoints
-                       StatefulCounter.resetForTest();
+                       // Reset the static test job helpers
+                       StatefulCounter.resetForTest(parallelism);
+
+                       // Retrieve the job manager
+                       ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
 
                        LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
                        flink.submitJobDetached(jobGraph);
 
-                       LOG.info("Waiting for " + numberOfCompletedCheckpoints + " checkpoint complete notifications.");
+                       LOG.info("Waiting for some progress.");
 
-                       // Wait...
-                       StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+                       StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-                       LOG.info("Received all " + numberOfCompletedCheckpoints +
-                               " checkpoint complete notifications.");
-
-                       // ...and then trigger the savepoint
                        LOG.info("Triggering a savepoint.");
-
-                       Future<Object> savepointPathFuture = jobManager.ask(
-                               new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
-
-                       final String savepointPath = ((TriggerSavepointSuccess) Await
-                               .result(savepointPathFuture, deadline.timeLeft())).savepointPath();
+                       Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+                       final String savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
                        LOG.info("Retrieved savepoint path: " + savepointPath + ".");
 
                        // Retrieve the savepoint from the testing job manager
                        LOG.info("Requesting the savepoint.");
                        Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
 
-                       SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
-                               savepointFuture, deadline.timeLeft())).savepoint();
+                       SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
                        LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
                        // Shut down the Flink cluster (thereby canceling the job)
@@ -243,26 +211,25 @@ public class SavepointITCase extends TestLogger {
                                File savepointDir = files[0];
                                File[] savepointFiles = savepointDir.listFiles();
                                assertNotNull(savepointFiles);
-                               assertTrue("Did not write savepoint files to directory",savepointFiles.length > 1);
+
+                               // Expect one metadata file and one checkpoint file per stateful
+                               // parallel subtask
+                               String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: "
+                                       + Arrays.toString(savepointFiles);
+                               assertEquals(errMsg, 1 + parallelism, savepointFiles.length);
                        } else {
                                fail("Savepoint not created in expected directory");
                        }
 
-                       // Only one checkpoint of the savepoint should exist
                        // We currently have the following directory layout: checkpointDir/jobId/chk-ID
                        File jobCheckpoints = new File(checkpointDir, jobId.toString());
 
                        if (jobCheckpoints.exists()) {
                                files = jobCheckpoints.listFiles();
                                assertNotNull("Checkpoint directory empty", files);
-                               assertEquals("Checkpoints directory not cleaned up: " + Arrays.toString(files), 0, files.length);
+                               assertEquals("Checkpoints directory not clean: " + Arrays.toString(files), 0, files.length);
                        }
 
-                       // Only one savepoint should exist
-                       files = savepointRootDir.listFiles();
-                       assertNotNull("Savepoint directory empty", files);
-                       assertEquals("No savepoint found in savepoint directory", 1, files.length);
-
                        // - Verification END ---------------------------------------------
 
                        // Restart the cluster
@@ -274,13 +241,14 @@ public class SavepointITCase extends TestLogger {
                        jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
                        LOG.info("JobManager: " + jobManager + ".");
 
-                       // Reset for restore
-                       StatefulCounter.resetForTest();
+                       // Reset static test helpers
+                       StatefulCounter.resetForTest(parallelism);
 
                        // Gather all task deployment descriptors
                        final Throwable[] error = new Throwable[1];
                        final TestingCluster finalFlink = flink;
                        final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
+
                        new JavaTestKit(testActorSystem) {{
 
                                new Within(deadline.timeLeft()) {
@@ -361,10 +329,10 @@ public class SavepointITCase extends TestLogger {
                        }
 
                        // Await state is restored
-                       StatefulCounter.awaitStateRestoredFromCheckpoint(deadline.timeLeft().toMillis());
+                       StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
                        // Await some progress after restore
-                       StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+                       StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
                        // - Verification END ---------------------------------------------
 
@@ -396,7 +364,7 @@ public class SavepointITCase extends TestLogger {
                                }
                        }
 
-                       // The checkpoint of the savepoint should have been discarded
+                       // The checkpoint files of the savepoint should have been discarded
                        for (File f : checkpointFiles) {
                                errMsg = "Checkpoint file " + f + " not cleaned up properly.";
                                assertFalse(errMsg, f.exists());
@@ -418,10 +386,6 @@ public class SavepointITCase extends TestLogger {
                        if (flink != null) {
                                flink.shutdown();
                        }
-
-                       if (tmpDir != null) {
-                               FileUtils.deleteDirectory(tmpDir);
-                       }
                }
        }
 
@@ -467,7 +431,7 @@ public class SavepointITCase extends TestLogger {
                        // Submit the job
                        // Long delay to ensure that the test times out if the job
                        // manager tries to restart the job.
-                       final JobGraph jobGraph = createJobGraph(parallelism, numberOfRetries, 3600000, 1000);
+                       final JobGraph jobGraph = createJobGraph(parallelism, numberOfRetries, 3600000);
 
                        // Set non-existing savepoint path
                        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("unknown path"));
@@ -498,12 +462,10 @@ public class SavepointITCase extends TestLogger {
        private JobGraph createJobGraph(
                int parallelism,
                int numberOfRetries,
-               long restartDelay,
-               int checkpointingInterval) {
+               long restartDelay) {
 
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
-               env.enableCheckpointing(checkpointingInterval);
                env.disableOperatorChaining();
                env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(numberOfRetries, restartDelay));
                env.getConfig().disableSysoutLogging();
@@ -526,7 +488,9 @@ public class SavepointITCase extends TestLogger {
                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                        while (running) {
-                               ctx.collect(1);
+                               synchronized (ctx.getCheckpointLock()) {
+                                       ctx.collect(1);
+                               }
                        }
                }
 
@@ -536,14 +500,12 @@ public class SavepointITCase extends TestLogger {
                }
        }
 
-       private static class StatefulCounter
-               extends RichMapFunction<Integer, Integer>
-               implements ListCheckpointed<byte[]>, CheckpointListener {
+       private static class StatefulCounter extends RichMapFunction<Integer, Integer> implements ListCheckpointed<byte[]>{
+
+               private static volatile CountDownLatch progressLatch = new CountDownLatch(0);
+               private static volatile CountDownLatch restoreLatch = new CountDownLatch(0);
 
-               private static final Object checkpointLock = new Object();
-               private static int numCompleteCalls;
-               private static int numRestoreCalls;
-               private static boolean restoredFromCheckpoint;
+               private int numCollectedElements = 0;
 
                private static final long serialVersionUID = 7317800376639115920L;
                private byte[] data;
@@ -563,6 +525,11 @@ public class SavepointITCase extends TestLogger {
                        for (int i = 0; i < data.length; i++) {
                                data[i] += 1;
                        }
+
+                       if (numCollectedElements++ > 10) {
+                               progressLatch.countDown();
+                       }
+
                        return value;
                }
 
@@ -578,65 +545,22 @@ public class SavepointITCase extends TestLogger {
                        }
                        this.data = state.get(0);
 
-                       synchronized (checkpointLock) {
-                               if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {
-                                       restoredFromCheckpoint = true;
-                                       checkpointLock.notifyAll();
-                               }
-                       }
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) throws Exception {
-                       synchronized (checkpointLock) {
-                               numCompleteCalls++;
-                               checkpointLock.notifyAll();
-                       }
+                       restoreLatch.countDown();
                }
 
                // --------------------------------------------------------------------
 
-               static void resetForTest() {
-                       synchronized (checkpointLock) {
-                               numCompleteCalls = 0;
-                               numRestoreCalls = 0;
-                               restoredFromCheckpoint = false;
-                       }
+               static CountDownLatch getProgressLatch() {
+                       return progressLatch;
                }
 
-               static void awaitCompletedCheckpoints(
-                               int parallelism,
-                               int expectedNumberOfCompletedCheckpoints,
-                               long timeoutMillis) throws InterruptedException, TimeoutException {
-
-                       long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
-
-                       synchronized (checkpointLock) {
-                               // One completion notification per parallel subtask
-                               int expectedNumber = parallelism * expectedNumberOfCompletedCheckpoints;
-                               while (numCompleteCalls < expectedNumber && System.nanoTime() <= deadline) {
-                                       checkpointLock.wait();
-                               }
-
-                               if (numCompleteCalls < expectedNumber) {
-                                       throw new TimeoutException("Did not complete " + expectedNumberOfCompletedCheckpoints +
-                                               " within timeout of " + timeoutMillis + " millis.");
-                               }
-                       }
+               static CountDownLatch getRestoreLatch() {
+                       return restoreLatch;
                }
 
-               static void awaitStateRestoredFromCheckpoint(long timeoutMillis) throws InterruptedException, TimeoutException {
-                       long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
-
-                       synchronized (checkpointLock) {
-                               while (!restoredFromCheckpoint && System.currentTimeMillis() <= deadline) {
-                                       checkpointLock.wait();
-                               }
-
-                               if (!restoredFromCheckpoint) {
-                                       throw new TimeoutException("Did not restore from checkpoint within timeout of " + timeoutMillis + " millis.");
-                               }
-                       }
+               static void resetForTest(int parallelism) {
+                       progressLatch = new CountDownLatch(parallelism);
+                       restoreLatch = new CountDownLatch(parallelism);
                }
        }
 
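A side note on the InfiniteTestSource change above: emitting records
under the checkpoint lock is the standard contract of the
SourceFunction API in this Flink version; it keeps record emission
from interleaving with a state snapshot of the same task. A minimal
sketch of such a source (the class name is illustrative, not part of
the commit):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    class LockedEmitSource implements SourceFunction<Integer> {

        private static final long serialVersionUID = 1L;

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (running) {
                // Hold the checkpoint lock so that a record is never
                // emitted concurrently with a state snapshot.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(1);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }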
