Repository: flink Updated Branches: refs/heads/master fc4abd7ff -> 07a7b73be
[FLINK-4056] [tests] Harden SavepointITCase This tries to prevent failures like [1] from happening again. I could not determine what deleted the savepoint file concurrently with the exists check: the savepoint is triggered and retrieved successfully, and shutting down the cluster does not remove any savepoints. [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/136396433/log.txt Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07a7b73b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07a7b73b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07a7b73b Branch: refs/heads/master Commit: 07a7b73be71d286392cdb83540f87c2b715c7a14 Parents: fc4abd7 Author: Ufuk Celebi <[email protected]> Authored: Thu Jun 16 15:32:24 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Fri Jun 17 13:39:48 2016 +0200 ---------------------------------------------------------------------- .../flink/test/checkpointing/SavepointITCase.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/07a7b73b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 9ceae79..89761ff 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -63,7 +63,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList; import 
org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.testutils.junit.RetryOnFailure; +import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +76,7 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import java.io.File; +import java.io.FileNotFoundException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -99,6 +103,9 @@ public class SavepointITCase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class); + @Rule + public RetryRule retryRule = new RetryRule(); + /** * Tests that it is possible to submit a job, trigger a savepoint, and * later restart the job on a new cluster. The savepoint is written to @@ -399,6 +406,7 @@ public class SavepointITCase extends TestLogger { * a proper Exception on submission. */ @Test + @RetryOnFailure(times = 2) public void testCheckpointHasBeenRemoved() throws Exception { // Config int numTaskManagers = 2; @@ -503,7 +511,10 @@ public class SavepointITCase extends TestLogger { flink.shutdown(); // Remove the checkpoint files - FileUtils.deleteDirectory(checkpointDir); + try { + FileUtils.deleteDirectory(checkpointDir); + } catch (FileNotFoundException ignored) { + } // Restart the cluster LOG.info("Restarting Flink cluster.");
