[ https://issues.apache.org/jira/browse/FLINK-17578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103074#comment-17103074 ]
Danish Amjad commented on FLINK-17578:
--------------------------------------
The *root cause* of this problem lies in the implementation of
[StreamEdge|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java].
Its hashCode() and equals() methods compare only the *edgeId*. They do not
check the *outputTag*.
The *edgeId* is formed by concatenating information such as the source vertex
and the target vertex. In the union from the reported example, this information
is identical for the even and the odd side-output edges, so the two edges
compare as equal.
This affects *OperatorChain* construction, which maintains a *Map* from each
edge to its writer: the entry for the first edge is always overwritten by the
second. This Map is later used to build the *allOutputs* list. That list
therefore contains the same writer twice, so the records of one side output are
printed twice and those of the other never appear (for example 2, 2, 4, 4
instead of 1, 2, 3, 4).
> Union of 2 SideOutputs behaviour incorrect
> ------------------------------------------
>
> Key: FLINK-17578
> URL: https://issues.apache.org/jira/browse/FLINK-17578
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: Tom Wells
> Priority: Major
>
> Unexpected behaviour when using union() to merge two DataStreams that are both
> obtained from side outputs.
> See the example code below, whose comments demonstrate the issue:
> {code:scala}
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.util.Collector
>
> // Enclosing object added so the snippet compiles on its own (name is arbitrary)
> object UnionOfSideOutputs {
>   def main(args: Array[String]): Unit = {
>     val env: StreamExecutionEnvironment =
>       StreamExecutionEnvironment.getExecutionEnvironment
>     val input = env.fromElements(1, 2, 3, 4)
>     val oddTag = OutputTag[Int]("odds")
>     val evenTag = OutputTag[Int]("even")
>     // Route odd values to one side output and even values to the other;
>     // nothing is emitted on the main output
>     val all =
>       input.process {
>         (value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]) => {
>           if (value % 2 != 0)
>             ctx.output(oddTag, value)
>           else
>             ctx.output(evenTag, value)
>         }
>       }
>     val odds = all.getSideOutput(oddTag)
>     val evens = all.getSideOutput(evenTag)
>     // These print correctly
>     odds.print // -> 1, 3
>     evens.print // -> 2, 4
>     // This prints incorrectly - BUG?
>     odds.union(evens).print // -> 2, 2, 4, 4
>     evens.union(odds).print // -> 1, 1, 3, 3
>     // Another test to understand normal behaviour of .union, using normal inputs
>     val odds1 = env.fromElements(1, 3)
>     val evens1 = env.fromElements(2, 4)
>     // Union of 2 normal inputs
>     odds1.union(evens1).print // -> 1, 2, 3, 4
>     // Union of a normal input plus an input from a side output
>     odds.union(evens1).print // -> 1, 2, 3, 4
>     evens1.union(odds).print // -> 1, 2, 3, 4
>     // So it seems that when both inputs are from side outputs, it behaves
>     // incorrectly... BUG?
>     env.execute("Test job")
>   }
> }
> {code}