[ 
https://issues.apache.org/jira/browse/FLINK-6858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16039959#comment-16039959
 ] 

hongyuhong commented on FLINK-6858:
-----------------------------------

Hi [~fhueske], the current implementation of the unbounded event-time OVER window 
emits each record with its real timestamp, not {{currentWatermark + 1}}: 
it resets the collector's timestamp before emitting the record.
{code}
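// take the next buffered timestamp and the rows stored under it
val curTimestamp = sortedTimestamps.removeFirst()
val curRowList = rowMapState.get(curTimestamp)
// override the output timestamp with the rows' own event time
collector.setAbsoluteTimestamp(curTimestamp)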
{code}
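
To make the effect easier to see outside of Flink, here is a minimal standalone sketch of the same idea. It is not the actual operator code: {{SketchCollector}}, {{TimestampResetSketch}} and the {{TreeMap}} buffer are hypothetical stand-ins for the collector and the keyed state, and I assume that rows with a timestamp less than or equal to the watermark are due for emission.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a collector whose output timestamp can be overridden.
final class SketchCollector {
  private var ts: Long = -1L
  val emitted = mutable.ArrayBuffer.empty[(Long, String)]
  def setAbsoluteTimestamp(t: Long): Unit = ts = t
  def collect(row: String): Unit = emitted += ((ts, row))
}

object TimestampResetSketch {
  // Emits every buffered row whose timestamp is <= watermark (assumption),
  // each one under its own event time rather than the timer's timestamp.
  def onTimer(
      timerTimestamp: Long,
      watermark: Long,
      buffered: mutable.TreeMap[Long, List[String]],
      out: SketchCollector): Unit = {
    // Default behaviour being modelled: output carries the timer's timestamp.
    out.setAbsoluteTimestamp(timerTimestamp)
    while (buffered.nonEmpty && buffered.firstKey <= watermark) {
      val curTimestamp = buffered.firstKey
      val curRowList = buffered.remove(curTimestamp).getOrElse(Nil)
      // Reset the timestamp to the rows' own event time before emitting.
      out.setAbsoluteTimestamp(curTimestamp)
      curRowList.foreach(out.collect)
    }
  }

  def main(args: Array[String]): Unit = {
    val buffered = mutable.TreeMap(3L -> List("a"), 5L -> List("b", "c"), 9L -> List("d"))
    val out = new SketchCollector
    onTimer(timerTimestamp = 8L, watermark = 7L, buffered, out)
    out.emitted.foreach { case (t, row) => println(s"$row @ $t") }
  }
}
```

In this sketch the rows buffered at 3 and 5 are emitted with timestamps 3 and 5 even though the timer fired at 8, and the row at 9 stays buffered until a later watermark.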

> Unbounded event time Over Window emits incorrect timestamps
> -----------------------------------------------------------
>
>                 Key: FLINK-6858
>                 URL: https://issues.apache.org/jira/browse/FLINK-6858
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Fabian Hueske
>            Priority: Critical
>
> The unbounded event time OVER windows emit records with incorrect timestamps.
> OVER aggregates "enrich" each input row with aggregates computed over 
> neighboring rows, i.e., they produce one output row for each input row. The 
> (event-time) timestamp of each input row should be preserved and not modified.
> All OVER window aggregates are implemented using the {{ProcessFunction}} 
> interface. The interface has two methods {{processElement()}} and 
> {{onTimer()}} that can produce output records. Records emitted by 
> {{processElement()}} are emitted with the timestamp of the record that was 
> given as an argument to the method. Records emitted by {{onTimer()}} are 
> emitted with the timestamp of the timer that triggered the call of the method.
> The implementation of the unbounded event-time OVER window registers a new 
> timer for {{currentWatermark + 1}} when {{processElement()}} is called. 
> When the timer triggers, {{onTimer()}} processes all rows that were 
> received between this and the last {{onTimer()}} call with timestamps smaller 
> than the current watermark. However, this means that all emitted rows have a 
> timestamp of {{currentWatermark + 1}} which is not what we want.
> The bounded event-time OVER window operators follow a different strategy and 
> register a timer for the timestamp of each row that was processed by 
> {{processElement()}} and emit the corresponding rows when {{onTimer()}} is 
> called. Hence, they emit the rows with correct timestamps.
> I think we should change the implementation of the unbounded event-time OVER 
> aggregates to a similar strategy as the bounded event-time OVER aggregates.
> What do you think [~Yuhong_kyo] [~sunjincheng121]?



