[
https://issues.apache.org/jira/browse/FLINK-38614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18089771#comment-18089771
]
Martijn Visser commented on FLINK-38614:
----------------------------------------
I've done a lot of (AI-assisted) investigation on this one, and I think this is
actually an exactly-once data-loss bug, not a flaky test.
Root cause:
SinkV2 jobs with a post-commit (global) committer can lose the final
checkpoint's committables under unaligned checkpoints:
* The {{CommitterOperator}} forwards the final checkpoint's committables to the
downstream global committer only on {{notifyCheckpointComplete}}, which (per
FLIP-147) runs after {{endInput}}.
* With unaligned checkpoints those committables can belong to the
post-end-of-input final checkpoint.
* The task broadcasts {{EndOfData}} downstream right after {{endInput}} (before
that final {{notifyCheckpointComplete}}), so the global committer finishes
consuming its input before the committables arrive and silently drops them.
The bug is parallelism-agnostic (reproduced at parallelism 1->1, not only when
rescaling); the trigger is unaligned checkpoints in the pipeline upstream of the
sink. With aligned checkpoints, the last committables fall into a
pre-end-of-input checkpoint and are delivered normally, which is why the
failure only surfaces intermittently under the CI's checkpoint randomization.
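To make the ordering concrete, here is a toy replay of the sequence described above. It is a hypothetical model, not Flink code: {{GlobalCommitterModel}} and {{replay}} are made-up names, and the model simply assumes the downstream ignores any record that arrives after {{EndOfData}}. It only shows why the last committables survive when they belong to a pre-end-of-input checkpoint (aligned) and are dropped when they belong to the final checkpoint (unaligned).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of the event order; none of these names are Flink APIs. */
public class FinalCheckpointOrderingSketch {

    /** Stand-in for the downstream global committer: stops accepting input after EndOfData. */
    static final class GlobalCommitterModel {
        private boolean endOfData = false;
        final List<String> committed = new ArrayList<>();

        void onRecord(String committable) {
            if (endOfData) {
                return; // input already finished: the record is silently dropped
            }
            committed.add(committable);
        }

        void onEndOfData() {
            endOfData = true;
        }
    }

    /**
     * Replays the upstream task's sequence: endInput, EndOfData broadcast,
     * then notifyCheckpointComplete for the final checkpoint.
     *
     * @param lastInFinalCheckpoint true models the unaligned case, where the last
     *     committables belong to the post-end-of-input final checkpoint
     */
    static List<String> replay(boolean lastInFinalCheckpoint) {
        GlobalCommitterModel downstream = new GlobalCommitterModel();
        // Committables of an earlier checkpoint are forwarded before endInput.
        downstream.onRecord("c1");
        if (!lastInFinalCheckpoint) {
            // Aligned case: the last committables also belong to a pre-end-of-input checkpoint.
            downstream.onRecord("c2");
        }
        // endInput() runs and EndOfData is broadcast right away.
        downstream.onEndOfData();
        if (lastInFinalCheckpoint) {
            // notifyCheckpointComplete(final) forwards the last committables too late.
            downstream.onRecord("c2");
        }
        return downstream.committed;
    }

    public static void main(String[] args) {
        System.out.println("aligned:   " + replay(false)); // [c1, c2]
        System.out.println("unaligned: " + replay(true));  // [c1]
    }
}
```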
Proposed fix (I'll open a draft PR):
For tasks whose operator chain emits records on the final checkpoint, defer the
downstream {{EndOfData}} broadcast until the final checkpoint completes, so the
records are delivered before the downstream task finishes.
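A minimal sketch of the proposed ordering, again as a hypothetical toy model rather than the actual {{StreamTask}} change: {{UpstreamTaskModel}}, {{emitsOnFinalCheckpoint}} and {{acceptedDownstream}} are illustrative names only. The task holds {{EndOfData}} back at {{endInput}} when its chain emits on the final checkpoint, and sends it only after the final checkpoint's records, so a downstream that stops reading at {{EndOfData}} still receives them.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of deferring EndOfData; not Flink's StreamTask API. */
public class DeferredEndOfDataSketch {

    /** Upstream task stand-in; sentDownstream records what the downstream receives, in order. */
    static final class UpstreamTaskModel {
        private final boolean emitsOnFinalCheckpoint;
        private boolean endOfDataPending = false;
        final List<String> sentDownstream = new ArrayList<>();

        UpstreamTaskModel(boolean emitsOnFinalCheckpoint) {
            this.emitsOnFinalCheckpoint = emitsOnFinalCheckpoint;
        }

        void endInput() {
            if (emitsOnFinalCheckpoint) {
                endOfDataPending = true; // defer: records will still be emitted on the final checkpoint
            } else {
                sentDownstream.add("EndOfData"); // nothing left to emit, broadcast immediately
            }
        }

        void notifyFinalCheckpointComplete() {
            if (emitsOnFinalCheckpoint) {
                sentDownstream.add("committables(final)");
            }
            if (endOfDataPending) {
                sentDownstream.add("EndOfData"); // released only after the final records
                endOfDataPending = false;
            }
        }
    }

    /** Counts records a downstream that stops at EndOfData would accept. */
    static int acceptedDownstream(List<String> events) {
        int accepted = 0;
        for (String event : events) {
            if (event.equals("EndOfData")) {
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        UpstreamTaskModel committerTask = new UpstreamTaskModel(true);
        committerTask.endInput();
        committerTask.notifyFinalCheckpointComplete();
        System.out.println(committerTask.sentDownstream); // [committables(final), EndOfData]
        System.out.println(acceptedDownstream(committerTask.sentDownstream)); // 1
    }
}
```

Tasks whose chain emits nothing on the final checkpoint keep the current behavior in this model, which limits the change to the committer-style chains that need it.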
Because this touches the {{StreamTask}} end-of-input / final-checkpoint
coordination (FLIP-147), it needs review from someone with specific expertise
in this area.
One other remark: there is a separate test-correctness issue in the same test.
It asserts an exact commit multiplicity with a non-idempotent test committer,
which over-specifies the {{Committer}} idempotency contract and produces
intermittent duplicate-element failures under unaligned checkpoints. That issue
is distinct from this data-loss bug, so I'll create another Jira to keep the
two from being conflated.
> SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling failed in
> test_cron_adaptive_scheduler tests
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38614
> URL: https://issues.apache.org/jira/browse/FLINK-38614
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 2.2.0
> Reporter: Ruan Hang
> Priority: Major
> Labels: pull-request-available
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=70660&view=logs&j=8fd9202e-fd17-5b26-353c-ac1ff76c8f28&t=b9e77e1f-4958-59ff-be3f-1de9c3306a13