[ 
https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272204#comment-16272204
 ] 

ASF GitHub Bot commented on FLINK-8158:
---------------------------------------

Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/5094
  
    Thanks for your replies.
    I agree that it is valid to join late input data. My concern is that the 
watermark is not held back correctly.
    
    Take `testRowTimeJoinWithCommonBounds2` as an example.
    1. Output watermark with timestamp 1000 (held back by 5000 from 6000)
    2. Output record1 with timestamp 6000 (valid)
    3. Output record2 with timestamp 1000 (invalid)
    
    Record2 is an invalid output because its timestamp is equal to the 
previous output watermark (1000). In my PR, I hold the input watermark back 
by 5001 to make record2 valid, so the test will output:
    
    1. Output watermark with timestamp 999 (held back by 5001 from 6000)
    2. Output record1 with timestamp 6000 (valid)
    3. Output record2 with timestamp 1000 (valid) 
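
    The arithmetic behind the two sequences above can be sketched as follows.
    This is only an illustration, not the join operator's code: the object and
    method names (`WatermarkHoldBackSketch`, `isLate`, `heldBack`) are
    hypothetical. It assumes the usual Flink semantics that a watermark with
    timestamp `w` declares that no further element with timestamp `<= w` will
    follow.

```scala
// Hypothetical sketch of the hold-back arithmetic; not Flink's operator code.
object WatermarkHoldBackSketch {

  // Assumption: a watermark w means no later element has timestamp <= w,
  // so a record emitted after that watermark with ts <= w counts as late.
  def isLate(recordTs: Long, watermark: Long): Boolean =
    recordTs <= watermark

  // The output watermark is the input watermark minus the hold-back delay.
  def heldBack(inputWatermark: Long, delay: Long): Long =
    inputWatermark - delay

  def main(args: Array[String]): Unit = {
    val inputWatermark = 6000L // input watermark from the example above
    val record2Ts = 1000L      // timestamp of record2

    // Compare the current delay (5000) with the one proposed in the PR (5001).
    for (delay <- Seq(5000L, 5001L)) {
      val out = heldBack(inputWatermark, delay)
      println(s"delay=$delay watermark=$out record2Late=${isLate(record2Ts, out)}")
    }
  }
}
```

    With a delay of 5000 the output watermark is 1000, so record2 (timestamp
    1000) counts as late; with 5001 the watermark is 999 and record2 is on time.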


> Rowtime window inner join emits late data
> -----------------------------------------
>
>                 Key: FLINK-8158
>                 URL: https://issues.apache.org/jira/browse/FLINK-8158
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>         Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late 
> data is emitted. Currently, this is achieved by holding back watermarks. 
> However, the window border is not handled correctly. For the SQL below: 
> {quote}
>     val sqlQuery =
>       """
>         SELECT t2.key, t2.id, t1.id
>         FROM T1 as t1 join T2 as t2 ON
>           t1.key = t2.key AND
>           t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
>             t2.rt + INTERVAL '1' SECOND
>         """.stripMargin
>     val data1 = new mutable.MutableList[(String, String, Long)]
>     // for boundary test
>     data1.+=(("A", "LEFT1", 6000L))
>     val data2 = new mutable.MutableList[(String, String, Long)]
>     data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if the left input 
> then receives another record ("A", "LEFT1", 1000L), the join will output a 
> record with timestamp 1000, which equals the previous watermark.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
