Repository: flink
Updated Branches:
  refs/heads/master c46bdff0e -> 1f49926be


[tests] Flix flakey SimpleRecoveryITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f49926b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f49926b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f49926b

Branch: refs/heads/master
Commit: 1f49926be4fc2670e285d9159b5a4ef0227887ac
Parents: c46bdff
Author: Stephan Ewen <[email protected]>
Authored: Fri Feb 20 10:42:27 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 20 13:28:09 2015 +0100

----------------------------------------------------------------------
 .../test/recovery/SimpleRecoveryITCase.java     | 55 ++++++++++++++------
 1 file changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f49926b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 911edb3..f71783b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -67,8 +67,6 @@ public class SimpleRecoveryITCase {
        @Test
        public void testFailedRunThenSuccessfulRun() {
 
-               FailOnceMapper.failuresBeforeSuccess = 1;
-
                try {
                        List<Long> resultCollection = new ArrayList<Long>();
 
@@ -81,7 +79,7 @@ public class SimpleRecoveryITCase {
                                env.setNumberOfExecutionRetries(0);
 
                                env.generateSequence(1, 10)
-                                               .map(new FailOnceMapper<Long>())
+                                               .map(new FailingMapper1<Long>())
                                                .reduce(new 
ReduceFunction<Long>() {
                                                        @Override
                                                        public Long reduce(Long 
value1, Long value2) {
@@ -91,8 +89,9 @@ public class SimpleRecoveryITCase {
                                                .output(new 
LocalCollectionOutputFormat<Long>(resultCollection));
 
                                try {
-                                       env.execute();
-                                       fail("The program should have failed");
+                                       JobExecutionResult res = env.execute();
+                                       String msg = res == null ? "null 
result" : "result in " + res.getNetRuntime();
+                                       fail("The program should have failed, 
but returned " + msg);
                                }
                                catch (ProgramInvocationException e) {
                                        // expected
@@ -108,7 +107,7 @@ public class SimpleRecoveryITCase {
                                env.setNumberOfExecutionRetries(0);
 
                                env.generateSequence(1, 10)
-                                               .map(new FailOnceMapper<Long>())
+                                               .map(new FailingMapper1<Long>())
                                                .reduce(new 
ReduceFunction<Long>() {
                                                        @Override
                                                        public Long reduce(Long 
value1, Long value2) {
@@ -143,9 +142,6 @@ public class SimpleRecoveryITCase {
 
        @Test
        public void testRestart() {
-
-               FailOnceMapper.failuresBeforeSuccess = 1;
-
                try {
                        List<Long> resultCollection = new ArrayList<Long>();
 
@@ -156,7 +152,7 @@ public class SimpleRecoveryITCase {
                        env.setNumberOfExecutionRetries(1);
 
                        env.generateSequence(1, 10)
-                                       .map(new FailOnceMapper<Long>())
+                                       .map(new FailingMapper2<Long>())
                                        .reduce(new ReduceFunction<Long>() {
                                                @Override
                                                public Long reduce(Long value1, 
Long value2) {
@@ -189,9 +185,6 @@ public class SimpleRecoveryITCase {
 
        @Test
        public void testRestartMultipleTimes() {
-
-               FailOnceMapper.failuresBeforeSuccess = 3;
-
                try {
                        List<Long> resultCollection = new ArrayList<Long>();
 
@@ -202,7 +195,7 @@ public class SimpleRecoveryITCase {
                        env.setNumberOfExecutionRetries(3);
 
                        env.generateSequence(1, 10)
-                                       .map(new FailOnceMapper<Long>())
+                                       .map(new FailingMapper3<Long>())
                                        .reduce(new ReduceFunction<Long>() {
                                                @Override
                                                public Long reduce(Long value1, 
Long value2) {
@@ -235,9 +228,39 @@ public class SimpleRecoveryITCase {
 
        // 
------------------------------------------------------------------------------------
 
-       private static class FailOnceMapper<T> extends RichMapFunction<T, T> {
+       private static class FailingMapper1<T> extends RichMapFunction<T, T> {
+
+               private static int failuresBeforeSuccess = 1;
+
+               @Override
+               public T map(T value) throws Exception {
+                       if (failuresBeforeSuccess > 0 && 
getRuntimeContext().getIndexOfThisSubtask() == 1) {
+                               failuresBeforeSuccess--;
+                               throw new Exception("Test Failure");
+                       }
+
+                       return value;
+               }
+       }
+
+       private static class FailingMapper2<T> extends RichMapFunction<T, T> {
+
+               private static int failuresBeforeSuccess = 1;
+
+               @Override
+               public T map(T value) throws Exception {
+                       if (failuresBeforeSuccess > 0 && 
getRuntimeContext().getIndexOfThisSubtask() == 1) {
+                               failuresBeforeSuccess--;
+                               throw new Exception("Test Failure");
+                       }
+
+                       return value;
+               }
+       }
+
+       private static class FailingMapper3<T> extends RichMapFunction<T, T> {
 
-               private static int failuresBeforeSuccess = 0;
+               private static int failuresBeforeSuccess = 3;
 
                @Override
                public T map(T value) throws Exception {

Reply via email to