[ 
https://issues.apache.org/jira/browse/FLINK-38614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18089771#comment-18089771
 ] 

Martijn Visser commented on FLINK-38614:
----------------------------------------

I've done a lot of (AI-assisted) investigation into this one, and I think this 
is actually a data-loss bug that breaks exactly-once, not a flaky test.

Root cause:

SinkV2 jobs with a post-commit (global) committer can lose the final 
checkpoint's committables under unaligned checkpoints:

* The {{CommitterOperator}} forwards the final checkpoint's committables to the 
downstream global committer only on {{notifyCheckpointComplete}}, which (per 
FLIP-147) runs after {{endInput}}.
* With unaligned checkpoints those committables can belong to the 
post-end-of-input final checkpoint.
* The task broadcasts {{EndOfData}} downstream right after {{endInput}} (before 
that final {{notifyCheckpointComplete}}), so the global committer finishes 
consuming its input before the committables arrive and silently drops them.

The bug is parallelism-agnostic (reproduced at 1->1, not only on rescale); 
unaligned checkpoints on the pipeline upstream of the sink are the trigger. 
With aligned checkpoints the last committables fall into a pre-end-of-input 
checkpoint and are delivered normally, which is why the failure only surfaces 
intermittently under the CI checkpoint randomization.
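
To make the ordering concrete, here is a minimal toy model of the sequence described above. It is only a sketch: {{ToyCommitterTask}} and {{ToyGlobalCommitter}} are hypothetical stand-ins, not Flink classes, and the boolean flag merely models whether the last committable lands in a checkpoint before or after end of input.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the event ordering; none of these types are Flink APIs.
class FinalCheckpointRaceSketch {

    /** Hypothetical stand-in for the downstream global committer. */
    static final class ToyGlobalCommitter {
        final List<String> received = new ArrayList<>();
        boolean finished = false;

        void onCommittable(String committable) {
            if (finished) {
                return; // input already finished: the committable is silently dropped
            }
            received.add(committable);
        }

        void onEndOfData() {
            finished = true;
        }
    }

    /** Hypothetical stand-in for the committer operator and its task. */
    static final class ToyCommitterTask {
        final List<String> pending = new ArrayList<>();
        final ToyGlobalCommitter downstream;

        ToyCommitterTask(ToyGlobalCommitter downstream) {
            this.downstream = downstream;
        }

        void addCommittable(String committable) {
            pending.add(committable);
        }

        // Committables are forwarded only when a checkpoint completes.
        void notifyCheckpointComplete() {
            pending.forEach(downstream::onCommittable);
            pending.clear();
        }

        // EndOfData is broadcast right after endInput, before the final checkpoint completes.
        void endInputThenBroadcastEndOfData() {
            downstream.onEndOfData();
        }
    }

    static List<String> run(boolean lastCommittableBeforeEndOfInput) {
        ToyGlobalCommitter global = new ToyGlobalCommitter();
        ToyCommitterTask task = new ToyCommitterTask(global);

        task.addCommittable("c1");
        task.notifyCheckpointComplete(); // an ordinary mid-stream checkpoint

        task.addCommittable("c2");
        if (lastCommittableBeforeEndOfInput) {
            task.notifyCheckpointComplete(); // "aligned" case: delivered before end of input
        }
        task.endInputThenBroadcastEndOfData();
        task.notifyCheckpointComplete(); // final checkpoint, after EndOfData went out
        return global.received;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [c1, c2]
        System.out.println(run(false)); // prints [c1]
    }
}
```

In the second run, {{c2}} is forwarded only on the final {{notifyCheckpointComplete}}, by which point the toy global committer has already seen {{EndOfData}}, which mirrors the loss described above.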

Proposed fix (I'll open a draft PR):

Defer the downstream {{EndOfData}} broadcast -- for tasks whose operator chain 
emits records on the final checkpoint -- until the final checkpoint completes, 
so the records are delivered before the downstream finishes. 
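
As a rough illustration of the intended ordering change (a sketch only, not the actual {{StreamTask}} change; the {{deferEndOfData}} flag and the string event log are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Toy event log of what the downstream sees; not Flink code.
class DeferredEndOfDataSketch {

    static List<String> run(boolean deferEndOfData) {
        List<String> downstreamLog = new ArrayList<>();
        List<String> pending = new ArrayList<>(List.of("final-committable"));

        // endInput() has returned at this point.
        if (!deferEndOfData) {
            downstreamLog.add("EndOfData"); // current behaviour: broadcast immediately
        }

        // The final checkpoint completes and the operator emits its records.
        downstreamLog.addAll(pending);
        pending.clear();

        if (deferEndOfData) {
            downstreamLog.add("EndOfData"); // proposed behaviour: broadcast afterwards
        }
        return downstreamLog;
    }

    /** Records the downstream still accepts, i.e. those logged before EndOfData. */
    static List<String> deliveredBeforeEndOfData(List<String> log) {
        return new ArrayList<>(log.subList(0, log.indexOf("EndOfData")));
    }

    public static void main(String[] args) {
        System.out.println(deliveredBeforeEndOfData(run(false))); // prints []
        System.out.println(deliveredBeforeEndOfData(run(true)));  // prints [final-committable]
    }
}
```

The deferral would only apply to tasks whose operator chain emits on the final checkpoint; how such tasks are detected is not modelled here.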

Because this touches the {{StreamTask}} end-of-input / final-checkpoint 
coordination (FLIP-147), it needs a review from someone with specific 
expertise in this area.

One other remark: there is a separate test-correctness issue with the same test 
(it asserts an exact commit multiplicity with a non-idempotent test committer, 
which over-specifies the {{Committer}} idempotency contract and produces 
intermittent duplicate-element failures under unaligned checkpoints). That is 
distinct from this data-loss bug, so I'll create a separate Jira issue to keep 
the two from being conflated.
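
A small sketch of why the exact-multiplicity assertion is fragile. The repeated commit attempt for {{c2}} is an assumption used for illustration (the exact cause of the duplicates in the test is not modelled), and both recorder methods are hypothetical, not the committer used in {{SinkV2ITCase}}:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy comparison of two ways a test committer could record commits; not Flink code.
class CommitMultiplicitySketch {

    /** Non-idempotent recording: every commit attempt is appended. */
    static List<String> commitIntoList(List<String> attempts) {
        List<String> committed = new ArrayList<>();
        for (String committable : attempts) {
            committed.add(committable);
        }
        return committed;
    }

    /** Idempotent recording: a repeated attempt for the same committable is a no-op. */
    static Set<String> commitIntoSet(List<String> attempts) {
        Set<String> committed = new LinkedHashSet<>();
        for (String committable : attempts) {
            committed.add(committable);
        }
        return committed;
    }

    public static void main(String[] args) {
        // Assumed scenario: the commit for "c2" is attempted twice.
        List<String> attempts = List.of("c1", "c2", "c2");
        System.out.println(commitIntoList(attempts)); // prints [c1, c2, c2]
        System.out.println(commitIntoSet(attempts));  // prints [c1, c2]
    }
}
```

An assertion on the list form fails as soon as any commit is repeated, whereas an assertion on the set form only checks that every committable was committed at least once.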

> SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling failed in 
> test_cron_adaptive_scheduler tests
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38614
>                 URL: https://issues.apache.org/jira/browse/FLINK-38614
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 2.2.0
>            Reporter: Ruan Hang
>            Priority: Major
>              Labels: pull-request-available
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=70660&view=logs&j=8fd9202e-fd17-5b26-353c-ac1ff76c8f28&t=b9e77e1f-4958-59ff-be3f-1de9c3306a13


