Repository: flink Updated Branches: refs/heads/master c46bdff0e -> 1f49926be
[tests] Flix flakey SimpleRecoveryITCase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f49926b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f49926b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f49926b Branch: refs/heads/master Commit: 1f49926be4fc2670e285d9159b5a4ef0227887ac Parents: c46bdff Author: Stephan Ewen <[email protected]> Authored: Fri Feb 20 10:42:27 2015 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Feb 20 13:28:09 2015 +0100 ---------------------------------------------------------------------- .../test/recovery/SimpleRecoveryITCase.java | 55 ++++++++++++++------ 1 file changed, 39 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1f49926b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java index 911edb3..f71783b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java @@ -67,8 +67,6 @@ public class SimpleRecoveryITCase { @Test public void testFailedRunThenSuccessfulRun() { - FailOnceMapper.failuresBeforeSuccess = 1; - try { List<Long> resultCollection = new ArrayList<Long>(); @@ -81,7 +79,7 @@ public class SimpleRecoveryITCase { env.setNumberOfExecutionRetries(0); env.generateSequence(1, 10) - .map(new FailOnceMapper<Long>()) + .map(new FailingMapper1<Long>()) .reduce(new ReduceFunction<Long>() { @Override public Long reduce(Long value1, Long value2) { @@ -91,8 +89,9 @@ public class SimpleRecoveryITCase { .output(new LocalCollectionOutputFormat<Long>(resultCollection)); try { - env.execute(); - fail("The program should have failed"); + JobExecutionResult res = env.execute(); + String msg = res == null ? "null result" : "result in " + res.getNetRuntime(); + fail("The program should have failed, but returned " + msg); } catch (ProgramInvocationException e) { // expected @@ -108,7 +107,7 @@ public class SimpleRecoveryITCase { env.setNumberOfExecutionRetries(0); env.generateSequence(1, 10) - .map(new FailOnceMapper<Long>()) + .map(new FailingMapper1<Long>()) .reduce(new ReduceFunction<Long>() { @Override public Long reduce(Long value1, Long value2) { @@ -143,9 +142,6 @@ public class SimpleRecoveryITCase { @Test public void testRestart() { - - FailOnceMapper.failuresBeforeSuccess = 1; - try { List<Long> resultCollection = new ArrayList<Long>(); @@ -156,7 +152,7 @@ public class SimpleRecoveryITCase { env.setNumberOfExecutionRetries(1); env.generateSequence(1, 10) - .map(new FailOnceMapper<Long>()) + .map(new FailingMapper2<Long>()) .reduce(new ReduceFunction<Long>() { @Override public Long reduce(Long value1, Long value2) { @@ -189,9 +185,6 @@ public class SimpleRecoveryITCase { @Test public void testRestartMultipleTimes() { - - FailOnceMapper.failuresBeforeSuccess = 3; - try { List<Long> resultCollection = new ArrayList<Long>(); @@ -202,7 +195,7 @@ public class SimpleRecoveryITCase { env.setNumberOfExecutionRetries(3); env.generateSequence(1, 10) - .map(new FailOnceMapper<Long>()) + .map(new FailingMapper3<Long>()) .reduce(new ReduceFunction<Long>() { @Override public Long reduce(Long value1, Long value2) { @@ -235,9 +228,39 @@ public class SimpleRecoveryITCase { // ------------------------------------------------------------------------------------ - private static class FailOnceMapper<T> extends RichMapFunction<T, T> { + private static class FailingMapper1<T> extends RichMapFunction<T, T> { + + private static int failuresBeforeSuccess = 1; + + @Override + public T map(T value) throws Exception { + if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) { + failuresBeforeSuccess--; + throw new Exception("Test Failure"); + } + + return value; + } + } + + private static class FailingMapper2<T> extends RichMapFunction<T, T> { + + private static int failuresBeforeSuccess = 1; + + @Override + public T map(T value) throws Exception { + if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) { + failuresBeforeSuccess--; + throw new Exception("Test Failure"); + } + + return value; + } + } + + private static class FailingMapper3<T> extends RichMapFunction<T, T> { - private static int failuresBeforeSuccess = 0; + private static int failuresBeforeSuccess = 3; @Override public T map(T value) throws Exception {
