[ 
https://issues.apache.org/jira/browse/FLINK-39620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

haiqingchen updated FLINK-39620:
--------------------------------
    Description: 
After a task restart or checkpoint recovery, a CUMULATE window with 
local-global aggregation may re-emit historical windows that were already 
emitted (a.k.a. "backfill"). 
The root cause is an inconsistency between two watermarks during the recovery 
warm-up window: SlicingWindowOperator.currentWatermark is restored from state, 
but InternalTimerService.currentWatermark remains Long.MIN_VALUE until the 
first post-recovery watermark arrives. During this gap, GlobalAggCombiner uses 
the stale timer-service watermark to decide whether to register a window 
timer. It therefore treats historical windows as "not fired" and re-registers 
their timers. Once the real watermark advances, those timers fire immediately 
and trigger a chain of fireWindow -> nextTriggerWindow calls for the 
cumulative windows.
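
The decision described above can be illustrated with a small, self-contained 
simulation. This is only a sketch under stated assumptions: WarmUpGapDemo, 
shouldRegisterTimer, and the sample timestamps are hypothetical and do not 
mirror Flink's actual classes or signatures, and time-zone shifting is 
ignored. The only behavior taken from this report is that the timer-service 
watermark is Long.MIN_VALUE after recovery while the operator watermark is 
restored from state.

```java
final class WarmUpGapDemo {

    /**
     * Simplified stand-in for the "has this window already fired?" decision.
     * Assumes a window with end E is considered fired once watermark >= E - 1.
     * Returns true when a timer would be registered for the window.
     */
    static boolean shouldRegisterTimer(long windowEnd, long watermark) {
        return windowEnd - 1 > watermark;
    }

    public static void main(String[] args) {
        // Hypothetical values: the operator restored watermark 600_000 from state,
        // while the timer service has not seen a watermark since recovery.
        long restoredOperatorWatermark = 600_000L;
        long timerServiceWatermark = Long.MIN_VALUE;

        // A window that already fired before the restart.
        long oldWindowEnd = 300_000L;

        // Decision based on the stale timer-service watermark: timer is re-registered.
        System.out.println("stale watermark registers timer: "
                + shouldRegisterTimer(oldWindowEnd, timerServiceWatermark));

        // Decision based on the restored operator watermark: no timer is registered.
        System.out.println("restored watermark registers timer: "
                + shouldRegisterTimer(oldWindowEnd, restoredOperatorWatermark));
    }
}
```

With the stale value, every historical window looks unfired, which is the 
condition that allows the old timer to be registered again.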

*Steps to Reproduce*
1. Run a CUMULATE window aggregation with local-global aggregation, e.g.:

{code:sql}
SELECT window_start, window_end, COUNT(*)
FROM TABLE(
  CUMULATE(TABLE T, DESCRIPTOR(rowtime), INTERVAL '1' MINUTES, INTERVAL '1' DAY))
GROUP BY window_start, window_end;
{code}

2. Let some cumulative windows fire normally.
3. Take a savepoint or trigger a task restart.
4. After recovery, before the first watermark arrives, feed a late record that 
belongs to an already-fired slice but still falls within the max window (the 
late-but-keep path).
5. Push the watermark forward.

Observed result: an old window [window_start, oldEnd) is emitted again after 
recovery. Multiple subsequent cumulative windows may also be re-emitted 
because of nextTriggerWindow chaining.
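
The chaining effect in the last step can be sketched as follows. This is a 
simplified model, not Flink's implementation: CumulateChainDemo and 
windowsFiredBy are hypothetical names, and the model assumes that firing a 
cumulative window schedules the next window end (current end plus the step) 
as long as it stays within window_start plus the max size.

```java
import java.util.ArrayList;
import java.util.List;

final class CumulateChainDemo {

    /**
     * Returns the window ends emitted when a timer exists for firstWindowEnd
     * and the watermark has advanced to the given value.
     */
    static List<Long> windowsFiredBy(
            long firstWindowEnd, long step, long windowStart, long maxSize, long watermark) {
        List<Long> fired = new ArrayList<>();
        long maxWindowEnd = windowStart + maxSize;
        long end = firstWindowEnd;
        // A window fires once the watermark reaches end - 1; each firing
        // schedules the next cumulative window until the max window is reached.
        while (end <= maxWindowEnd && end - 1 <= watermark) {
            fired.add(end);   // models fireWindow
            end += step;      // models nextTriggerWindow
        }
        return fired;
    }

    public static void main(String[] args) {
        long step = 60_000L;         // INTERVAL '1' MINUTES
        long maxSize = 86_400_000L;  // INTERVAL '1' DAY
        // Hypothetical scenario: a timer for the window ending at 120_000 was
        // re-registered during the warm-up gap, then the watermark jumps to 300_000.
        System.out.println(windowsFiredBy(120_000L, step, 0L, maxSize, 300_000L));
        // prints [120000, 180000, 240000, 300000]
    }
}
```

In this model a single re-registered timer is enough to replay every 
cumulative window between the old window end and the current watermark.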

*Why it was not caught by FLINK-24501*
FLINK-24501 (and PR #17509) added SlicingWindowOperator-level watermark 
persistence and is-late protection for processElement. However, the 
timer-registration path inside GlobalAggCombiner still reads the timer-service 
watermark, which that fix does not cover during the warm-up window.
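
To make the uncovered path concrete, the sketch below contrasts a 
registration check that consults only the timer-service watermark with one 
that also consults the watermark restored by the operator. This report does 
not prescribe a fix, so the guard shown here is purely an illustrative 
assumption; all class and method names are hypothetical, and time-zone 
shifting is ignored.

```java
final class TimerRegistrationGuardDemo {

    /** Simplified "already fired" check: fired once watermark >= windowEnd - 1. */
    static boolean isWindowFired(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    /** Models the uncovered path: only the timer-service watermark is consulted. */
    static boolean registersTimerUncovered(long windowEnd, long timerServiceWatermark) {
        return !isWindowFired(windowEnd, timerServiceWatermark);
    }

    /**
     * Hypothetical guard (an assumption, not taken from the report): use the
     * larger of the timer-service watermark and the restored operator watermark.
     */
    static boolean registersTimerWithGuard(
            long windowEnd, long timerServiceWatermark, long restoredOperatorWatermark) {
        long effectiveWatermark = Math.max(timerServiceWatermark, restoredOperatorWatermark);
        return !isWindowFired(windowEnd, effectiveWatermark);
    }

    public static void main(String[] args) {
        long timerServiceWatermark = Long.MIN_VALUE; // warm-up gap after recovery
        long restoredOperatorWatermark = 600_000L;   // hypothetical restored value

        // Already-fired window: the uncovered path re-registers, the guard does not.
        System.out.println(registersTimerUncovered(300_000L, timerServiceWatermark));
        System.out.println(registersTimerWithGuard(
                300_000L, timerServiceWatermark, restoredOperatorWatermark));

        // A window that has not fired yet is still registered under the guard.
        System.out.println(registersTimerWithGuard(
                900_000L, timerServiceWatermark, restoredOperatorWatermark));
    }
}
```

Taking the maximum keeps normal behavior once the timer service catches up, 
since both watermarks then agree or the timer-service value is the larger one.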


  was:After a task restart / checkpoint recovery, a CUMULATE window with 
local-global aggregation may re-emit already-emitted historical windows (a.k.a. 
"backfill"). Root cause is a dual watermark inconsistency during the recovery 
warm-up window: SlicingWindowOperator.currentWatermark is restored from state, 
but InternalTimerService.currentWatermark is still Long.MIN_VALUE until the 
first post-recovery watermark arrives. During this warm-up gap, 
GlobalAggCombiner uses the stale timer-service watermark to decide whether to 
register a window timer, incorrectly treats historical windows as "not fired", 
re-registers old timers, and once the real watermark advances those timers fire 
immediately and cause a chain of fireWindow -> nextTriggerWindow for cumulative 
windows.


> Cumulative window re-emits old windows after recovery due to stale 
> InternalTimerService.currentWatermark in GlobalAggCombiner
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39620
>                 URL: https://issues.apache.org/jira/browse/FLINK-39620
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.1.1, 1.20.4
>         Environment: Flink 1.18.x (reproduced locally); 
> logic is the same in 1.20.x and master.
> SQL pattern: CUMULATE with local-global two-phase aggregation.
> Event-time with shiftTimeZone = Asia/Shanghai.
>            Reporter: haiqingchen
>            Priority: Critical



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
