Repository: flink
Updated Branches:
  refs/heads/master 55010d0bd -> 4651a1690
[FLINK-6552] Allow differing types for side outputs Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4651a169 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4651a169 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4651a169 Branch: refs/heads/master Commit: 4651a1690ac8d5784071eae1fad8ce179385cdaa Parents: 55010d0 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri May 12 14:40:44 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon May 15 12:50:29 2017 -0400 ---------------------------------------------------------------------- .../flink/streaming/api/graph/StreamGraph.java | 7 +-- .../streaming/runtime/SideOutputITCase.java | 55 ++++++++++++++++++-- .../runtime/util/TestListResultSink.java | 6 ++- 3 files changed, 60 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4651a169/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 5dd651c..2784517 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -318,9 +318,10 @@ public class StreamGraph extends StreamingPlan { continue; } - if (!tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) { - throw new IllegalArgumentException("Trying to add a side input for the same id " + - "with a different type. 
This is not allowed."); + if (tag.f1.getId().equals(outputTag.getId()) && + !tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) { + throw new IllegalArgumentException("Trying to add a side output for the same" + + "side-output id with a different type. This is not allowed."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4651a169/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java index 27124cc..765eae5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java @@ -160,14 +160,25 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen env.execute(); assertEquals( - Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE), + Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5", + "WM:0", "WM:0", "WM:0", + "WM:2", "WM:2", "WM:2" , + "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE), sideOutputResultSink1.getSortedResult()); assertEquals( - Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE), + Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5", + "WM:0", "WM:0", "WM:0", + "WM:2", "WM:2", "WM:2" , + "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE), sideOutputResultSink1.getSortedResult()); - assertEquals(Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE), resultSink.getSortedResult()); + assertEquals( + 
Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5", + "WM:0", "WM:0", "WM:0", + "WM:2", "WM:2", "WM:2" , + "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE), + resultSink.getSortedResult()); } @Test @@ -242,6 +253,44 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen } @Test + public void testDifferentSideOutputTypes() throws Exception { + final OutputTag<String> sideOutputTag1 = new OutputTag<String>("string"){}; + final OutputTag<Integer> sideOutputTag2 = new OutputTag<Integer>("int"){}; + + TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>(); + TestListResultSink<Integer> sideOutputResultSink2 = new TestListResultSink<>(); + TestListResultSink<Integer> resultSink = new TestListResultSink<>(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + env.setParallelism(3); + + DataStream<Integer> dataStream = env.fromCollection(elements); + + SingleOutputStreamOperator<Integer> passThroughtStream = dataStream + .process(new ProcessFunction<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public void processElement( + Integer value, Context ctx, Collector<Integer> out) throws Exception { + out.collect(value); + ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value)); + ctx.output(sideOutputTag2, 13); + } + }); + + passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1); + passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2); + passThroughtStream.addSink(resultSink); + env.execute(); + + assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink1.getSortedResult()); + assertEquals(Arrays.asList(13, 13, 13, 13, 13), sideOutputResultSink2.getSortedResult()); + assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult()); + } + + @Test public void 
testSideOutputNameClash() throws Exception { final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side"){}; final OutputTag<Integer> sideOutputTag2 = new OutputTag<Integer>("side"){}; http://git-wip-us.apache.org/repos/asf/flink/blob/4651a169/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java index 321d4c5..3fabb4b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java @@ -17,6 +17,8 @@ package org.apache.flink.test.streaming.runtime.util; +import java.util.Collections; +import java.util.Comparator; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -66,8 +68,8 @@ public class TestListResultSink<T> extends RichSinkFunction<T> { public List<T> getSortedResult() { synchronized (resultList()) { - TreeSet<T> treeSet = new TreeSet<T>(resultList()); - ArrayList<T> sortedList = new ArrayList<T>(treeSet); + ArrayList<T> sortedList = new ArrayList<T>(resultList()); + Collections.sort((List) sortedList); return sortedList; } }