yanghua commented on a change in pull request #7470: [FLINK-11283] Accessing
the key when processing connected keyed stream
URL: https://github.com/apache/flink/pull/7470#discussion_r274748632
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
##########
@@ -591,6 +638,114 @@ public void processElement2(Integer value, Context ctx,
Collector<Integer> out)
assertEquals(Arrays.asList(1, 2, 3, 4, 5),
resultSink.getSortedResult());
}
+ /**
+ * Test keyed KeyedCoProcessFunction side output with multiple
consumers.
+ */
+ @Test
+ public void
testRealKeyedCoProcessFunctionSideOutputWithMultipleConsumers() throws
Exception {
+ final OutputTag<String> sideOutputTag1 = new
OutputTag<String>("side1"){};
+ final OutputTag<String> sideOutputTag2 = new
OutputTag<String>("side2"){};
+
+ TestListResultSink<String> sideOutputResultSink1 = new
TestListResultSink<>();
+ TestListResultSink<String> sideOutputResultSink2 = new
TestListResultSink<>();
+ TestListResultSink<Integer> resultSink = new
TestListResultSink<>();
+
+ StreamExecutionEnvironment see =
StreamExecutionEnvironment.getExecutionEnvironment();
+ see.setParallelism(3);
+
+ DataStream<Integer> ds1 = see.fromCollection(elements);
+ DataStream<Integer> ds2 = see.fromCollection(elements);
+
+ SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+ .keyBy(i -> i)
+ .connect(ds2.keyBy(i -> i))
+ .process(new KeyedCoProcessFunction<Integer, Integer,
Integer, Integer>() {
+ @Override
+ public void processElement1(Integer value,
Context ctx, Collector<Integer> out)
+ throws Exception {
+ if (value < 4) {
+ out.collect(value);
+ ctx.output(sideOutputTag1,
"sideout1-" + ctx.getCurrentKey() + "-" + String.valueOf(value));
+ }
+ }
+
+ @Override
+ public void processElement2(Integer value,
Context ctx, Collector<Integer> out)
+ throws Exception {
+ if (value >= 4) {
+ out.collect(value);
+ ctx.output(sideOutputTag2,
"sideout2-" + ctx.getCurrentKey() + "-" + String.valueOf(value));
+ }
+ }
+ });
+
+
passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+
passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+ passThroughtStream.addSink(resultSink);
+ see.execute();
+
+ assertEquals(Arrays.asList("sideout1-1-1", "sideout1-2-2",
"sideout1-3-3"), sideOutputResultSink1.getSortedResult());
+ assertEquals(Arrays.asList("sideout2-4-4", "sideout2-5-5"),
sideOutputResultSink2.getSortedResult());
+ assertEquals(Arrays.asList(1, 2, 3, 4, 5),
resultSink.getSortedResult());
+ }
+
+ /**
+ * Test keyed KeyedCoProcessFunction side output with multiple
consumers.
+ */
+ @Test
+ public void
testRealKeyedCoProcessFunctionSideOutputWithMultipleConsumersAndDifferentTypes()
throws Exception {
Review comment:
um... Yes, I tried tested a tuple input type and whether it can invoke
`keyBy`, `connect` or not.
OK, Let me remove this test.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services