[FLINK-3422][streaming] Update tests reliant on hashing

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b862fd0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b862fd0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b862fd0b

Branch: refs/heads/release-1.0
Commit: b862fd0b3657d8b9026a54782bad5a1fb71c19f4
Parents: a049d80
Author: Márton Balassi <[email protected]>
Authored: Sun Feb 21 23:01:00 2016 +0100
Committer: Márton Balassi <[email protected]>
Committed: Wed Mar 2 12:47:23 2016 +0100

----------------------------------------------------------------------
 .../storm/tests/StormFieldsGroupingITCase.java  | 12 ++++++++---
 .../apache/flink/streaming/api/IterateTest.java |  9 ++++++++
 .../streaming/api/StreamingOperatorsITCase.java | 22 ++++++++++++--------
 .../api/scala/StreamingOperatorsITCase.scala    |  4 +++-
 4 files changed, 34 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b862fd0b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index dfadd77..b873345 100644
--- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -21,13 +21,19 @@ import backtype.storm.Config;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
 import org.apache.flink.storm.tests.operators.TaskIdBolt;
 import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 
+/**
+ * This test relies on the hash function used by the {@link DataStream#keyBy}, 
which is
+ * assumed to be {@link MathUtils#murmurHash}.
+ */
 public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
 
        private final static String topologyId = "FieldsGrouping Test";
@@ -43,9 +49,9 @@ public class StormFieldsGroupingITCase extends 
StreamingProgramTestBase {
 
        @Override
        protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory("4> -1930858313\n" + "4> 
1431162155\n" + "4> 1654374947\n"
-                               + "4> -65105105\n" + "3> -1155484576\n" + "3> 
1033096058\n" + "3> -1557280266\n"
-                               + "3> -1728529858\n" + "3> -518907128\n" + "3> 
-252332814", this.resultPath);
+               compareResultsByLinesInMemory("3> -1155484576\n" + "3> 
1033096058\n" + "3> -1930858313\n" +
+                       "3> 1431162155\n" + "4> -1557280266\n" + "4> 
-1728529858\n" + "4> 1654374947\n" +
+                       "4> -65105105\n" + "4> -518907128\n" + "4> 
-252332814\n", this.resultPath);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b862fd0b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 920185a..27a1e3c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -462,6 +462,15 @@ public class IterateTest extends 
StreamingMultipleProgramsTestBase {
                assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), 
TestSink.collected);
        }
 
+       /**
+        * This test relies on the hash function used by the {@link 
DataStream#keyBy}, which is
+        * assumed to be {@link MathUtils#murmurHash}.
+        *
+        * For the test to pass, all FlatMappers must see at least two records 
in the iteration,
+        * which can only be achieved if the hashed values of the input keys 
form a complete
+        * residue system. Given that the test is designed for 3 parallel 
FlatMapper instances,
+        * keys chosen from the [1,3] range are a suitable choice.
+     */
        @Test
        public void testGroupByFeedback() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/b862fd0b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
index 42febea..9530d09 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SplitStream;
@@ -67,12 +68,13 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
         * of Tuple2<Integer, Integer> is created. The stream is grouped 
according to the first tuple
         * value. Each group is folded where the second tuple value is summed 
up.
         *
-        * @throws Exception
+        * This test relies on the hash function used by the {@link 
DataStream#keyBy}, which is
+        * assumed to be {@link MathUtils#murmurHash}.
         */
        @Test
-       public void testFoldOperation() throws Exception {
+       public void testGroupedFoldOperation() throws Exception {
                int numElements = 10;
-               int numKeys = 2;
+               final int numKeys = 2;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                DataStream<Tuple2<Integer, Integer>> sourceStream = 
env.addSource(new TupleSource(numElements, numKeys));
@@ -85,9 +87,13 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
                                        return accumulator + value.f1;
                                }
                        }).map(new RichMapFunction<Integer, Tuple2<Integer, 
Integer>>() {
+                               int key = -1;
                                @Override
                                public Tuple2<Integer, Integer> map(Integer 
value) throws Exception {
-                                       return new Tuple2<Integer, 
Integer>(getRuntimeContext().getIndexOfThisSubtask(), value);
+                                       if (key == -1){
+                                               key = 
MathUtils.murmurHash(value) % numKeys;
+                                       }
+                                       return new Tuple2<>(key, value);
                                }
                        }).split(new OutputSelector<Tuple2<Integer, Integer>>() 
{
                                @Override
@@ -95,7 +101,6 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
                                        List<String> output = new ArrayList<>();
 
                                        output.add(value.f0 + "");
-
                                        return output;
                                }
                        });
@@ -120,7 +125,7 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
                int counter2 = 0;
 
                for (int i = 0; i < numElements; i++) {
-                       if (i % 2 == 0) {
+                       if (MathUtils.murmurHash(i) % numKeys == 0) {
                                counter1 += i;
                                builder1.append(counter1 + "\n");
                        } else {
@@ -196,7 +201,7 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
                @Override
                public void run(SourceContext<Tuple2<Integer, NonSerializable>> 
ctx) throws Exception {
                        for (int i = 0; i < numElements; i++) {
-                               ctx.collect(new Tuple2<Integer, 
NonSerializable>(i, new NonSerializable(i)));
+                               ctx.collect(new Tuple2<>(i, new 
NonSerializable(i)));
                        }
                }
 
@@ -217,14 +222,13 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
                @Override
                public void run(SourceContext<Tuple2<Integer, Integer>> ctx) 
throws Exception {
                        for (int i = 0; i < numElements; i++) {
-                               Tuple2<Integer, Integer> result = new 
Tuple2<>(i % numKeys, i);
+                               Tuple2<Integer, Integer> result = new 
Tuple2<>(MathUtils.murmurHash(i) % numKeys, i);
                                ctx.collect(result);
                        }
                }
 
                @Override
                public void cancel() {
-
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b862fd0b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index fe49b1f..e573fe0 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -58,9 +58,11 @@ class StreamingOperatorsITCase extends 
ScalaStreamingMultipleProgramsTestBase {
     * The stream is grouped by the first field. For each group, the resulting 
stream is folded by
     * summing up the second tuple field.
     *
+    * This test relies on the hash function used by the {@link 
DataStream#keyBy}, which is
+    * assumed to be {@link MathUtils#murmurHash}.
     */
   @Test
-  def testFoldOperator(): Unit = {
+  def testGroupedFoldOperator(): Unit = {
     val numElements = 10
     val numKeys = 2
 

Reply via email to