[
https://issues.apache.org/jira/browse/FLINK-18960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17181004#comment-17181004
]
Aljoscha Krettek edited comment on FLINK-18960 at 8/24/20, 12:43 PM:
---------------------------------------------------------------------
Hi [~xiaohang.li], I checked the issue. It does exist in older versions, but it
should be fixed by FLINK-17578; you could try upgrading to 1.10.2 or 1.11.
was (Author: gaoyunhaii):
Hi [~xiaohang.li], I checked the issue. It does exist in older versions, but it
should be fixed by https://issues.apache.org/jira/browse/FLINK-17578; you could
try upgrading to 1.10.2 or 1.11.
> flink sideoutput union
> ----------------------
>
> Key: FLINK-18960
> URL: https://issues.apache.org/jira/browse/FLINK-18960
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.10.1
> Reporter: xiaohang.li
> Priority: Minor
>
> Unioning side outputs does not seem to work correctly. If we union several
> side outputs of the same operator, the result contains only the elements of
> the last side output, repeated once for each stream in the union, which is
> not expected. For example,
> {code:java}
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.util.Collector
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> // One tag per side output
> val side = new OutputTag[String]("side")
> val side2 = new OutputTag[String]("side2")
> val side3 = new OutputTag[String]("side3")
>
> val ds = env.socketTextStream("master", 9001)
>
> // Route each element to a side output by keyword; always emit it on the main output
> val res = ds.process(new ProcessFunction[String, String] {
>   override def processElement(
>       value: String,
>       ctx: ProcessFunction[String, String]#Context,
>       out: Collector[String]): Unit = {
>     if (value.contains("hello")) {
>       ctx.output(side, value)
>     } else if (value.contains("world")) {
>       ctx.output(side2, value)
>     } else if (value.contains("flink")) {
>       ctx.output(side3, value)
>     }
>     out.collect(value)
>   }
> })
>
> val res1 = res.getSideOutput(side)
> val res2 = res.getSideOutput(side2)
> val res3 = res.getSideOutput(side3)
> println("====>" + res1.getClass)
> println("====>" + res2.getClass)
>
> res1.print("res1")
> res2.print("res2")
> res3.print("res3")
> // Union of the three side outputs of the same operator
> res2.union(res1).union(res3).print("all")
> {code}
>
> If we input
> {code:java}
> hello
> world
> flink
> {code}
> The output will be
>
> {code:java}
> res1> hello
> res2> world
> res3> flink
> all> flink
> all> flink
> all> flink
> {code}
>
> But the expected output would be
> {code:java}
> res1> hello
> res2> world
> res3> flink
> all> hello
> all> world
> all> flink
> {code}
>
>
> If we add a _map_ after each side output and then union them, the output is
> correct, but adding a map should not be necessary.
>
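The map workaround mentioned at the end of the description could look roughly like the following on affected versions such as 1.10.1. This is only a sketch: the object name, the job name, and the bounded fromElements source standing in for the socket stream are assumptions made for illustration. The identity map exists solely to place a separate operator between each side output and the union.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical object name, used only for this sketch.
object SideOutputUnionWorkaround {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val side = new OutputTag[String]("side")
    val side2 = new OutputTag[String]("side2")
    val side3 = new OutputTag[String]("side3")

    // Assumption: a bounded source replaces the socket stream from the report.
    val ds = env.fromElements("hello", "world", "flink")

    val res = ds.process(new ProcessFunction[String, String] {
      override def processElement(
          value: String,
          ctx: ProcessFunction[String, String]#Context,
          out: Collector[String]): Unit = {
        if (value.contains("hello")) {
          ctx.output(side, value)
        } else if (value.contains("world")) {
          ctx.output(side2, value)
        } else if (value.contains("flink")) {
          ctx.output(side3, value)
        }
        out.collect(value)
      }
    })

    // Workaround: an identity map after each side output, before the union.
    val res1 = res.getSideOutput(side).map(v => v)
    val res2 = res.getSideOutput(side2).map(v => v)
    val res3 = res.getSideOutput(side3).map(v => v)

    res2.union(res1).union(res3).print("all")

    env.execute("side-output-union-workaround")
  }
}
```

On 1.10.2 or 1.11, where FLINK-17578 is reported as fixed, the extra map calls should be unnecessary.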
--
This message was sent by Atlassian Jira
(v8.3.4#803005)