Repository: flink Updated Branches: refs/heads/master c532638a0 -> 776253cbb
[hotfix] [streaming-java] Harden IterateTest Adds retries and timeout scaling to all iteration tests, which rely on iteration timeouts. The way the tests rely on these timeouts is prone to races. If the failures occur again, I vote to ignore the tests until iteration termination is fixed properly. Example test failures: - https://s3.amazonaws.com/archive.travis-ci.org/jobs/134215892/log.txt - https://s3.amazonaws.com/archive.travis-ci.org/jobs/134215975/log.txt This closes #2087. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/776253cb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/776253cb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/776253cb Branch: refs/heads/master Commit: 776253cbb0574337b0736dc27711007821ba3d2c Parents: c532638 Author: Ufuk Celebi <[email protected]> Authored: Thu Jun 9 11:34:34 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Thu Jun 9 13:54:49 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/streaming/api/IterateTest.java | 327 ++++++++++++------- 1 file changed, 201 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/776253cb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java index c4343f6..c6875dd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -17,11 +17,6 @@ package org.apache.flink.streaming.api; -import java.util.ArrayList; -import java.util.Arrays; -import
java.util.Collections; -import java.util.List; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; @@ -29,7 +24,6 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.util.MathUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.IterativeStream; @@ -52,13 +46,25 @@ import org.apache.flink.streaming.util.NoOpIntMap; import org.apache.flink.streaming.util.ReceiveCheckNoOpSink; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; +import org.apache.flink.util.MathUtils; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static org.junit.Assert.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @SuppressWarnings({ "unchecked", "unused", "serial" }) public class IterateTest extends StreamingMultipleProgramsTestBase { + private static final Logger LOG = LoggerFactory.getLogger(IterateTest.class); + private static boolean iterated[]; @Test(expected = UnsupportedOperationException.class) @@ -366,100 +372,135 @@ public class IterateTest extends StreamingMultipleProgramsTestBase { @SuppressWarnings("rawtypes") @Test public void testSimpleIteration() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - iterated = new boolean[DEFAULT_PARALLELISM]; + int numRetries = 5; + int timeoutScale = 1; - 
DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) - .map(NoOpBoolMap).name("ParallelizeMap"); + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + iterated = new boolean[DEFAULT_PARALLELISM]; - IterativeStream<Boolean> iteration = source.iterate(3000); + DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) + .map(NoOpBoolMap).name("ParallelizeMap"); - DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap); + IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale); - iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink()); + DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap); - iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink()); + iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink()); - env.execute(); + iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink()); - for (boolean iter : iterated) { - assertTrue(iter); - } + env.execute(); + for (boolean iter : iterated) { + assertTrue(iter); + } + + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } } @Test public void testCoIteration() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(2); + int numRetries = 5; + int timeoutScale = 1; - DataStream<String> otherSource = env.fromElements("1000", "2000") - .map(NoOpStrMap).name("ParallelizeMap"); + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + TestSink.collected = new ArrayList<>(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + 
env.setParallelism(2); - ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0) - .map(NoOpIntMap).name("ParallelizeMap") - .iterate(2000) - .withFeedbackType("String"); + DataStream<String> otherSource = env.fromElements("1000", "2000") + .map(NoOpStrMap).name("ParallelizeMap"); - try { - coIt.keyBy(1, 2); - fail(); - } catch (InvalidProgramException e) { - // this is expected - } - DataStream<String> head = coIt - .flatMap(new RichCoFlatMapFunction<Integer, String, String>() { + ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0) + .map(NoOpIntMap).name("ParallelizeMap") + .iterate(2000 * timeoutScale) + .withFeedbackType("String"); + + try { + coIt.keyBy(1, 2); + fail(); + } catch (InvalidProgramException e) { + // this is expected + } - private static final long serialVersionUID = 1L; - boolean seenFromSource = false; + DataStream<String> head = coIt + .flatMap(new RichCoFlatMapFunction<Integer, String, String>() { - @Override - public void flatMap1(Integer value, Collector<String> out) throws Exception { - out.collect(((Integer) (value + 1)).toString()); - } + private static final long serialVersionUID = 1L; + boolean seenFromSource = false; + + @Override + public void flatMap1(Integer value, Collector<String> out) throws Exception { + out.collect(((Integer) (value + 1)).toString()); + } + + @Override + public void flatMap2(String value, Collector<String> out) throws Exception { + Integer intVal = Integer.valueOf(value); + if (intVal < 2) { + out.collect(((Integer) (intVal + 1)).toString()); + } + if (intVal == 1000 || intVal == 2000) { + seenFromSource = true; + } + } + + @Override + public void close() { + assertTrue(seenFromSource); + } + }); + + coIt.map(new CoMapFunction<Integer, String, String>() { @Override - public void flatMap2(String value, Collector<String> out) throws Exception { - Integer intVal = Integer.valueOf(value); - if (intVal < 2) { - out.collect(((Integer) (intVal + 1)).toString()); - } - if (intVal 
== 1000 || intVal == 2000) { - seenFromSource = true; - } + public String map1(Integer value) throws Exception { + return value.toString(); } @Override - public void close() { - assertTrue(seenFromSource); + public String map2(String value) throws Exception { + return value; } - }); + }).addSink(new ReceiveCheckNoOpSink<String>()); - coIt.map(new CoMapFunction<Integer, String, String>() { + coIt.closeWith(head.broadcast().union(otherSource)); - @Override - public String map1(Integer value) throws Exception { - return value.toString(); - } + head.addSink(new TestSink()).setParallelism(1); - @Override - public String map2(String value) throws Exception { - return value; - } - }).addSink(new ReceiveCheckNoOpSink<String>()); + assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size()); - coIt.closeWith(head.broadcast().union(otherSource)); + env.execute(); - head.addSink(new TestSink()).setParallelism(1); + Collections.sort(TestSink.collected); + assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected); - assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size()); - - env.execute(); + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); - Collections.sort(TestSink.collected); - assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected); + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } } /** @@ -473,89 +514,123 @@ public class IterateTest extends StreamingMultipleProgramsTestBase { */ @Test public void testGroupByFeedback() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(DEFAULT_PARALLELISM - 1); + int numRetries = 5; + int timeoutScale = 1; - KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() { + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + StreamExecutionEnvironment env 
= StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM - 1); - @Override - public Integer getKey(Integer value) throws Exception { - return value % 3; - } - }; + KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() { - DataStream<Integer> source = env.fromElements(1, 2, 3) - .map(NoOpIntMap).name("ParallelizeMap"); + @Override + public Integer getKey(Integer value) throws Exception { + return value % 3; + } + }; - IterativeStream<Integer> it = source.keyBy(key).iterate(3000); + DataStream<Integer> source = env.fromElements(1, 2, 3) + .map(NoOpIntMap).name("ParallelizeMap"); - DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() { + IterativeStream<Integer> it = source.keyBy(key).iterate(3000 * timeoutScale); - int received = 0; - int key = -1; + DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() { - @Override - public void flatMap(Integer value, Collector<Integer> out) throws Exception { - received++; - if (key == -1) { - key = MathUtils.murmurHash(value % 3) % 3; - } else { - assertEquals(key, MathUtils.murmurHash(value % 3) % 3); - } - if (value > 0) { - out.collect(value - 1); - } - } + int received = 0; + int key = -1; - @Override - public void close() { - assertTrue(received > 1); - } - }); + @Override + public void flatMap(Integer value, Collector<Integer> out) throws Exception { + received++; + if (key == -1) { + key = MathUtils.murmurHash(value % 3) % 3; + } else { + assertEquals(key, MathUtils.murmurHash(value % 3) % 3); + } + if (value > 0) { + out.collect(value - 1); + } + } - it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>()); + @Override + public void close() { + assertTrue(received > 1); + } + }); - env.execute(); + it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>()); + + env.execute(); + + break; // success + 
} catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } } @SuppressWarnings("deprecation") @Test public void testWithCheckPointing() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + int numRetries = 5; + int timeoutScale = 1; - env.enableCheckpointing(); + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream<Boolean> source = env .fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) - .map(NoOpBoolMap).name("ParallelizeMap"); + env.enableCheckpointing(); + DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) + .map(NoOpBoolMap).name("ParallelizeMap"); - IterativeStream<Boolean> iteration = source.iterate(3000); - iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>()); + IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale); - try { - env.execute(); + iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>()); - // this statement should never be reached - fail(); - } catch (UnsupportedOperationException e) { - // expected behaviour - } + try { + env.execute(); - // Test force checkpointing + // this statement should never be reached + fail(); + } catch (UnsupportedOperationException e) { + // expected behaviour + } - try { - env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false); - env.execute(); + // Test force checkpointing - // this statement should never be reached - fail(); - } catch (UnsupportedOperationException e) { - // expected behaviour - } + try { + env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false); + env.execute(); + + // this statement should 
never be reached + fail(); + } catch (UnsupportedOperationException e) { + // expected behaviour + } + + env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true); + env.getStreamGraph().getJobGraph(); - env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true); - env.getStreamGraph().getJobGraph(); + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } } public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
