[ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270116#comment-16270116 ]
ASF GitHub Bot commented on FLINK-8158:
---------------------------------------

Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/5094

    Hi @hequn8128, let me try to explain this.

    1. In the current implementation, the join process relies only on the cached rows, not on the watermarks. Specifically, when it receives a record, the join function only checks whether the opposite cache contains qualifying rows, regardless of lateness. Thus, if the cache ***is not cleaned up in time***, outdated results will be emitted.
    2. Strictly speaking, the value for holding back watermarks should be reported dynamically by the join function at runtime. The current implementation temporarily uses a static value (`MaxOutputDelay`) for that. In other words, the hold-back value should be determined by the cached rows, rather than the cache size being determined by `MaxOutputDelay`.

    Hope that helps.

    Best, Xingcan


> Rowtime window inner join emits late data
> -----------------------------------------
>
>                 Key: FLINK-8158
>                 URL: https://issues.apache.org/jira/browse/FLINK-8158
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>         Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late
> data is emitted. Currently, this is achieved by holding back watermarks.
> However, the window border is not handled correctly.
> For the SQL below:
> {quote}
> val sqlQuery =
>   """
>     SELECT t2.key, t2.id, t1.id
>     FROM T1 as t1 join T2 as t2 ON
>       t1.key = t2.key AND
>       t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
>         t2.rt + INTERVAL '1' SECOND
>   """.stripMargin
> val data1 = new mutable.MutableList[(String, String, Long)]
> // for boundary test
> data1.+=(("A", "LEFT1", 6000L))
> val data2 = new mutable.MutableList[(String, String, Long)]
> data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if the left input
> then receives another record ("A", "LEFT1", 1000L), the join will output a
> record with timestamp 1000, which equals the previous watermark.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
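
The boundary case in the issue description can be traced with a little arithmetic. The following is only a minimal sketch, not Flink's join operator: every name in it (`BoundaryCase`, `inWindow`, `heldBackWatermark`, `isLate`) is hypothetical. It also assumes that the input watermark is 6000, that the watermark is held back by the larger window bound (5000 ms), and that a record whose timestamp does not exceed the watermark counts as late. These assumptions are one reading of the numbers in the description; the thread does not state them explicitly.

```scala
// Hypothetical illustration only; none of these names come from Flink.
object BoundaryCase {
  // Window bounds from the query: t1.rt BETWEEN t2.rt - 5 s AND t2.rt + 1 s
  val lowerBoundMs = 5000L
  val upperBoundMs = 1000L

  // True if a left row at leftTs satisfies the time predicate against a right row at rightTs.
  def inWindow(leftTs: Long, rightTs: Long): Boolean =
    leftTs >= rightTs - lowerBoundMs && leftTs <= rightTs + upperBoundMs

  // Watermark forwarded downstream after holding it back by delayMs (assumed static).
  def heldBackWatermark(inputWatermark: Long, delayMs: Long): Long =
    inputWatermark - delayMs

  // Assumed lateness rule: a record is late if its timestamp does not exceed the watermark.
  def isLate(recordTs: Long, watermark: Long): Boolean =
    recordTs <= watermark

  def main(args: Array[String]): Unit = {
    // Assumed input watermark of 6000, held back by the larger bound.
    val wm = heldBackWatermark(6000L, lowerBoundMs)
    println(s"emitted watermark: $wm")
    println(s"LEFT(1000) joins RIGHT(6000): ${inWindow(1000L, 6000L)}")
    println(s"result with timestamp 1000 is late: ${isLate(1000L, wm)}")
  }
}
```

Under these assumptions the emitted watermark is 1000, the row ("A", "LEFT1", 1000L) still falls inside the window of the cached right row at 6000, and the joined result carries a timestamp equal to the watermark that was already emitted, which is the late emission the issue reports.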