[
https://issues.apache.org/jira/browse/FLINK-39620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39620:
-----------------------------------
Labels: pull-request-available (was: )
> Cumulative window re-emits old windows after recovery due to stale
> InternalTimerService.currentWatermark in GlobalAggCombiner
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39620
> URL: https://issues.apache.org/jira/browse/FLINK-39620
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.1.1, 1.20.4
> Environment: Flink 1.18.x (reproduced locally);
> logic is the same in 1.20.x and master.
> SQL pattern: CUMULATE with local-global two-phase aggregation.
> Event-time with shiftTimeZone = Asia/Shanghai.
> Reporter: haiqingchen
> Priority: Critical
> Labels: pull-request-available
>
> After a task restart or checkpoint recovery, a CUMULATE window with
> local-global aggregation may re-emit historical windows that were already
> emitted (a.k.a. "backfill"). The root cause is an inconsistency between two
> watermarks during the recovery warm-up window:
> SlicingWindowOperator.currentWatermark is restored from state, but
> InternalTimerService.currentWatermark stays at Long.MIN_VALUE until the first
> post-recovery watermark arrives. During this gap, GlobalAggCombiner uses the
> stale timer-service watermark to decide whether to register a window timer.
> It therefore treats historical windows as "not fired" and re-registers their
> timers. Once the real watermark advances, those timers fire immediately and,
> for cumulative windows, set off a chain of fireWindow -> nextTriggerWindow
> calls.
> *Steps to Reproduce*
> 1. Run a CUMULATE window aggregation with local-global aggregation, e.g.:
> {code:sql}
> SELECT window_start, window_end, COUNT(*)
> FROM TABLE(
>   CUMULATE(TABLE T, DESCRIPTOR(rowtime), INTERVAL '1' MINUTES, INTERVAL '1' DAY))
> GROUP BY window_start, window_end;
> {code}
> 2. Let some cumulative windows fire normally.
> 3. Take a savepoint or trigger a task restart.
> 4. After recovery, before the first watermark arrives, feed a late record that
> belongs to an already-fired slice but still falls within the max window
> (late-but-keep path).
> 5. Push the watermark forward.
>
> Observed result: an old window [window_start, oldEnd) is emitted again after
> recovery. Multiple subsequent cumulative windows may also be re-emitted
> because of nextTriggerWindow chaining.
> *Why it was not caught by FLINK-24501*
> FLINK-24501 (and PR #17509) added watermark persistence at the
> SlicingWindowOperator level and is-late protection for processElement.
> However, the timer-registration path inside GlobalAggCombiner still reads the
> timer-service watermark, which that fix does not cover during the warm-up
> window.
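
The warm-up gap described in the issue can be illustrated with a small self-contained toy model. This is only a sketch under stated assumptions: ToyTimerService, replay, the timestamps, and the "take the larger of the two watermarks" guard are all hypothetical names and choices made for illustration. None of it is Flink source code, and the guard is not necessarily what the linked pull request does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of the recovery warm-up gap; none of these types are Flink classes. */
public class WatermarkGapSketch {

    /** Stand-in for a timer service whose watermark is NOT restored from state. */
    static final class ToyTimerService {
        long currentWatermark = Long.MIN_VALUE;
        final TreeSet<Long> timers = new TreeSet<>();

        /** Advances the watermark and returns the timers that fire as a result. */
        List<Long> advanceWatermark(long watermark) {
            currentWatermark = watermark;
            List<Long> fired = new ArrayList<>(timers.headSet(watermark, true));
            timers.removeAll(fired);
            return fired;
        }
    }

    /**
     * Replays the scenario: recover, receive a late record for an already-fired
     * window, then receive the first post-recovery watermark.
     *
     * @param useOperatorWatermark if true, the "already fired?" check also
     *     consults the operator-level watermark restored from state (hypothetical guard)
     * @return the timers fired by the first post-recovery watermark
     */
    static List<Long> replay(boolean useOperatorWatermark) {
        long restoredOperatorWatermark = 1_000L; // assumed value restored from state
        long oldWindowTimer = 599L;              // timer of a window fired before the restart
        ToyTimerService timerService = new ToyTimerService(); // fresh after recovery

        // Watermark used to decide whether the old window still needs a timer.
        long reference = useOperatorWatermark
                ? Math.max(restoredOperatorWatermark, timerService.currentWatermark)
                : timerService.currentWatermark;

        // A late-but-kept record arrives before any new watermark.
        if (oldWindowTimer > reference) {
            timerService.timers.add(oldWindowTimer); // window looks "not fired"
        }

        // The first post-recovery watermark arrives.
        return timerService.advanceWatermark(1_100L);
    }

    public static void main(String[] args) {
        System.out.println("stale timer-service watermark: " + replay(false));
        System.out.println("with restored operator watermark: " + replay(true));
    }
}
```

In this model, replay(false) re-registers the old timer because 599 is greater than Long.MIN_VALUE, so the timer fires as soon as the watermark reaches 1100. replay(true) skips the registration because the restored watermark of 1000 is already past the timer, so nothing is re-fired.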
--
This message was sent by Atlassian Jira
(v8.20.10#820010)