[ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270116#comment-16270116 ]

ASF GitHub Bot commented on FLINK-8158:
---------------------------------------

Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/5094
  
    Hi @hequn8128, let me try to explain this.
    
    1. In the current implementation, the join process relies only on the cached rows, not on the watermarks. Specifically, when a record arrives, the join function only checks whether qualifying rows exist in the opposite cache, regardless of lateness. Thus, if the cache ***is not cleaned up in time***, outdated results will be emitted.
    
    2. Strictly speaking, the value for holding back watermarks should be reported dynamically by the join function at runtime. The current implementation temporarily uses a static value (`MaxOutputDelay`) for it. In other words, the hold-back value should be determined by the cached rows, rather than the cache size being determined by `MaxOutputDelay`.
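
The two points above can be sketched as a toy model in Scala, the language of the test snippet in the issue description. Everything here is an assumption for illustration: `Row`, `SideCache`, `HoldBack.fromCache`, and the "oldest cached rowtime minus one" rule are hypothetical, not Flink's operator code. `matches` mirrors point 1 (only the cache is consulted, lateness is ignored), and `fromCache` shows one way the hold-back could be derived from the cached rows instead of from a static `MaxOutputDelay`.

```scala
// Hypothetical, simplified model of a time-bounded join; the names below are
// illustrative and are not Flink's internal classes or methods.
final case class Row(key: String, id: String, rowtime: Long)

final class SideCache {
  private var rows: Vector[Row] = Vector.empty

  def add(row: Row): Unit = { rows = rows :+ row }

  // Point 1: matching consults only the cache. Any cached row whose rowtime
  // falls in [lower, upper] is returned, no matter how far behind the
  // current watermark it is.
  def matches(key: String, lower: Long, upper: Long): Vector[Row] =
    rows.filter(r => r.key == key && r.rowtime >= lower && r.rowtime <= upper)

  // Expires rows older than `time`. If this runs too late, stale rows stay
  // matchable and can still produce outdated results.
  def expireBefore(time: Long): Unit = { rows = rows.filter(_.rowtime >= time) }

  def oldestRowtime: Option[Long] =
    if (rows.isEmpty) None else Some(rows.map(_.rowtime).min)
}

object HoldBack {
  // Point 2 (assumed form): derive the emitted watermark from the cached rows.
  // Assuming every result carries the rowtime of one of its two input rows,
  // keeping the watermark strictly below the oldest cached row means a result
  // that pairs a cached row with an on-time arriving row is never behind it.
  def fromCache(inputWatermark: Long, caches: Seq[SideCache]): Long = {
    val oldest = caches.flatMap(_.oldestRowtime)
    if (oldest.isEmpty) inputWatermark
    else math.min(inputWatermark, oldest.min - 1)
  }
}
```

For example, with RIGHT1 (rowtime 6000) cached and an input watermark of 6000, `fromCache` emits 5999; once LEFT1 (rowtime 1000) is also cached it drops to 999, and it moves forward again only after that row expires.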
    
    Hope that helps.
    
    Best, Xingcan


> Rowtime window inner join emits late data
> -----------------------------------------
>
>                 Key: FLINK-8158
>                 URL: https://issues.apache.org/jira/browse/FLINK-8158
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>         Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late
> data is emitted. Currently, this is achieved by holding back watermarks.
> However, the window border is not handled correctly. For the SQL below:
> {quote}
>     val sqlQuery =
>       """
>         SELECT t2.key, t2.id, t1.id
>         FROM T1 as t1 join T2 as t2 ON
>           t1.key = t2.key AND
>           t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
>             t2.rt + INTERVAL '1' SECOND
>         """.stripMargin
>     val data1 = new mutable.MutableList[(String, String, Long)]
>     // for boundary test
>     data1.+=(("A", "LEFT1", 6000L))
>     val data2 = new mutable.MutableList[(String, String, Long)]
>     data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if another record
> ("A", "LEFT1", 1000L) then arrives on the left side, the join will output a
> record with timestamp 1000, which equals the previous watermark.
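
The border problem in the description reduces to a few lines of arithmetic, sketched here in Scala. The sketch assumes an input watermark of 6000 on both sides and a hold-back equal to the larger of the two bounds (5 s), which reproduces the emitted watermark of 1000; `BoundaryExample` and `borderSafeWatermark` are hypothetical names, and the extra millisecond is only one possible fix, not the one adopted in the PR.

```scala
// Hypothetical model of the boundary case in the description. Names and the
// "hold back by the larger bound" rule are assumptions chosen to reproduce
// the 6000 -> 1000 watermark, not Flink's actual code.
object BoundaryExample {
  // From: t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND
  val lowerBoundMs: Long = -5000L // minimum allowed t1.rt - t2.rt
  val upperBoundMs: Long = 1000L  // maximum allowed t1.rt - t2.rt

  // BETWEEN is inclusive on both ends, so rows exactly on the border join.
  def joins(leftRt: Long, rightRt: Long): Boolean = {
    val diff = leftRt - rightRt
    diff >= lowerBoundMs && diff <= upperBoundMs
  }

  // Assumed current behavior: delay the watermark by the wider of the bounds.
  def heldBackWatermark(inputWatermark: Long): Long =
    inputWatermark - math.max(-lowerBoundMs, upperBoundMs)

  // One possible fix (an assumption): hold back one extra millisecond so a
  // border result is strictly newer than the emitted watermark.
  def borderSafeWatermark(inputWatermark: Long): Long =
    heldBackWatermark(inputWatermark) - 1L

  // A watermark w promises no further records with timestamp <= w.
  def isLate(resultTs: Long, watermark: Long): Boolean = resultTs <= watermark
}
```

Under these assumptions, `heldBackWatermark(6000L)` is 1000 and `joins(1000L, 6000L)` is true because `BETWEEN` includes the `t2.rt - 5s` border, so the result timestamped 1000 satisfies `isLate(1000L, 1000L)`. Against `borderSafeWatermark(6000L)`, which is 999, the same result is no longer late.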



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
