Repository: flink
Updated Branches:
  refs/heads/master 55010d0bd -> 4651a1690


[FLINK-6552] Allow differing types for side outputs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4651a169
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4651a169
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4651a169

Branch: refs/heads/master
Commit: 4651a1690ac8d5784071eae1fad8ce179385cdaa
Parents: 55010d0
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri May 12 14:40:44 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon May 15 12:50:29 2017 -0400

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamGraph.java  |  7 +--
 .../streaming/runtime/SideOutputITCase.java     | 55 ++++++++++++++++++--
 .../runtime/util/TestListResultSink.java        |  6 ++-
 3 files changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4651a169/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 5dd651c..2784517 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -318,9 +318,10 @@ public class StreamGraph extends StreamingPlan {
                                continue;
                        }
 
-                       if 
(!tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
-                               throw new IllegalArgumentException("Trying to 
add a side input for the same id " +
-                                               "with a different type. This is 
not allowed.");
+                       if (tag.f1.getId().equals(outputTag.getId()) &&
+                                       
!tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
+                               throw new IllegalArgumentException("Trying to 
add a side output for the same" +
+                                               "side-output id with a 
different type. This is not allowed.");
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4651a169/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 27124cc..765eae5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -160,14 +160,25 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
                env.execute();
 
                assertEquals(
-                               Arrays.asList("E:sideout-1", "E:sideout-2", 
"E:sideout-3", "E:sideout-4", "E:sideout-5", "WM:0", "WM:2", "WM:" + 
Long.MAX_VALUE),
+                               Arrays.asList("E:sideout-1", "E:sideout-2", 
"E:sideout-3", "E:sideout-4", "E:sideout-5",
+                                               "WM:0", "WM:0", "WM:0",
+                                               "WM:2", "WM:2", "WM:2" ,
+                                               "WM:" + Long.MAX_VALUE, "WM:" + 
Long.MAX_VALUE, "WM:" + Long.MAX_VALUE),
                                sideOutputResultSink1.getSortedResult());
 
                assertEquals(
-                               Arrays.asList("E:sideout-1", "E:sideout-2", 
"E:sideout-3", "E:sideout-4", "E:sideout-5", "WM:0", "WM:2", "WM:" + 
Long.MAX_VALUE),
+                               Arrays.asList("E:sideout-1", "E:sideout-2", 
"E:sideout-3", "E:sideout-4", "E:sideout-5",
+                                               "WM:0", "WM:0", "WM:0",
+                                               "WM:2", "WM:2", "WM:2" ,
+                                               "WM:" + Long.MAX_VALUE, "WM:" + 
Long.MAX_VALUE, "WM:" + Long.MAX_VALUE),
                                sideOutputResultSink1.getSortedResult());
 
-               assertEquals(Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5", 
"WM:0", "WM:2", "WM:" + Long.MAX_VALUE), resultSink.getSortedResult());
+               assertEquals(
+                               Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5",
+                                               "WM:0", "WM:0", "WM:0",
+                                               "WM:2", "WM:2", "WM:2" ,
+                                               "WM:" + Long.MAX_VALUE, "WM:" + 
Long.MAX_VALUE, "WM:" + Long.MAX_VALUE),
+                               resultSink.getSortedResult());
        }
 
        @Test
@@ -242,6 +253,44 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
        }
 
        @Test
+       public void testDifferentSideOutputTypes() throws Exception {
+               final OutputTag<String> sideOutputTag1 = new 
OutputTag<String>("string"){};
+               final OutputTag<Integer> sideOutputTag2 = new 
OutputTag<Integer>("int"){};
+
+               TestListResultSink<String> sideOutputResultSink1 = new 
TestListResultSink<>();
+               TestListResultSink<Integer> sideOutputResultSink2 = new 
TestListResultSink<>();
+               TestListResultSink<Integer> resultSink = new 
TestListResultSink<>();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableObjectReuse();
+               env.setParallelism(3);
+
+               DataStream<Integer> dataStream = env.fromCollection(elements);
+
+               SingleOutputStreamOperator<Integer> passThroughtStream = 
dataStream
+                               .process(new ProcessFunction<Integer, 
Integer>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void processElement(
+                                                       Integer value, Context 
ctx, Collector<Integer> out) throws Exception {
+                                               out.collect(value);
+                                               ctx.output(sideOutputTag1, 
"sideout-" + String.valueOf(value));
+                                               ctx.output(sideOutputTag2, 13);
+                                       }
+                               });
+
+               
passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+               
passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+               passThroughtStream.addSink(resultSink);
+               env.execute();
+
+               assertEquals(Arrays.asList("sideout-1", "sideout-2", 
"sideout-3", "sideout-4", "sideout-5"), 
sideOutputResultSink1.getSortedResult());
+               assertEquals(Arrays.asList(13, 13, 13, 13, 13), 
sideOutputResultSink2.getSortedResult());
+               assertEquals(Arrays.asList(1, 2, 3, 4, 5), 
resultSink.getSortedResult());
+       }
+
+       @Test
        public void testSideOutputNameClash() throws Exception {
                final OutputTag<String> sideOutputTag1 = new 
OutputTag<String>("side"){};
                final OutputTag<Integer> sideOutputTag2 = new 
OutputTag<Integer>("side"){};

http://git-wip-us.apache.org/repos/asf/flink/blob/4651a169/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
index 321d4c5..3fabb4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.test.streaming.runtime.util;
 
+import java.util.Collections;
+import java.util.Comparator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
@@ -66,8 +68,8 @@ public class TestListResultSink<T> extends RichSinkFunction<T> {
 
        public List<T> getSortedResult() {
                synchronized (resultList()) {
-                       TreeSet<T> treeSet = new TreeSet<T>(resultList());
-                       ArrayList<T> sortedList = new ArrayList<T>(treeSet);
+                       ArrayList<T> sortedList = new 
ArrayList<T>(resultList());
+                       Collections.sort((List) sortedList);
                        return sortedList;
                }
        }

Reply via email to