[ https://issues.apache.org/jira/browse/FLINK-6858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16039959#comment-16039959 ]
hongyuhong commented on FLINK-6858:
-----------------------------------
Hi [~fhueske], the current implementation of the unbounded event-time OVER window
does not emit records with {{currentWatermark + 1}}. It resets the collector's
timestamp to each record's original timestamp before emitting it:
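{code}
// take the smallest buffered timestamp (timestamps are kept in sorted order)
val curTimestamp = sortedTimestamps.removeFirst()
// fetch all rows that were buffered for this timestamp
val curRowList = rowMapState.get(curTimestamp)
// overwrite the timer's timestamp so emitted rows keep their original timestamp
collector.setAbsoluteTimestamp(curTimestamp)
{code}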
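The effect of the snippet above can be illustrated with a small, self-contained model. This is not Flink code: {{StampingCollector}} is a hypothetical stand-in for Flink's internal timestamped collector, and {{onTimer}} here is a simplified function, not the {{ProcessFunction}} method. It assumes rows are buffered in a map keyed by timestamp and that rows with timestamps up to the watermark are emitted when the timer fires.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a timestamped collector: every emitted record is
// stamped with whatever timestamp was set most recently.
final class StampingCollector[T] {
  private var ts: Long = Long.MinValue
  val out: mutable.ArrayBuffer[(T, Long)] = mutable.ArrayBuffer.empty[(T, Long)]
  def setAbsoluteTimestamp(t: Long): Unit = ts = t
  def collect(record: T): Unit = out += ((record, ts))
}

object ResetTimestampSketch {
  // Simplified model of onTimer(): the timer fired at timerTs, but each group of
  // buffered rows is emitted only after resetting the collector's timestamp.
  def onTimer(
      timerTs: Long,
      watermark: Long,
      rowMap: mutable.Map[Long, List[String]],
      collector: StampingCollector[String]): Unit = {
    collector.setAbsoluteTimestamp(timerTs) // default: the timer's timestamp
    // materialize the sorted timestamps before mutating the map
    val sortedTimestamps = rowMap.keys.filter(_ <= watermark).toList.sorted
    for (curTimestamp <- sortedTimestamps) {
      val curRowList = rowMap.remove(curTimestamp).get
      collector.setAbsoluteTimestamp(curTimestamp) // restore the row's own timestamp
      curRowList.foreach(collector.collect)
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = mutable.Map(3L -> List("a"), 5L -> List("b", "c"), 9L -> List("d"))
    val collector = new StampingCollector[String]
    onTimer(timerTs = 8L, watermark = 8L, rows, collector)
    println(collector.out) // ArrayBuffer((a,3), (b,5), (c,5))
  }
}
```

The row with timestamp 9 stays buffered because it is beyond the watermark, and none of the emitted rows carry the timer's timestamp 8.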
> Unbounded event time Over Window emits incorrect timestamps
> -----------------------------------------------------------
>
> Key: FLINK-6858
> URL: https://issues.apache.org/jira/browse/FLINK-6858
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: Fabian Hueske
> Priority: Critical
>
> The unbounded event time OVER windows emit records with incorrect timestamps.
> OVER aggregates "enrich" each input row with aggregates computed over
> neighboring rows, i.e., they produce one output row for each input row. The
> (event-time) timestamp of each input row should be preserved and not modified.
> All OVER window aggregates are implemented using the {{ProcessFunction}}
> interface. The interface has two methods {{processElement()}} and
> {{onTimer()}} that can produce output records. Records emitted by
> {{processElement()}} are emitted with the timestamp of the record that was
> given as an argument to the method. Records emitted by {{onTimer()}} are
> emitted with the timestamp of the timer that triggered the call of the method.
> The implementation of the unbounded event-time OVER window registers a new
> timer for {{currentWatermark + 1}} when {{processElement()}} is called.
> When the timer triggers, {{onTimer()}} processes all rows that were
> received between this and the last {{onTimer()}} call with timestamps smaller
> than the current watermark. However, this means that all emitted rows have a
> timestamp of {{currentWatermark + 1}}, which is not what we want.
> The bounded event-time OVER window operators follow a different strategy and
> register a timer for the timestamp of each row that was processed by
> {{processElement()}} and emit the corresponding rows when {{onTimer()}} is
> called. Hence, they emit the rows with correct timestamps.
> I think we should change the implementation of the unbounded event-time OVER
> aggregates to a similar strategy as the bounded event-time OVER aggregates.
> What do you think [~Yuhong_kyo] [~sunjincheng121]?
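The difference between the two timer strategies described in the issue can be sketched with a plain Scala model. This is an illustration under stated assumptions, not Flink code: {{OverWindowTimerSketch}}, {{Row}}, {{Out}}, {{unbounded}} and {{bounded}} are hypothetical names, the aggregate is assumed to be a running SUM, and, as in {{ProcessFunction}}, records emitted from {{onTimer()}} are assumed to carry the timestamp of the timer that fired.

```scala
object OverWindowTimerSketch {
  // A buffered input row: event-time timestamp plus a value to aggregate.
  final case class Row(ts: Long, value: Int)
  // An output row: the input row's timestamp, the running SUM over all rows up
  // to it, and the timestamp the emitting onTimer() call attaches to it.
  final case class Out(rowTs: Long, runningSum: Int, emittedTs: Long)

  // Unbounded strategy: processElement() registered a single timer for
  // (watermark at registration) + 1; when it fires, every buffered row with
  // ts <= current watermark is emitted, all stamped with the timer timestamp.
  def unbounded(rows: Seq[Row], registrationWatermark: Long, currentWatermark: Long): Seq[Out] = {
    val timerTs = registrationWatermark + 1
    emitInOrder(rows.filter(_.ts <= currentWatermark), _ => timerTs)
  }

  // Bounded strategy: one timer per row timestamp; each timer emits the rows
  // of its own timestamp, so every output keeps the input row's timestamp.
  def bounded(rows: Seq[Row], currentWatermark: Long): Seq[Out] =
    emitInOrder(rows.filter(_.ts <= currentWatermark), row => row.ts)

  // Sorts rows by event time, accumulates the running sum, and stamps each
  // output row with the timestamp chosen by `stamp`.
  private def emitInOrder(rows: Seq[Row], stamp: Row => Long): Seq[Out] = {
    var sum = 0
    rows.sortBy(_.ts).map { row =>
      sum += row.value
      Out(row.ts, sum, stamp(row))
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(5, 10), Row(2, 1), Row(7, 100))
    // every output row is stamped with 1 (= registration watermark 0 + 1)
    println(unbounded(rows, registrationWatermark = 0, currentWatermark = 8))
    // each output row keeps its input timestamp: 2, 5 and 7
    println(bounded(rows, currentWatermark = 8))
  }
}
```

Both strategies produce one output row per input row with the same aggregate values; only the emitted timestamps differ, which is exactly the bug this issue describes.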