Repository: flink
Updated Branches:
  refs/heads/release-0.8 6afa4ba6e -> e01712111


[streaming] Small backport fixes for streaming to branch-0.8

- Fix for operators inheriting lower parallelism (commit e01057e)
- Hash join for streaming joins (commit 6d49d1d)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0171211
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0171211
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0171211

Branch: refs/heads/release-0.8
Commit: e01712111d6f3e85c986d59d31d6f59030c0faa3
Parents: 6afa4ba
Author: mbalassi <[email protected]>
Authored: Mon Feb 9 15:59:03 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Mon Feb 9 22:54:53 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  4 +-
 .../api/function/co/JoinWindowFunction.java     | 39 +++++++++++++++-----
 .../streaming/api/WindowCrossJoinTest.java      | 14 +++----
 3 files changed, 39 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0171211/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8e21218..261f0ae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1022,7 +1022,7 @@ public class DataStream<OUT> {
                                operatorName, outTypeInfo);
 
                jobGraphBuilder.addStreamVertex(returnStream.getId(), 
invokable, getType(), outTypeInfo,
-                               operatorName, degreeOfParallelism);
+                               operatorName, returnStream.getParallelism());
 
                connectGraph(inputStream, returnStream.getId(), 0);
 
@@ -1086,7 +1086,7 @@ public class DataStream<OUT> {
                DataStreamSink<OUT> returnStream = new 
DataStreamSink<OUT>(environment, "sink", getType());
 
                jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SinkInvokable<OUT>(
-                               clean(sinkFunction)), getType(), null, "sink", 
degreeOfParallelism);
+                               clean(sinkFunction)), getType(), null, "sink", 
returnStream.getParallelism());
 
                this.connectGraph(this.copy(), returnStream.getId(), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0171211/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
index 9b39f33..e8f46dd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.streaming.api.function.co;
 
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -32,24 +35,42 @@ public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1,
        private JoinFunction<IN1, IN2, OUT> joinFunction;
 
        public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, 
KeySelector<IN2, ?> keySelector2,
-                       JoinFunction<IN1, IN2, OUT> joinFunction) {
-               this.keySelector1 = keySelector1;
+                                                       JoinFunction<IN1, IN2, 
OUT> joinFunction) { this.keySelector1 = keySelector1;
                this.keySelector2 = keySelector2;
                this.joinFunction = joinFunction;
        }
 
        @Override
        public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> 
out) throws Exception {
-               for (IN1 item1 : first) {
-                       Object key1 = keySelector1.getKey(item1);
 
-                       for (IN2 item2 : second) {
-                               Object key2 = keySelector2.getKey(item2);
+               Map<Object, List<IN1>> map = build(first);
 
-                               if (key1.equals(key2)) {
-                                       out.collect(joinFunction.join(item1, 
item2));
+               for (IN2 record : second) {
+                       Object key = keySelector2.getKey(record);
+                       List<IN1> match = map.get(key);
+                       if (match != null) {
+                               for (IN1 matching : match) {
+                                       out.collect(joinFunction.join(matching, 
record));
                                }
                        }
                }
+
+       }
+
+       private Map<Object, List<IN1>> build(List<IN1> records) throws 
Exception {
+
+               Map<Object, List<IN1>> map = new HashMap<Object, List<IN1>>();
+
+               for (IN1 record : records) {
+                       Object key = keySelector1.getKey(record);
+                       List<IN1> current = map.get(key);
+                       if (current == null) {
+                               current = new LinkedList<IN1>();
+                               map.put(key, current);
+                       }
+                       current.add(record);
+               }
+
+               return map;
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0171211/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index b52418d..85bd8bb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api;
 import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.HashSet;
 
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -38,19 +38,19 @@ public class WindowCrossJoinTest implements Serializable {
 
        private static final long MEMORYSIZE = 32;
 
-       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> 
joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> 
joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+       private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> 
joinResults = new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
+       private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> 
joinExpectedResults = new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
 
-       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> 
crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> 
crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, 
Integer>>();
+       private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> 
crossResults = new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
+       private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> 
crossExpectedResults = new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>();
 
        @Test
        public void test() throws Exception {
                LocalStreamEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(1);
                env.setBufferTimeout(1);
 
-               ArrayList<Tuple2<Integer, String>> in1 = new 
ArrayList<Tuple2<Integer, String>>();
-               ArrayList<Tuple1<Integer>> in2 = new 
ArrayList<Tuple1<Integer>>();
+               HashSet<Tuple2<Integer, String>> in1 = new 
HashSet<Tuple2<Integer, String>>();
+               HashSet<Tuple1<Integer>> in2 = new HashSet<Tuple1<Integer>>();
 
                in1.add(new Tuple2<Integer, String>(10, "a"));
                in1.add(new Tuple2<Integer, String>(20, "b"));

Reply via email to