[ https://issues.apache.org/jira/browse/FLINK-28719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573809#comment-17573809 ]

Chesnay Schepler commented on FLINK-28719:
------------------------------------------

So let's dive into what is happening. The source runs with a parallelism of 1, 
and let's say for simplicity that the map runs with p=7 (one subtask for each 
record).
The source sends one record to one map subtask, and then a watermark to all map 
subtasks. The map subtasks just forward whatever they receive to the window operator.
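To make the distribution concrete, here is a small plain-Python model (not Flink code). It assumes, as in the first table, that the i-th record goes to map subtask i (round-robin), that every record is followed by a watermark equal to its timestamp, and that this watermark is broadcast to all subtasks. The emission order 1, 2, 5, 6, 3, 4, 7 is read off the bottom-to-top order of that table; `distribute` is a hypothetical helper name.

```python
from typing import List, Tuple

# An entry is either ("rec", ts) for an element or ("wm", ts) for a watermark.
Entry = Tuple[str, int]


def distribute(timestamps: List[int], parallelism: int) -> List[List[Entry]]:
    """Hypothetical model of the source -> map edge: the i-th record goes to
    subtask i % parallelism (round-robin), and the watermark that follows it
    is broadcast to every subtask."""
    channels: List[List[Entry]] = [[] for _ in range(parallelism)]
    for i, ts in enumerate(timestamps):
        channels[i % parallelism].append(("rec", ts))
        for channel in channels:
            channel.append(("wm", ts))
    return channels


# Emission order taken from the stacks (bottom entry = emitted first).
channels = distribute([1, 2, 5, 6, 3, 4, 7], parallelism=7)
for i, channel in enumerate(channels, start=1):
    print(f"op{i}:", [f"W{ts}" if kind == "wm" else str(ts) for kind, ts in channel])
```

Each printed list is one column of the table, read from the bottom up.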

We can thus visualize the inputs to the window operator as a series of stacks, 
one for each map subtask.

(X is an element with timestamp X, WY is a watermark with timestamp Y. Each stack is read from the bottom up, so the bottom row is what each map subtask emitted first.)
||op1||op2||op3||op4||op5||op6||op7||
|W7|W7|W7|W7|W7|W7|W7|
| | | | | | |7|
|W4|W4|W4|W4|W4|W4|W4|
| | | | | |4| |
|W3|W3|W3|W3|W3|W3|W3|
| | | | |3| | |
|W6|W6|W6|W6|W6|W6|W6|
| | | |6| | | |
|W5|W5|W5|W5|W5|W5|W5|
| | |5| | | | |
|W2|W2|W2|W2|W2|W2|W2|
| |2| | | | | |
|W1|W1|W1|W1|W1|W1|W1|
|1| | | | | | |

Now consume the inputs in an arbitrary interleaving (i.e., pull entries from the 
bottom of any stack). Once the watermarks of all inputs are greater than or equal 
to T, the window operator considers T to be the current time; in other words, its 
current time is the minimum of the input watermarks.
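The rule for the operator's current time can be sketched as follows. This is a simplified model, not Flink's implementation: `WatermarkTracker` is a hypothetical class, and the per-input `max` assumes, consistent with the tables here, that a watermark lower than one already seen on that input (e.g. W3 arriving after W6) does not move that input backwards.

```python
from typing import List


class WatermarkTracker:
    """Hypothetical model: each input keeps the highest watermark seen so far,
    and the operator's current time is the minimum over all inputs."""

    def __init__(self, num_channels: int) -> None:
        # No watermark seen yet on any input.
        self.channel_wm: List[float] = [float("-inf")] * num_channels

    def on_watermark(self, channel: int, ts: int) -> float:
        # Per-input watermarks only move forward; a lower one is ignored.
        self.channel_wm[channel] = max(self.channel_wm[channel], ts)
        return self.current()

    def current(self) -> float:
        # Time T is reached only once every input has a watermark >= T.
        return min(self.channel_wm)


# Watermarks each input has consumed in the second table.
seen = [
    [1, 2, 5, 6, 3, 4, 7],  # op1 (fully consumed)
    [1, 2, 5, 6, 3, 4, 7],  # op2 (fully consumed)
    [1, 2],                 # op3
    [1, 2, 5],              # op4
    [1, 2, 5, 6],           # op5
    [1, 2, 5, 6, 3],        # op6 (W3 after W6 leaves it at 6)
    [1, 2, 5, 6, 3, 4],     # op7
]
tracker = WatermarkTracker(len(seen))
for channel, watermarks in enumerate(seen):
    for ts in watermarks:
        tracker.on_watermark(channel, ts)
print(tracker.current())           # 2
print(tracker.on_watermark(2, 5))  # 5 (reading W5 from op3)
```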

Let's say we consume the input of op1 and op2 completely, and from all other 
inputs consume all watermarks until we hit a record in each.

The inputs then contain only this:
||op1||op2||op3||op4||op5||op6||op7||
| | |W7|W7|W7|W7|W7|
| | | | | | |7|
| | |W4|W4|W4|W4| |
| | | | | |4| |
| | |W3|W3|W3| | |
| | | | |3| | |
| | |W6|W6| | | |
| | | |6| | | |
| | |W5| | | | |
| | |5| | | | |

The current watermark of each input is then:
||op1||op2||op3||op4||op5||op6||op7||
|W7|W7|W2|W5|W6|W6|W6|

Since the minimum of these, 2, is the current watermark, we can fire the first 
window (timestamps 0-2), which contains elements 1 and 2.
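A sketch of the window assignment and firing condition, assuming (from the ranges 0-2 and 3-5) tumbling event-time windows of size 3 with no offset. `assign_window` and `can_fire` are hypothetical helpers; a window fires once the watermark reaches its last timestamp.

```python
from typing import Tuple

WINDOW_SIZE = 3  # assumption: windows 0-2, 3-5, 6-8, ... (tumbling, size 3)


def assign_window(ts: int, size: int = WINDOW_SIZE) -> Tuple[int, int]:
    """Return (start, last timestamp) of the tumbling window containing ts."""
    start = ts - ts % size
    return start, start + size - 1


def can_fire(window: Tuple[int, int], watermark: int) -> bool:
    # An event-time window fires once the watermark reaches its last timestamp.
    return watermark >= window[1]


print(assign_window(1), assign_window(2))  # (0, 2) (0, 2)
print(can_fire((0, 2), watermark=2))       # True
print(can_fire((3, 5), watermark=2))       # False
```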

This is the point where things get interesting. The next window to be fired 
covers timestamps 3-5.

If we now consume from op3, we read element 5 followed by watermark W5.

The watermarks then look like this:
||op1||op2||op3||op4||op5||op6||op7||
|W7|W7|W5|W5|W6|W6|W6|

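The minimum is now 5, so we fire the next window, which contains only element 5. 
Elements 3 and 4 have not been read yet; when they arrive, their window has 
already fired and they are considered late.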
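Elements 3 and 4, read after this point, sit behind the watermark of a window that has already fired. A minimal lateness check, assuming windows of size 3 and an allowed lateness of 0 (`is_late` is a hypothetical helper): an element counts as late once its window's last timestamp plus the allowed lateness is at or below the current watermark.

```python
def is_late(ts: int, watermark: int, size: int = 3, allowed_lateness: int = 0) -> bool:
    """Hypothetical check: the element's window has already been passed by
    the watermark (taking the allowed lateness into account)."""
    window_last_ts = ts - ts % size + size - 1
    return window_last_ts + allowed_lateness <= watermark


print(is_late(3, watermark=5), is_late(4, watermark=5))  # True True
print(is_late(3, watermark=2), is_late(4, watermark=2))  # False False
```

With the watermark at 5, both elements are late; with the watermark still at 2, neither is.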

However, let's go back and instead first read the rest of op5 and op6, i.e., 
elements 3 and 4 followed by their remaining watermarks.
These still fit into the current window (3-5):
||op1||op2||op3||op4||op5||op6||op7||
| | |W7|W7| | |W7|
| | | | | | |7|
| | |W4|W4| | | |
| | | | | | | |
| | |W3|W3| | | |
| | | | | | | |
| | |W6|W6| | | |
| | | |6| | | |
| | |W5| | | | |
| | |5| | | | |

Now let's look at the watermarks:
||op1||op2||op3||op4||op5||op6||op7||
|W7|W7|W2|W5|W7|W7|W6|

Since we haven't read anything further from op3, the current time is still 2, so 
these elements are not considered late.
Now we read element 5 and watermark W5 from op3, which bumps the time to 5 and 
fires the window containing 3, 4 and 5.
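Both interleavings can be replayed with a small plain-Python simulation that starts from the second table (the inputs and watermarks right after window 0-2 fired). Assumptions: windows of size 3, allowed lateness 0, late elements are simply dropped, and `replay` as well as its schedule format (pop `count` entries from input `op`) are hypothetical. The only difference between the two runs is the order in which inputs are read.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

SIZE = 3  # assumption: tumbling windows 0-2, 3-5, 6-8

# Remaining inputs from the second table, bottom of each stack first.
# ("r", ts) is an element, ("w", ts) a watermark.
REMAINING: Dict[int, List[Tuple[str, int]]] = {
    3: [("r", 5), ("w", 5), ("w", 6), ("w", 3), ("w", 4), ("w", 7)],
    4: [("r", 6), ("w", 6), ("w", 3), ("w", 4), ("w", 7)],
    5: [("r", 3), ("w", 3), ("w", 4), ("w", 7)],
    6: [("r", 4), ("w", 4), ("w", 7)],
    7: [("r", 7), ("w", 7)],
}
START_WM = {1: 7, 2: 7, 3: 2, 4: 5, 5: 6, 6: 6, 7: 6}


def replay(schedule: List[Tuple[int, int]]) -> Tuple[List[Tuple[int, List[int]]], List[int]]:
    """For each (op, count), pop `count` entries from input `op`.
    Returns the fired windows as (start, sorted elements) and the late elements."""
    queues = {op: list(entries) for op, entries in REMAINING.items()}
    wms = dict(START_WM)
    pending: Dict[int, List[int]] = defaultdict(list)
    fired: List[Tuple[int, List[int]]] = []
    late: List[int] = []
    current = min(wms.values())
    for op, count in schedule:
        for _ in range(count):
            kind, ts = queues[op].pop(0)
            if kind == "r":
                start = ts - ts % SIZE
                if start + SIZE - 1 <= current:
                    late.append(ts)  # the watermark already passed its window
                else:
                    pending[start].append(ts)
            else:
                wms[op] = max(wms[op], ts)  # lower watermarks are ignored
                current = min(wms.values())
                for start in sorted(pending):
                    if start + SIZE - 1 <= current:
                        fired.append((start, sorted(pending.pop(start))))
    return fired, late


print(replay([(3, 2), (5, 2), (6, 2)]))  # ([(3, [5])], [3, 4])
print(replay([(5, 4), (6, 3), (3, 2)]))  # ([(3, [3, 4, 5])], [])
```

The first schedule reads op3 first: window 3-5 fires with only element 5, and 3 and 4 arrive late. The second schedule drains op5 and op6 first, so the same window fires with 3, 4 and 5.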

> Mapping a data source before window aggregation causes Flink to stop handle 
> late events correctly.
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28719
>                 URL: https://issues.apache.org/jira/browse/FLINK-28719
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.1
>            Reporter: Mykyta Mykhailenko
>            Priority: Major
>
> I have created a 
> [repository|https://github.com/mykytamykhailenko/flink-map-with-issue] where 
> I describe this issue in detail. 
> I have provided a few tests and source code so that you can reproduce the 
> issue on your own machine. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
