Repository: flink
Updated Branches:
  refs/heads/master c532638a0 -> 776253cbb


[hotfix] [streaming-java] Harden IterateTest

Adds retries and timeout scaling to all iteration tests, which rely
on iteration timeouts. The way the tests rely on these timoeuts is
prone to races. If the failures occur again, I vote to ignore the
tests until iteration termination is fixed properly.

Example test failures:
- https://s3.amazonaws.com/archive.travis-ci.org/jobs/134215892/log.txt
- https://s3.amazonaws.com/archive.travis-ci.org/jobs/134215975/log.txt

This closes #2087.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/776253cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/776253cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/776253cb

Branch: refs/heads/master
Commit: 776253cbb0574337b0736dc27711007821ba3d2c
Parents: c532638
Author: Ufuk Celebi <[email protected]>
Authored: Thu Jun 9 11:34:34 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Jun 9 13:54:49 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/IterateTest.java | 327 ++++++++++++-------
 1 file changed, 201 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/776253cb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index c4343f6..c6875dd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.api;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -29,7 +24,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.util.MathUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -52,13 +46,25 @@ import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.MathUtils;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings({ "unchecked", "unused", "serial" })
 public class IterateTest extends StreamingMultipleProgramsTestBase {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(IterateTest.class);
+
        private static boolean iterated[];
 
        @Test(expected = UnsupportedOperationException.class)
@@ -366,100 +372,135 @@ public class IterateTest extends 
StreamingMultipleProgramsTestBase {
        @SuppressWarnings("rawtypes")
        @Test
        public void testSimpleIteration() throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               iterated = new boolean[DEFAULT_PARALLELISM];
+               int numRetries = 5;
+               int timeoutScale = 1;
 
-               DataStream<Boolean> source = 
env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
-                               .map(NoOpBoolMap).name("ParallelizeMap");
+               for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+                       try {
+                               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                               iterated = new boolean[DEFAULT_PARALLELISM];
 
-               IterativeStream<Boolean> iteration = source.iterate(3000);
+                               DataStream<Boolean> source = 
env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+                                               
.map(NoOpBoolMap).name("ParallelizeMap");
 
-               DataStream<Boolean> increment = iteration.flatMap(new 
IterationHead()).map(NoOpBoolMap);
+                               IterativeStream<Boolean> iteration = 
source.iterate(3000 * timeoutScale);
 
-               iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink());
+                               DataStream<Boolean> increment = 
iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
 
-               iteration.closeWith(increment).addSink(new 
ReceiveCheckNoOpSink());
+                               iteration.map(NoOpBoolMap).addSink(new 
ReceiveCheckNoOpSink());
 
-               env.execute();
+                               iteration.closeWith(increment).addSink(new 
ReceiveCheckNoOpSink());
 
-               for (boolean iter : iterated) {
-                       assertTrue(iter);
-               }
+                               env.execute();
 
+                               for (boolean iter : iterated) {
+                                       assertTrue(iter);
+                               }
+
+                               break; // success
+                       } catch (Throwable t) {
+                               LOG.info("Run " + (numRetry + 1) + "/" + 
numRetries + " failed", t);
+
+                               if (numRetry >= numRetries - 1) {
+                                       throw t;
+                               } else {
+                                       timeoutScale *= 2;
+                               }
+                       }
+               }
        }
 
        @Test
        public void testCoIteration() throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(2);
+               int numRetries = 5;
+               int timeoutScale = 1;
 
