[ https://issues.apache.org/jira/browse/FLINK-39481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18093085#comment-18093085 ]
Martijn Visser commented on FLINK-39481:
----------------------------------------
I managed to reproduce this locally and root-cause it. Summary: this is neither
a test issue nor a checkpoint-restore issue. The trailing window family is
lost because, with interruptible timers enabled, the MAX_WATERMARK forwarding
to downstream operators can be dropped at end of input. The checkpoint+restore
in the test is incidental: it only makes the end of the run slow and
checkpoint-heavy, which is what actually opens the race window.
*Mechanism*
1. With
{{execution.checkpointing.unaligned.interruptible-timers.enabled=true}},
watermark processing goes through
{{MailboxWatermarkProcessor.emitWatermarkInsideMailbox}}. The watermark is
forwarded downstream only after the timer-firing chain completes. If firing is
interrupted (e.g. by a checkpoint mail, which with unaligned checkpoints and
{{aligned-checkpoint-timeout=0}} happens constantly), the continuation is
scheduled as a *deferrable* mail.
2. Deferrable mails are skipped by {{yield()}}/{{tryYield()}} by design.
{{StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator}} drains the
mailbox at end of input with {{while (mailboxExecutor.tryYield()) {}}}. So when
EndOfData arrives while a watermark continuation is pending, the operators
finish and EndOfData is forwarded downstream without the watermark ever being
emitted.
3. The deferred continuation then runs in the resumed mailbox loop in
{{StreamTask.afterInvoke}}: it fires the remaining timers and emits the
watermark through the already-finished chain, behind EndOfData. Downstream has
already finished its operators at the old watermark, so windows that only fire
on MAX_WATERMARK (the trailing cumulate/hop family in this test) never fire,
and their buffered state is silently discarded. That is the exactly-once
violation.
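Steps 1 to 3 can be reduced to a toy, single-threaded model. This is a hypothetical sketch, not Flink's {{MailboxExecutor}} or {{MailboxWatermarkProcessor}} API: {{ToyMailbox}}, {{Mail}} and the string events are invented for illustration, and the only behavior carried over from the analysis is that {{tryYield()}} skips deferrable mails while the later mailbox loop runs them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Toy model of the end-of-input race (hypothetical; NOT Flink's mailbox API).
class DeferredWatermarkRace {

    /** A queued action plus a flag saying whether tryYield() may skip it. */
    static final class Mail {
        final boolean deferrable;
        final Runnable action;

        Mail(boolean deferrable, Runnable action) {
            this.deferrable = deferrable;
            this.action = action;
        }
    }

    static final class ToyMailbox {
        private final Deque<Mail> queue = new ArrayDeque<>();

        void put(Mail mail) {
            queue.addLast(mail);
        }

        /** Runs the oldest non-deferrable mail; deferrable mails are skipped. */
        boolean tryYield() {
            for (Iterator<Mail> it = queue.iterator(); it.hasNext(); ) {
                Mail mail = it.next();
                if (!mail.deferrable) {
                    it.remove();
                    mail.action.run();
                    return true;
                }
            }
            return false;
        }

        /** Stands in for the resumed mailbox loop, which also runs deferrable mails. */
        void runAll() {
            while (!queue.isEmpty()) {
                queue.pollFirst().action.run();
            }
        }
    }

    /** Returns the order in which events reach the downstream channel. */
    static List<String> simulate() {
        List<String> downstream = new ArrayList<>();
        ToyMailbox mailbox = new ToyMailbox();

        // Timer firing for MAX_WATERMARK was interrupted, so the continuation
        // (fire remaining timers, then forward the watermark) is queued as deferrable.
        mailbox.put(new Mail(true, () -> downstream.add("MAX_WATERMARK")));
        // An ordinary, non-deferrable mail that is also pending.
        mailbox.put(new Mail(false, () -> downstream.add("checkpoint-barrier")));

        // End of input: drain with tryYield(), which never picks the deferrable mail.
        while (mailbox.tryYield()) {
            // keep draining
        }
        downstream.add("EndOfData"); // operators finish, EndOfData goes downstream

        // Only now does the deferred continuation run, behind EndOfData.
        mailbox.runAll();
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [checkpoint-barrier, EndOfData, MAX_WATERMARK]
    }
}
```

The drain loop terminates as soon as only deferrable work is left, so in this model the watermark always lands behind EndOfData whenever its continuation was deferred.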
The failure requires three things at once: two chained window aggregations
across an exchange (which is why only the SplitDistinct=true and
cascading-window variants fail); interruptible timers plus frequent checkpoint
mails (the invariant config in all failing runs); and timer firing slow enough
that an interruption is pending exactly when EndOfData arrives (loaded CI
agents; ROCKSDB makes firing slower, hence the ROCKSDB bias).
*Evidence*
* Config forensics over all failing builds linked here and in FLINK-39930 (29
builds, ~1500 parsed runs of this test class): all 54 failing instances have
{{unaligned.enabled=true}} + {{aligned-checkpoint-timeout=PT0S}} +
{{interruptible-timers.enabled=true}} with [SplitDistinct=true, ROCKSDB]. Given
that config, the failure rate is 54/416 (about 13%); the flakiness only looks
rare because the per-test config randomization rarely rolls this combination.
* Local reproducer (scaled variant of testCumulateWindow_Rollup: same query
shape, 700 generated rows over 200 keys so slice boundaries fire hundreds of
timers, throttled sink for backpressure, trigger config forced): fails in ~1 of
5 runs on a laptop, losing the entire trailing [30s,*) family, identical to the
CI signature. Crucially it loses the rows in a run *without* any failure or
restore.
* Trace from a failing run (diagnostic logging on the watermark path): all 4
upstream GlobalWindowAggregate subtasks complete their trailing fires; 2 of
them defer watermark forwarding (interrupted), EndOfData overtakes, and their
MAX_WATERMARK is emitted only after finishOperators (t=16406/16407, source
FINISHED at 16408). The 4 downstream GlobalWindowAggregate subtasks never
advance past watermark 33000, never fire the trailing windows, and finish.
* Single-knob causal flips on the reproducer (baseline: 6 of 7 invocations of
5 repetitions each had at least one failure):
** {{interruptible-timers.enabled=false}}, everything else identical: 0
failures in 15 repetitions, and the interruption diagnostic fires 0 times.
** {{unaligned.enabled=false}} (aligned checkpoints): 0 failures in 15
repetitions.
** {{during-recovery.enabled=true}}: still fails with the identical signature,
so that option is unrelated (the CI config randomization never produced the
trigger combination together with during-recovery=true, which is why it could
not be ruled out from CI data).
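Why a downstream subtask stuck at watermark 33000 loses exactly the trailing family can be shown with a toy firing rule. This is a hypothetical sketch, not Flink's window operator: it assumes a window [start, end) fires once the watermark reaches end - 1, that 33000 is in milliseconds on the same time axis as the window bounds, and that whatever is still buffered when the operator finishes is dropped. The window bounds are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Toy downstream window buffer (hypothetical; NOT Flink's window operator).
class TrailingWindowLoss {

    /** Returns "start-end" labels (in seconds) of the windows that fire at this watermark. */
    static List<String> fired(long[][] windowsMs, long watermark) {
        List<String> result = new ArrayList<>();
        for (long[] w : windowsMs) {
            // Assumed rule: window [start, end) fires once watermark >= end - 1.
            if (watermark >= w[1] - 1) {
                result.add(w[0] / 1000 + "-" + w[1] / 1000);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] windows = {
            {15_000, 30_000},                                     // an earlier window
            {30_000, 35_000}, {30_000, 40_000}, {30_000, 45_000}  // trailing [30s,*) family
        };
        // Watermark stuck at 33000, then the operator finishes: the trailing family never fires.
        System.out.println(fired(windows, 33_000));         // [15-30]
        // MAX_WATERMARK (Long.MAX_VALUE) arriving before EndOfData would fire everything.
        System.out.println(fired(windows, Long.MAX_VALUE)); // [15-30, 30-35, 30-40, 30-45]
    }
}
```

Only windows whose end lies beyond the stalled watermark depend on MAX_WATERMARK, which matches the signature of losing the whole [30s,*) family while earlier windows are emitted normally.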
*Reproducer*
Branch: [https://github.com/MartijnVisser/flink/tree/FLINK-39481-repro]
(current master + diagnostics + reproducer, commit e8925b7ffc9; do not merge,
diagnostic logging included). Run:
{code}
export F39481_FORCE="execution.checkpointing.unaligned.enabled=true;execution.checkpointing.aligned-checkpoint-timeout=0s;execution.checkpointing.unaligned.interruptible-timers.enabled=true"
mvn install -DskipTests -Dfast -Pskip-webui-build -pl flink-table/flink-table-planner -am
mvn surefire:test@integration-tests -pl flink-table/flink-table-planner -Dtest=WindowDistinctAggregateScaledReproITCase
{code}
Fails in roughly 1 of 5 repetitions on my laptop (the class runs 5 repetitions
per invocation; in my runs 6 of 7 invocations captured at least one failure).
The assertion diff shows the golden run (no failure/restore) missing the whole
trailing family; grep the surefire output for {{FLINK39481}} to see the
watermark DEFER/EMIT trace and {{tryAdvanceWatermark INTERRUPTED}} events.
{{F39481_OPEN_DELAY_MS}} / {{F39481_ROW_DELAY_MS}} tune the throttle (defaults
300 ms / 5 ms; in my runs the scaled test used a row delay of 1).
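If the forced config and throttle knobs need to be consumed from another harness, a minimal sketch of the parsing could look like this. It is hypothetical and not taken from the reproducer branch: {{ReproConfig}}, {{parseOverrides}} and {{envMillis}} are invented names, and the only assumptions are what the command above shows ({{F39481_FORCE}} holds semicolon-separated key=value pairs) and that the two delay variables are millisecond values with defaults 300 and 5.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (NOT the reproducer's code) for the env-var knobs.
class ReproConfig {

    /** Parses "k1=v1;k2=v2" into an insertion-ordered map; null or blank gives an empty map. */
    static Map<String, String> parseOverrides(String spec) {
        Map<String, String> overrides = new LinkedHashMap<>();
        if (spec == null || spec.isBlank()) {
            return overrides;
        }
        for (String entry : spec.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            int eq = trimmed.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + trimmed);
            }
            overrides.put(trimmed.substring(0, eq).trim(), trimmed.substring(eq + 1).trim());
        }
        return overrides;
    }

    /** Reads a millisecond value from the given environment map, falling back to a default. */
    static long envMillis(Map<String, String> env, String name, long defaultMs) {
        String value = env.get(name);
        return (value == null || value.isBlank()) ? defaultMs : Long.parseLong(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> overrides = parseOverrides(System.getenv("F39481_FORCE"));
        long openDelayMs = envMillis(System.getenv(), "F39481_OPEN_DELAY_MS", 300);
        long rowDelayMs = envMillis(System.getenv(), "F39481_ROW_DELAY_MS", 5);
        System.out.println(overrides + " openDelayMs=" + openDelayMs + " rowDelayMs=" + rowDelayMs);
    }
}
```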
*Relation to other issues*
* FLINK-39930 is the same bug (same test class, same signature).
* FLINK-36663 / FLINK-38055 are about watermark initialization after restore;
this bug is about watermark forwarding at end of input. Related area but a
different defect; the restore path was exonerated by a dedicated
operator-harness test.
*Fix direction (to discuss)*
The end-of-input path must complete pending interruptible watermark processing
before operators finish and EndOfData is forwarded.
Options:
1. In the operator finish path, flush the pending {{MailboxWatermarkProcessor}}
work non-interruptibly (e.g. complete {{tryAdvanceWatermark}} with a never-stop
predicate and emit the watermark) before {{finish()}}.
2. Make the finish drain execute deferrable mails (changes tryYield semantics,
riskier).
3. Emit the watermark downstream eagerly when deferring at end of input (would
reorder watermark ahead of un-fired timer output, likely incorrect for other
reasons).
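Option 1 can be illustrated with a toy watermark processor. This is a hypothetical sketch of the idea, not a patch against Flink: {{ToyWatermarkProcessor}}, {{tryAdvance}} and {{flushNonInterruptibly}} are invented stand-ins for {{MailboxWatermarkProcessor}} / {{tryAdvanceWatermark}}, and the interruption is simulated by a predicate that asks to stop after the first timer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Toy model of fix option 1 (hypothetical; NOT Flink code).
class FlushBeforeFinish {

    static final class ToyWatermarkProcessor {
        private final Deque<String> pendingTimers = new ArrayDeque<>();
        private final List<String> events;
        private String pendingWatermark; // null when no watermark is in flight

        ToyWatermarkProcessor(List<String> events) {
            this.events = events;
        }

        void startWatermark(String watermark, List<String> timers) {
            pendingTimers.addAll(timers);
            pendingWatermark = watermark;
        }

        /** Fires timers until done or shouldStop says so; emits the watermark only when done. */
        boolean tryAdvance(BooleanSupplier shouldStop) {
            while (!pendingTimers.isEmpty()) {
                if (shouldStop.getAsBoolean()) {
                    return false; // interrupted: real code would defer a continuation here
                }
                events.add("fire " + pendingTimers.pollFirst());
            }
            if (pendingWatermark != null) {
                events.add(pendingWatermark);
                pendingWatermark = null;
            }
            return true;
        }

        /** Option 1: finish pending work with a never-stop predicate. */
        void flushNonInterruptibly() {
            tryAdvance(() -> false);
        }
    }

    static List<String> endOfInput() {
        List<String> events = new ArrayList<>();
        ToyWatermarkProcessor processor = new ToyWatermarkProcessor(events);
        processor.startWatermark("MAX_WATERMARK", List.of("t1", "t2", "t3"));

        // A checkpoint mail interrupts firing after the first timer.
        int[] checks = {0};
        processor.tryAdvance(() -> checks[0]++ >= 1);

        // Fixed finish path: flush pending watermark work before finish() and EndOfData.
        processor.flushNonInterruptibly();
        events.add("finish");
        events.add("EndOfData");
        return events;
    }

    public static void main(String[] args) {
        // [fire t1, fire t2, fire t3, MAX_WATERMARK, finish, EndOfData]
        System.out.println(endOfInput());
    }
}
```

Unlike option 3, the watermark is still emitted only after all of its timers have fired, so timer output keeps its position ahead of the watermark.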
[~fanrui] since you are looking at this too, does the above match what you are
seeing?
> [tests] Fix flaky
> WindowDistinctAggregateITCase#testCumulateWindow_GroupingSets
> ----------------------------------------------------------------------------------
>
> Key: FLINK-39481
> URL: https://issues.apache.org/jira/browse/FLINK-39481
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: featzhang
> Priority: Critical
> Labels: pull-request-available, test-stability
> Attachments:
> WindowDistinctAggregateITCase.testCumulateWindow_Rollup_SplitDistinct-true_ROCKSDB.FAILED.log
>
>
> The test `WindowDistinctAggregateITCase#testCumulateWindow_GroupingSets` is
> flaky and fails intermittently in CI due to a race condition in the test
> framework itself.
> 1. Failure Evidence
> Azure Build #74194 (2026-04-17):
> - Run 1: PASS
> - Run 2: PASS
> - Run 3: FAIL ← flaky
> - Run 4: PASS
> Error message:
> org.opentest4j.AssertionFailedError:
> Expected 23 rows but got 14 rows.
> Missing rows (all related to window [2020-10-10T00:00:30, ...]):
> - 0,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1
> - 0,b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1
> - 0,b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1
> - 0,null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0
> - 0,null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0
> - 0,null,2020-10-10T00:00:30,2020-10-10T00:00:45,1,7.77,7.0,7.0,0
> - 1,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2,11.10,7.0,3.0,1
> - 1,null,2020-10-10T00:00:30,2020-10-10T00:00:40,2,11.10,7.0,3.0,1
> - 1,null,2020-10-10T00:00:30,2020-10-10T00:00:45,2,11.10,7.0,3.0,1
> 2. Root Cause Analysis
> The test uses `FailingCollectionSource` with `'failing-source' = 'true'`,
> which intentionally throws an artificial exception to trigger the
> checkpoint-restore path.
> The source emits 11 rows; the last two, at the end of the dataset, have the
> timestamps `2020-10-10 00:00:32` and `2020-10-10 00:00:34`.
> The flakiness stems from a race condition in `FailingCollectionSource`:
> 1. The source uses a checkpoint-triggered failure mechanism. When the failure
> occurs, only the rows emitted BEFORE the checkpoint are guaranteed to be
> replayed after restore.
> 2. The last 2 rows (timestamps 00:00:32 and 00:00:34) advance the watermark
> past `00:00:31`, triggering the cumulate windows ending at 00:00:35/40/45.
> If the failure and restore happen AFTER the source emits rows
> at 00:00:32/00:00:34 but BEFORE the downstream operators flush their state,
> those late-window results can be lost.
> 3. Specifically, the issue is that `FailingCollectionSource` uses the legacy
> `SourceFunction` API with `ctx.getCheckpointLock()`. The timing of when
> `numSuccessfulCheckpoints >= 1 && lastCheckpointedEmittedNum >= 1`
> is satisfied relative to when the last 2 rows are emitted is
> non-deterministic, meaning that sometimes the last 2 rows are emitted before
> the artificial failure is triggered (correct case), and sometimes after
> (data loss case).
>
> 3. Impact
> - Flink CI blocked intermittently on the `test_ci table` job
> - 3 out of 4 retry runs pass, a classic flaky-test pattern
> - Affects: WindowDistinctAggregateITCase (CUMULATE window with GROUPING
> SETS/CUBE/ROLLUP)
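For reference, all nine missing rows in the quoted description share the window start 2020-10-10T00:00:30. A minimal sketch of CUMULATE window assignment shows why: both late rows map only to that family. It assumes a 5 s step and a 15 s max size (inferred from the window ends 00:00:35/40/45, not read from the test) and window starts aligned to multiples of the max size; times are seconds after midnight, and {{CumulateAssignment}} is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of CUMULATE window assignment under assumed step/size (hypothetical helper).
class CumulateAssignment {

    /** Returns "start-end" (seconds) for every cumulate window [start, end) containing ts. */
    static List<String> assign(long ts, long step, long maxSize) {
        long start = ts - Math.floorMod(ts, maxSize); // align start to a multiple of maxSize
        List<String> windows = new ArrayList<>();
        for (long end = start + step; end <= start + maxSize; end += step) {
            if (end > ts) { // the window must still contain ts
                windows.add(start + "-" + end);
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(assign(32, 5, 15)); // [30-35, 30-40, 30-45]
        System.out.println(assign(34, 5, 15)); // [30-35, 30-40, 30-45]
    }
}
```

All three of these windows end after 00:00:34, so under this assumption they can only fire once a later watermark, in practice the final MAX_WATERMARK, reaches the aggregation.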