Repository: flink Updated Branches: refs/heads/release-0.8 6afa4ba6e -> e01712111
[streaming] Small backport fixes for streaming to branch-0.8 Fix for operators inheriting lower parallelism e01057e Hash join for streaming joins 6d49d1d Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0171211 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0171211 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0171211 Branch: refs/heads/release-0.8 Commit: e01712111d6f3e85c986d59d31d6f59030c0faa3 Parents: 6afa4ba Author: mbalassi <[email protected]> Authored: Mon Feb 9 15:59:03 2015 +0100 Committer: mbalassi <[email protected]> Committed: Mon Feb 9 22:54:53 2015 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 4 +- .../api/function/co/JoinWindowFunction.java | 39 +++++++++++++++----- .../streaming/api/WindowCrossJoinTest.java | 14 +++---- 3 files changed, 39 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e0171211/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 8e21218..261f0ae 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -1022,7 +1022,7 @@ public class DataStream<OUT> { operatorName, outTypeInfo); jobGraphBuilder.addStreamVertex(returnStream.getId(), invokable, getType(), outTypeInfo, - operatorName, degreeOfParallelism); + operatorName, returnStream.getParallelism()); connectGraph(inputStream, returnStream.getId(), 0); @@ -1086,7 +1086,7 @@ public class DataStream<OUT> { DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType()); jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>( - clean(sinkFunction)), getType(), null, "sink", degreeOfParallelism); + clean(sinkFunction)), getType(), null, "sink", returnStream.getParallelism()); this.connectGraph(this.copy(), returnStream.getId(), 0); http://git-wip-us.apache.org/repos/asf/flink/blob/e0171211/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java index 9b39f33..e8f46dd 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java @@ -18,7 +18,10 @@ package org.apache.flink.streaming.api.function.co; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.functions.KeySelector; @@ -32,24 +35,42 @@ public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, private JoinFunction<IN1, IN2, OUT> joinFunction; public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2, - JoinFunction<IN1, IN2, OUT> joinFunction) { - this.keySelector1 = keySelector1; + JoinFunction<IN1, IN2, OUT> joinFunction) { this.keySelector1 = keySelector1; this.keySelector2 = keySelector2; this.joinFunction = joinFunction; } @Override public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception { - for (IN1 item1 : first) { - Object key1 = keySelector1.getKey(item1); - for (IN2 item2 : second) { - Object key2 = keySelector2.getKey(item2); + Map<Object, List<IN1>> map = build(first); - if (key1.equals(key2)) { - out.collect(joinFunction.join(item1, item2)); + for (IN2 record : second) { + Object key = keySelector2.getKey(record); + List<IN1> match = map.get(key); + if (match != null) { + for (IN1 matching : match) { + out.collect(joinFunction.join(matching, record)); } } } + + } + + private Map<Object, List<IN1>> build(List<IN1> records) throws Exception { + + Map<Object, List<IN1>> map = new HashMap<Object, List<IN1>>(); + + for (IN1 record : records) { + Object key = keySelector1.getKey(record); + List<IN1> current = map.get(key); + if (current == null) { + current = new LinkedList<IN1>(); + map.put(key, current); + } + current.add(record); + } + + return map; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/e0171211/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java index b52418d..85bd8bb 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.api; import static org.junit.Assert.assertEquals; import java.io.Serializable; -import java.util.ArrayList; +import java.util.HashSet; import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.java.tuple.Tuple1; @@ -38,19 +38,19 @@ public class WindowCrossJoinTest implements Serializable { private static final long MEMORYSIZE = 32; - private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>(); - private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>(); + private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(); + private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(); - private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>(); - private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>(); + private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(); + private static HashSet<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(); @Test public void test() throws Exception { LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); env.setBufferTimeout(1); - ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>(); - ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>(); + HashSet<Tuple2<Integer, String>> in1 = new HashSet<Tuple2<Integer, String>>(); + HashSet<Tuple1<Integer>> in2 = new HashSet<Tuple1<Integer>>(); in1.add(new Tuple2<Integer, String>(10, "a")); in1.add(new Tuple2<Integer, String>(20, "b"));