-               DataStream<String> otherSource = env.fromElements("1000", 
"2000")
-                               .map(NoOpStrMap).name("ParallelizeMap");
+               for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+                       try {
+                               TestSink.collected = new ArrayList<>();
 
+                               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                               env.setParallelism(2);
 
-               ConnectedIterativeStreams<Integer, String> coIt = 
env.fromElements(0, 0)
-                               .map(NoOpIntMap).name("ParallelizeMap")
-                               .iterate(2000)
-                               .withFeedbackType("String");
+                               DataStream<String> otherSource = 
env.fromElements("1000", "2000")
+                                               
.map(NoOpStrMap).name("ParallelizeMap");
 
-               try {
-                       coIt.keyBy(1, 2);
-                       fail();
-               } catch (InvalidProgramException e) {
-                       // this is expected
-               }
 
-               DataStream<String> head = coIt
-                               .flatMap(new RichCoFlatMapFunction<Integer, 
String, String>() {
+                               ConnectedIterativeStreams<Integer, String> coIt 
= env.fromElements(0, 0)
+                                               
.map(NoOpIntMap).name("ParallelizeMap")
+                                               .iterate(2000 * timeoutScale)
+                                               .withFeedbackType("String");
+
+                               try {
+                                       coIt.keyBy(1, 2);
+                                       fail();
+                               } catch (InvalidProgramException e) {
+                                       // this is expected
+                               }
 
-                                       private static final long 
serialVersionUID = 1L;
-                                       boolean seenFromSource = false;
+                               DataStream<String> head = coIt
+                                               .flatMap(new 
RichCoFlatMapFunction<Integer, String, String>() {
 
-                                       @Override
-                                       public void flatMap1(Integer value, 
Collector<String> out) throws Exception {
-                                               out.collect(((Integer) (value + 
1)).toString());
-                                       }
+                                                       private static final 
long serialVersionUID = 1L;
+                                                       boolean seenFromSource 
= false;
+
+                                                       @Override
+                                                       public void 
flatMap1(Integer value, Collector<String> out) throws Exception {
+                                                               
out.collect(((Integer) (value + 1)).toString());
+                                                       }
+
+                                                       @Override
+                                                       public void 
flatMap2(String value, Collector<String> out) throws Exception {
+                                                               Integer intVal 
= Integer.valueOf(value);
+                                                               if (intVal < 2) 
{
+                                                                       
out.collect(((Integer) (intVal + 1)).toString());
+                                                               }
+                                                               if (intVal == 
1000 || intVal == 2000) {
+                                                                       
seenFromSource = true;
+                                                               }
+                                                       }
+
+                                                       @Override
+                                                       public void close() {
+                                                               
assertTrue(seenFromSource);
+                                                       }
+                                               });
+
+                               coIt.map(new CoMapFunction<Integer, String, 
String>() {
 
                                        @Override
-                                       public void flatMap2(String value, 
Collector<String> out) throws Exception {
-                                               Integer intVal = 
Integer.valueOf(value);
-                                               if (intVal < 2) {
-                                                       out.collect(((Integer) 
(intVal + 1)).toString());
-                                               }
-                                               if (intVal == 1000 || intVal == 
2000) {
-                                                       seenFromSource = true;
-                                               }
+                                       public String map1(Integer value) 
throws Exception {
+                                               return value.toString();
                                        }
 
                                        @Override
-                                       public void close() {
-                                               assertTrue(seenFromSource);
+                                       public String map2(String value) throws 
Exception {
+                                               return value;
                                        }
-                               });
+                               }).addSink(new ReceiveCheckNoOpSink<String>());
 
-               coIt.map(new CoMapFunction<Integer, String, String>() {
+                               
coIt.closeWith(head.broadcast().union(otherSource));
 
-                       @Override
-                       public String map1(Integer value) throws Exception {
-                               return value.toString();
-                       }
+                               head.addSink(new TestSink()).setParallelism(1);
 
-                       @Override
-                       public String map2(String value) throws Exception {
-                               return value;
-                       }
-               }).addSink(new ReceiveCheckNoOpSink<String>());
+                               assertEquals(1, 
env.getStreamGraph().getIterationSourceSinkPairs().size());
 
-               coIt.closeWith(head.broadcast().union(otherSource));
+                               env.execute();
 
-               head.addSink(new TestSink()).setParallelism(1);
+                               Collections.sort(TestSink.collected);
+                               assertEquals(Arrays.asList("1", "1", "2", "2", 
"2", "2"), TestSink.collected);
 
-               assertEquals(1, 
env.getStreamGraph().getIterationSourceSinkPairs().size());
-
-               env.execute();
+                               break; // success
+                       } catch (Throwable t) {
+                               LOG.info("Run " + (numRetry + 1) + "/" + 
numRetries + " failed", t);
 
-               Collections.sort(TestSink.collected);
-               assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), 
TestSink.collected);
+                               if (numRetry >= numRetries - 1) {
+                                       throw t;
+                               } else {
+                                       timeoutScale *= 2;
+                               }
+                       }
+               }
        }
 
        /**
@@ -473,89 +514,123 @@ public class IterateTest extends 
StreamingMultipleProgramsTestBase {
      */
        @Test
        public void testGroupByFeedback() throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(DEFAULT_PARALLELISM - 1);
+               int numRetries = 5;
+               int timeoutScale = 1;
 
