[FLINK-3422][streaming] Update tests reliant on hashing

Closes #1685
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0f93c27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0f93c27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0f93c27

Branch: refs/heads/master
Commit: f0f93c27650a73610b229f52362787a09390e023
Parents: 0ff286d
Author: Márton Balassi <[email protected]>
Authored: Sun Feb 21 23:01:00 2016 +0100
Committer: Márton Balassi <[email protected]>
Committed: Wed Mar 2 17:28:52 2016 +0100

----------------------------------------------------------------------
 .../storm/tests/StormFieldsGroupingITCase.java  | 12 ++++++++---
 .../apache/flink/streaming/api/IterateTest.java |  9 ++++++++
 .../streaming/api/StreamingOperatorsITCase.java | 22 ++++++++++++--------
 .../api/scala/StreamingOperatorsITCase.scala    |  4 +++-
 4 files changed, 34 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0f93c27/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index dfadd77..b873345 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -21,13 +21,19 @@
 import backtype.storm.Config;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
 import org.apache.flink.storm.tests.operators.TaskIdBolt;
 import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 
+/**
+ * This test relies on the hash function used by {@link DataStream#keyBy}, which is
+ * assumed to be {@link MathUtils#murmurHash}.
+ */
 public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
 
 	private final static String topologyId = "FieldsGrouping Test";
@@ -43,9 +49,9 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory("4> -1930858313\n" + "4> 1431162155\n" + "4> 1654374947\n"
-				+ "4> -65105105\n" + "3> -1155484576\n" + "3> 1033096058\n" + "3> -1557280266\n"
-				+ "3> -1728529858\n" + "3> -518907128\n" + "3> -252332814", this.resultPath);
+		compareResultsByLinesInMemory("3> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n"
+				+ "3> 1431162155\n" + "4> -1557280266\n" + "4> -1728529858\n" + "4> 1654374947\n"
+				+ "4> -65105105\n" + "4> -518907128\n" + "4> -252332814\n", this.resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f0f93c27/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 920185a..27a1e3c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -462,6 +462,15 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
 	}
 
+	/**
+	 * This test relies on the hash function used by {@link DataStream#keyBy}, which is
+	 * assumed to be {@link MathUtils#murmurHash}.
+	 *
+	 * For the test to pass, all FlatMappers must see at least two records in the iteration,
+	 * which can only be achieved if the hashed values of the input keys form a complete
+	 * residue system modulo the parallelism. Given that the test is designed for 3 parallel
+	 * FlatMapper instances, keys chosen from the [1,3] range are a suitable choice.
+	 */
 	@Test
 	public void testGroupByFeedback() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/f0f93c27/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
index 42febea..9530d09 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SplitStream;
@@ -67,12 +68,13 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 	 * of Tuple2<Integer, Integer> is created. The stream is grouped according to the first tuple
 	 * value. Each group is folded where the second tuple value is summed up.
 	 *
-	 * @throws Exception
+	 * This test relies on the hash function used by {@link DataStream#keyBy}, which is
+	 * assumed to be {@link MathUtils#murmurHash}.
 	 */
 	@Test
-	public void testFoldOperation() throws Exception {
+	public void testGroupedFoldOperation() throws Exception {
 		int numElements = 10;
-		int numKeys = 2;
+		final int numKeys = 2;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
@@ -85,9 +87,13 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 					return accumulator + value.f1;
 				}
 			}).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
+				int key = -1;
 				@Override
 				public Tuple2<Integer, Integer> map(Integer value) throws Exception {
-					return new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), value);
+					if (key == -1) {
+						key = MathUtils.murmurHash(value) % numKeys;
+					}
+					return new Tuple2<>(key, value);
 				}
 			}).split(new OutputSelector<Tuple2<Integer, Integer>>() {
 				@Override
@@ -95,7 +101,6 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 					List<String> output = new ArrayList<>();
 					output.add(value.f0 + "");
-
 					return output;
 				}
 			});
@@ -120,7 +125,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		int counter2 = 0;
 
 		for (int i = 0; i < numElements; i++) {
-			if (i % 2 == 0) {
+			if (MathUtils.murmurHash(i) % numKeys == 0) {
 				counter1 += i;
 				builder1.append(counter1 + "\n");
 			} else {
@@ -196,7 +201,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		@Override
 		public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
 			for (int i = 0; i < numElements; i++) {
-				ctx.collect(new Tuple2<Integer, NonSerializable>(i, new NonSerializable(i)));
+				ctx.collect(new Tuple2<>(i, new NonSerializable(i)));
 			}
 		}
@@ -217,14 +222,13 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		@Override
 		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
 			for (int i = 0; i < numElements; i++) {
-				Tuple2<Integer, Integer> result = new Tuple2<>(i % numKeys, i);
+				Tuple2<Integer, Integer> result = new Tuple2<>(MathUtils.murmurHash(i) % numKeys, i);
 				ctx.collect(result);
 			}
 		}
 
 		@Override
 		public void cancel() {
-
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0f93c27/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index fe49b1f..e573fe0 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -58,9 +58,11 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
    * The stream is grouped by the first field. For each group, the resulting stream is folded by
    * summing up the second tuple field.
    *
+   * This test relies on the hash function used by {@link DataStream#keyBy}, which is
+   * assumed to be {@link MathUtils#murmurHash}.
    */
   @Test
-  def testFoldOperator(): Unit = {
+  def testGroupedFoldOperator(): Unit = {
     val numElements = 10
     val numKeys = 2
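
A note on the residue-system argument documented in IterateTest above: the claim can be checked with a small standalone program. The sketch below is illustrative only and not part of the commit; the class name ResidueCoverageCheck is invented here, while the org.apache.flink.runtime.util.MathUtils import and the murmurHash(int) call are the ones the diff itself uses (they assume a Flink version from this era, where MathUtils still lives in the runtime util package). Assuming keyBy routes each record by murmurHash(key) modulo the downstream parallelism, keys 1 to 3 must hit every residue modulo 3 for all three FlatMapper subtasks to receive input:

import org.apache.flink.runtime.util.MathUtils;

import java.util.HashSet;
import java.util.Set;

public class ResidueCoverageCheck {

	public static void main(String[] args) {
		// Parallelism of the FlatMappers in IterateTest#testGroupByFeedback.
		final int parallelism = 3;

		// Collect the residues that keys 1..3 map to. murmurHash is assumed
		// to return a non-negative int, which the test code's own use of the
		// % operator for key assignment also relies on.
		Set<Integer> residues = new HashSet<>();
		for (int key = 1; key <= 3; key++) {
			residues.add(MathUtils.murmurHash(key) % parallelism);
		}

		// The test can only pass if these residues form a complete residue
		// system modulo the parallelism, i.e. every subtask receives a key.
		System.out.println(residues.size() == parallelism
			? "keys [1,3] reach all " + parallelism + " subtasks"
			: "coverage gap, residues seen: " + residues);
	}
}

If the hash function behind keyBy ever changes, a check like this fails and the hard-coded keys would have to be re-derived, which appears to be why each updated test now documents the murmurHash assumption in its javadoc.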

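Similarly, the expected output of the grouped fold tests can be derived offline by replaying the key assignment of the sources. The sketch below is illustrative only and not part of the commit; the class name ExpectedFoldResults is invented here, but the loop mirrors the expectation-building code in StreamingOperatorsITCase above, where counter1 and counter2 are the running sums the fold is expected to emit per key:

import org.apache.flink.runtime.util.MathUtils;

public class ExpectedFoldResults {

	public static void main(String[] args) {
		// Same constants as testGroupedFoldOperation in the diff.
		final int numElements = 10;
		final int numKeys = 2;

		int counter1 = 0; // running sum for elements with residue 0
		int counter2 = 0; // running sum for elements with residue 1

		for (int i = 0; i < numElements; i++) {
			// Same key assignment as TupleSource: murmurHash(i) % numKeys,
			// so the offline expectation matches what keyBy does at runtime.
			if (MathUtils.murmurHash(i) % numKeys == 0) {
				counter1 += i;
				System.out.println("residue 0 fold emits: " + counter1);
			} else {
				counter2 += i;
				System.out.println("residue 1 fold emits: " + counter2);
			}
		}
	}
}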