[ 
https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270090#comment-16270090
 ] 

ASF GitHub Bot commented on FLINK-8158:
---------------------------------------

Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/5094
  
    Hi @xccui, thanks for your reply. Feel free to take it if you wish. I 
still have some questions: 
    1. Considering the test `testRowTimeJoinWithCommonBounds2` in 
`JoinHarnessTest`, do you mean that the row with timestamp 1000 should not be 
included in the result? The row does satisfy the join condition `t1.rt BETWEEN 
t2.rt - INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND`: `BETWEEN` is 
inclusive on both bounds, so a row exactly on the boundary matches `BETWEEN`, 
not `NOT BETWEEN`.
    2. Can't we use the held-back watermark as the boundary for caching and 
expiring data? Any row with a timestamp greater than the held-back watermark 
should be cached, since it may still be joined later. We should take every 
opportunity to join and produce results that satisfy the join predicate.
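The two points above can be sketched in Scala under a simplified model: row timestamps and the held-back watermark are plain `Long` epoch milliseconds. The names `JoinBoundsSketch`, `satisfiesBounds` and `keepInCache` are hypothetical and are not Flink's join operator code.

```scala
// Hypothetical sketch, not Flink's operator: models the inclusive BETWEEN check
// and a cache policy keyed on the held-back watermark.
object JoinBoundsSketch {
  // Relative bounds from the query: t1.rt BETWEEN t2.rt - 5s AND t2.rt + 1s
  val LowerBoundMs: Long = -5000L
  val UpperBoundMs: Long = 1000L

  // SQL BETWEEN includes both ends, so a left row exactly on a bound still joins.
  def satisfiesBounds(leftTs: Long, rightTs: Long): Boolean =
    leftTs >= rightTs + LowerBoundMs && leftTs <= rightTs + UpperBoundMs

  // Hypothetical policy for point 2: keep a row while its timestamp is greater
  // than the held-back watermark, because it may still find join partners.
  def keepInCache(rowTs: Long, heldBackWatermark: Long): Boolean =
    rowTs > heldBackWatermark

  def main(args: Array[String]): Unit = {
    println(satisfiesBounds(1000L, 6000L)) // true: 1000 == 6000 - 5000
    println(satisfiesBounds(999L, 6000L))  // false: below the lower bound
    println(keepInCache(1001L, 1000L))     // true: still joinable later
    println(keepInCache(1000L, 1000L))     // false: at or below the watermark
  }
}
```

Under this model the row at 1000 joins with the right row at 6000 precisely because the lower bound is inclusive.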


> Rowtime window inner join emits late data
> -----------------------------------------
>
>                 Key: FLINK-8158
>                 URL: https://issues.apache.org/jira/browse/FLINK-8158
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>         Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late 
> data is emitted. Currently, this is achieved by holding back watermarks. 
> However, the window border is not handled correctly. For the SQL below: 
> {quote}
>     val sqlQuery =
>       """
>         SELECT t2.key, t2.id, t1.id
>         FROM T1 as t1 join T2 as t2 ON
>           t1.key = t2.key AND
>           t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
>             t2.rt + INTERVAL '1' SECOND
>         """.stripMargin
>     val data1 = new mutable.MutableList[(String, String, Long)]
>     // for boundary test
>     data1 += (("A", "LEFT1", 6000L))
>     val data2 = new mutable.MutableList[(String, String, Long)]
>     data2 += (("A", "RIGHT1", 6000L))
> {quote}
> The join will emit a watermark with timestamp 1000, but if the left input then 
> receives another record ("A", "LEFT1", 1000L), the join will emit a record with 
> timestamp 1000, which equals the previously emitted watermark.
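A minimal Scala sketch of this boundary case, assuming the operator holds back an input watermark of 6000 by the larger of the two relative window sizes (5000 ms and 1000 ms for this query), which reproduces the emitted watermark of 1000 described above. It relies on Flink's watermark semantics: a watermark with timestamp t declares that no further records with timestamp <= t should arrive. `HeldBackWatermarkSketch`, `heldBackWatermark` and `isLate` are hypothetical names, not Flink API.

```scala
// Hypothetical model of watermark hold-back, not Flink's implementation.
object HeldBackWatermarkSketch {
  // Relative window sizes derived from the query bounds (assumed model).
  val LeftRelativeSizeMs: Long = 5000L  // t1.rt >= t2.rt - 5s
  val RightRelativeSizeMs: Long = 1000L // t1.rt <= t2.rt + 1s

  // Assumed hold-back rule: delay the input watermark by the larger relative size.
  def heldBackWatermark(inputWatermark: Long): Long =
    inputWatermark - math.max(LeftRelativeSizeMs, RightRelativeSizeMs)

  // A watermark w promises no more records with timestamp <= w,
  // so an emitted record whose timestamp is <= w is late.
  def isLate(recordTs: Long, lastEmittedWatermark: Long): Boolean =
    recordTs <= lastEmittedWatermark

  def main(args: Array[String]): Unit = {
    val emitted = heldBackWatermark(6000L)
    println(s"emitted watermark: $emitted")                          // 1000
    println(s"join result at 1000 is late: ${isLate(1000L, emitted)}") // true
  }
}
```

In this model the result at 1000 is late only because it lands exactly on the emitted watermark, which is the window-border case the issue describes.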



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