-               KeySelector<Integer, Integer> key = new KeySelector<Integer, 
Integer>() {
+               for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+                       try {
+                               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                               env.setParallelism(DEFAULT_PARALLELISM - 1);
 
-                       @Override
-                       public Integer getKey(Integer value) throws Exception {
-                               return value % 3;
-                       }
-               };
+                               KeySelector<Integer, Integer> key = new 
KeySelector<Integer, Integer>() {
 
-               DataStream<Integer> source = env.fromElements(1, 2, 3)
-                               .map(NoOpIntMap).name("ParallelizeMap");
+                                       @Override
+                                       public Integer getKey(Integer value) 
throws Exception {
+                                               return value % 3;
+                                       }
+                               };
 
-               IterativeStream<Integer> it = source.keyBy(key).iterate(3000);
+                               DataStream<Integer> source = 
env.fromElements(1, 2, 3)
+                                               
.map(NoOpIntMap).name("ParallelizeMap");
 
-               DataStream<Integer> head = it.flatMap(new 
RichFlatMapFunction<Integer, Integer>() {
+                               IterativeStream<Integer> it = 
source.keyBy(key).iterate(3000 * timeoutScale);
 
-                       int received = 0;
-                       int key = -1;
+                               DataStream<Integer> head = it.flatMap(new 
RichFlatMapFunction<Integer, Integer>() {
 
-                       @Override
-                       public void flatMap(Integer value, Collector<Integer> 
out) throws Exception {
-                               received++;
-                               if (key == -1) {
-                                       key = MathUtils.murmurHash(value % 3) % 
3;
-                               } else {
-                                       assertEquals(key, 
MathUtils.murmurHash(value % 3) % 3);
-                               }
-                               if (value > 0) {
-                                       out.collect(value - 1);
-                               }
-                       }
+                                       int received = 0;
+                                       int key = -1;
 
-                       @Override
-                       public void close() {
-                               assertTrue(received > 1);
-                       }
-               });
+                                       @Override
+                                       public void flatMap(Integer value, 
Collector<Integer> out) throws Exception {
+                                               received++;
+                                               if (key == -1) {
+                                                       key = 
MathUtils.murmurHash(value % 3) % 3;
+                                               } else {
+                                                       assertEquals(key, 
MathUtils.murmurHash(value % 3) % 3);
+                                               }
+                                               if (value > 0) {
+                                                       out.collect(value - 1);
+                                               }
+                                       }
 
-               
it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new
 ReceiveCheckNoOpSink<Integer>());
+                                       @Override
+                                       public void close() {
+                                               assertTrue(received > 1);
+                                       }
+                               });
 
-               env.execute();
+                               
it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new
 ReceiveCheckNoOpSink<Integer>());
+
+                               env.execute();
+
+                               break; // success
+                       } catch (Throwable t) {
+                               LOG.info("Run " + (numRetry + 1) + "/" + 
numRetries + " failed", t);
+
+                               if (numRetry >= numRetries - 1) {
+                                       throw t;
+                               } else {
+                                       timeoutScale *= 2;
+                               }
+                       }
+               }
        }
 
        @SuppressWarnings("deprecation")
        @Test
        public void testWithCheckPointing() throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               int numRetries = 5;
+               int timeoutScale = 1;
 
-               env.enableCheckpointing();
+               for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+                       try {
+                               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-               DataStream<Boolean> source = env 
.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
-                               .map(NoOpBoolMap).name("ParallelizeMap");
+                               env.enableCheckpointing();
 
+                               DataStream<Boolean> source = 
env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+                                               
.map(NoOpBoolMap).name("ParallelizeMap");
 
-               IterativeStream<Boolean> iteration = source.iterate(3000);
 
-               iteration.closeWith(iteration.flatMap(new 
IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
+                               IterativeStream<Boolean> iteration = 
source.iterate(3000 * timeoutScale);
 
-               try {
-                       env.execute();
+                               iteration.closeWith(iteration.flatMap(new 
IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
 
-                       // this statement should never be reached
-                       fail();
-               } catch (UnsupportedOperationException e) {
-                       // expected behaviour
-               }
+                               try {
+                                       env.execute();
 
-               // Test force checkpointing
+                                       // this statement should never be 
reached
+                                       fail();
+                               } catch (UnsupportedOperationException e) {
+                                       // expected behaviour
+                               }
 
-               try {
-                       env.enableCheckpointing(1, 
CheckpointingMode.EXACTLY_ONCE, false);
-                       env.execute();
+                               // Test force checkpointing
 
-                       // this statement should never be reached
-                       fail();
-               } catch (UnsupportedOperationException e) {
-                       // expected behaviour
-               }
+                               try {
+                                       env.enableCheckpointing(1, 
CheckpointingMode.EXACTLY_ONCE, false);
+                                       env.execute();
+
+                                       // this statement should never be 
reached
+                                       fail();
+                               } catch (UnsupportedOperationException e) {
+                                       // expected behaviour
+                               }
+
+                               env.enableCheckpointing(1, 
CheckpointingMode.EXACTLY_ONCE, true);
+                               env.getStreamGraph().getJobGraph();
 
-               env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, 
true);
-               env.getStreamGraph().getJobGraph();
+                               break; // success
+                       } catch (Throwable t) {
+                               LOG.info("Run " + (numRetry + 1) + "/" + 
numRetries + " failed", t);
+
+                               if (numRetry >= numRetries - 1) {
+                                       throw t;
+                               } else {
+                                       timeoutScale *= 2;
+                               }
+                       }
+               }
        }
 
        public static final class IterationHead extends 
RichFlatMapFunction<Boolean, Boolean> {

Reply via email to