[ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270924#comment-16270924 ]
ASF GitHub Bot commented on FLINK-8158:
---------------------------------------
Github user hequn8128 commented on the issue:
https://github.com/apache/flink/pull/5094
Thanks for your replies.
@fhueske: The watermark must be aligned with timestamps, and that is the
main reason why watermarks are held back (right?). The current window join may
emit a record with a timestamp equal to the previously emitted **output** watermark (see
`testRowTimeJoinWithCommonBounds2` in `JoinHarnessTest`).
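A minimal Scala sketch of the boundary arithmetic, using the numbers from the quoted issue below. All names here (`WatermarkHoldBack`, `Bounds`, `outputWatermark`, `strictOutputWatermark`, `isLate`, `joins`) are hypothetical and not Flink's API. It assumes the output watermark is the input watermark held back by the larger of the two relative window sizes, and the usual watermark contract that a record whose timestamp is <= the last emitted watermark is late.

```scala
// Hypothetical sketch; not Flink's actual join operator code.
object WatermarkHoldBack {

  // Bounds of `t1.rt BETWEEN t2.rt + lowerBound AND t2.rt + upperBound`, in ms.
  final case class Bounds(lowerBound: Long, upperBound: Long) {
    // A left row may still match right rows up to -lowerBound ms later (assumption).
    def leftRelativeSize: Long = -lowerBound
    // A right row may still match left rows up to upperBound ms later (assumption).
    def rightRelativeSize: Long = upperBound
  }

  // Hold the input watermark back by the larger relative size.
  def outputWatermark(inputWatermark: Long, b: Bounds): Long =
    inputWatermark - math.max(b.leftRelativeSize, b.rightRelativeSize)

  // Hypothetical adjustment: hold back one extra millisecond so that the
  // window border itself is never at or below the emitted watermark.
  def strictOutputWatermark(inputWatermark: Long, b: Bounds): Long =
    outputWatermark(inputWatermark, b) - 1

  // After a watermark t, records with timestamp <= t are considered late.
  def isLate(timestamp: Long, watermark: Long): Boolean =
    timestamp <= watermark

  // True if a left row at leftTs matches a right row at rightTs (inclusive bounds).
  def joins(leftTs: Long, rightTs: Long, b: Bounds): Boolean =
    leftTs >= rightTs + b.lowerBound && leftTs <= rightTs + b.upperBound
}
```

With `Bounds(-5000L, 1000L)` and an input watermark of 6000, `outputWatermark` returns 1000. A left row at 1000 still satisfies `joins(1000L, 6000L, ...)` because the lower bound is inclusive, so a result at timestamp 1000 would be late with respect to the watermark already emitted. The `strictOutputWatermark` variant only illustrates one way to make the border exclusive; it is not claimed to be the fix adopted in this PR.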
@xccui: I am considering whether we could cache more late records to produce a
more complete join result. This could be achieved by caching both left and
right records that are later than the held-back watermark.
> Rowtime window inner join emits late data
> -----------------------------------------
>
> Key: FLINK-8158
> URL: https://issues.apache.org/jira/browse/FLINK-8158
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late
> data is emitted. Currently, this is achieved by holding back watermarks.
> However, the window border is not handled correctly. For the SQL below:
> {quote}
> val sqlQuery =
>   """
>     SELECT t2.key, t2.id, t1.id
>     FROM T1 as t1 join T2 as t2 ON
>       t1.key = t2.key AND
>       t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
>         t2.rt + INTERVAL '1' SECOND
>   """.stripMargin
> val data1 = new mutable.MutableList[(String, String, Long)]
> // for boundary test
> data1.+=(("A", "LEFT1", 6000L))
> val data2 = new mutable.MutableList[(String, String, Long)]
> data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will emit a watermark with timestamp 1000, but if another left record
> ("A", "LEFT1", 1000L) then arrives, the join will emit a record with timestamp
> 1000, which equals the previously emitted watermark.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)