[ 
https://issues.apache.org/jira/browse/FLINK-25199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479244#comment-17479244
 ] 

Piotr Nowojski commented on FLINK-25199:
----------------------------------------

The problem was quite strange. If a node was unioned with itself, this created 
two identical StreamEdges, both with the same partitioning, from the same 
source node to the same target node. 

This caused issues when constructing output collectors and picking the correct 
RecordWriters, as StreamTask was not able to uniquely identify a given 
StreamEdge and was assigning the same RecordWriter to both of the edges. As a 
result, all stream elements were sent twice through the same RecordWriter. This 
was actually pretty harmless apart from the calculation of the combined 
watermark downstream: all watermarks always came from a single edge/input 
gate, and the unused edges were always stuck at the min watermark.
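
A minimal sketch of the failure mode described above, not the actual Flink 
code. SimpleEdge, DuplicateEdgeDemo and their methods are hypothetical 
stand-ins, and Long.MIN_VALUE is assumed to represent "no watermark received 
yet". It shows how two edges that compare as equal end up sharing one writer, 
so the second input channel never advances and the combined (minimum) 
watermark stays stuck:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical edge identified only by source, target and partitioner. */
final class SimpleEdge {
    final int sourceId;
    final int targetId;
    final String partitioner;

    SimpleEdge(int sourceId, int targetId, String partitioner) {
        this.sourceId = sourceId;
        this.targetId = targetId;
        this.partitioner = partitioner;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleEdge)) {
            return false;
        }
        SimpleEdge other = (SimpleEdge) o;
        return sourceId == other.sourceId
                && targetId == other.targetId
                && partitioner.equals(other.partitioner);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceId, targetId, partitioner);
    }
}

final class DuplicateEdgeDemo {
    /** Looks up a writer index per edge by edge equality; equal edges share a writer. */
    static List<Integer> assignWriters(List<SimpleEdge> edges) {
        Map<SimpleEdge, Integer> writerByEdge = new HashMap<>();
        for (int i = 0; i < edges.size(); i++) {
            writerByEdge.putIfAbsent(edges.get(i), i);
        }
        List<Integer> assigned = new ArrayList<>();
        for (SimpleEdge edge : edges) {
            assigned.add(writerByEdge.get(edge));
        }
        return assigned;
    }

    /** Emits one watermark per edge through its assigned writer (writer i feeds channel i). */
    static long[] emitWatermark(List<Integer> writerPerEdge, long watermark) {
        long[] perChannel = new long[writerPerEdge.size()];
        Arrays.fill(perChannel, Long.MIN_VALUE); // assumed "no watermark yet" marker
        for (int writer : writerPerEdge) {
            perChannel[writer] = watermark;
        }
        return perChannel;
    }

    /** The combined watermark is the minimum over all input channels. */
    static long combinedWatermark(long[] perChannel) {
        long min = Long.MAX_VALUE;
        for (long w : perChannel) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

With two identical edges, both are mapped to writer 0, channel 1 never sees a 
watermark, and the combined watermark does not move.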

As a solution, we make sure that StreamEdges are unique by introducing a 
uniqueId field, incremented for every additional StreamEdge connecting the 
same pair of nodes.
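
The idea can be sketched roughly as follows. This is an illustration under 
assumptions, not the code from the actual fix: UniqueEdge and 
UniqueEdgeFactory are hypothetical names, and the counter is assumed to be 
kept per (source, target) pair, starting at 0:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical edge whose identity also includes a uniqueId. */
final class UniqueEdge {
    final int sourceId;
    final int targetId;
    final String partitioner;
    final int uniqueId;

    UniqueEdge(int sourceId, int targetId, String partitioner, int uniqueId) {
        this.sourceId = sourceId;
        this.targetId = targetId;
        this.partitioner = partitioner;
        this.uniqueId = uniqueId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof UniqueEdge)) {
            return false;
        }
        UniqueEdge other = (UniqueEdge) o;
        return sourceId == other.sourceId
                && targetId == other.targetId
                && uniqueId == other.uniqueId
                && partitioner.equals(other.partitioner);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceId, targetId, partitioner, uniqueId);
    }
}

/** Hands out a fresh uniqueId for each further edge between the same two nodes. */
final class UniqueEdgeFactory {
    private final Map<String, Integer> edgeCountPerPair = new HashMap<>();

    UniqueEdge create(int sourceId, int targetId, String partitioner) {
        String pair = sourceId + "->" + targetId;
        // merge returns the updated count (1 for the first edge), so subtract 1 to start at 0.
        int uniqueId = edgeCountPerPair.merge(pair, 1, Integer::sum) - 1;
        return new UniqueEdge(sourceId, targetId, partitioner, uniqueId);
    }
}
```

Two edges created for the same pair of nodes with the same partitioning now 
differ in uniqueId, so a lookup keyed by edge can tell them apart.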

> StreamEdges are not unique in self-union blocking propagation of watermarks
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-25199
>                 URL: https://issues.apache.org/jira/browse/FLINK-25199
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.14.3
>            Reporter: Timo Walther
>            Assignee: Piotr Nowojski
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> It seems that {{fromValues}}, when it generates multiple rows, does not emit 
> any watermarks:
> {code}
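>         // env was not defined in the snippet; assumed to be the default execution environment
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);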
>         Table inputTable =
>                 tEnv.fromValues(
>                         DataTypes.ROW(
>                                 DataTypes.FIELD("weight", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f0", DataTypes.STRING()),
>                                 DataTypes.FIELD("f1", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f2", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f3", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f4", DataTypes.INT()),
>                                 DataTypes.FIELD("label", DataTypes.STRING())),
>                         Row.of(1., "a", 1., 1., 1., 2, "l1"),
>                         Row.of(1., "a", 1., 1., 1., 2, "l1"));
>         DataStream<Row> input = tEnv.toDataStream(inputTable);
> {code}
> {{fromValues(1, 2, 3)}} or {{fromValues}} with only 1 row works correctly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
