MartijnVisser opened a new pull request, #28476:
URL: https://github.com/apache/flink/pull/28476
## What is the purpose of the change
SinkV2 jobs with a post-commit (global) committer can silently **lose the
final checkpoint's committables under unaligned checkpoints** — an exactly-once
data-loss bug. It surfaced as the intermittently failing
`SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling`
(FLINK-38614).
Root cause: the `CommitterOperator` forwards the final checkpoint's
committables to the downstream global committer only on
`notifyCheckpointComplete`, which (per FLIP-147) runs *after* `endInput`. With
unaligned checkpoints those committables can belong to the post-end-of-input
final checkpoint. The task broadcasts `EndOfData` right after `endInput`
(before that final `notifyCheckpointComplete`), so the downstream global
committer finishes consuming its input before the committables arrive and
silently drops them.
This change defers the downstream `EndOfData` broadcast — for tasks whose
operator chain emits records on the final checkpoint — until the final
checkpoint completes, so those records are delivered before the downstream
finishes.
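The ordering problem and the deferral can be illustrated with a toy model. This is a minimal sketch that assumes one upstream committer task, one output channel, and a downstream consumer that stops reading at `EndOfData`. The names `EndOfDataOrderingDemo`, `consume`, and `upstream` are hypothetical and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the race described above (hypothetical names, not Flink classes).
class EndOfDataOrderingDemo {

    /** Kinds of events an upstream task writes to its output channel. */
    enum Kind { RECORD, END_OF_DATA }

    /** One event on the channel; payload is null for END_OF_DATA. */
    static final class Event {
        final Kind kind;
        final String payload;

        Event(Kind kind, String payload) {
            this.kind = kind;
            this.payload = payload;
        }
    }

    /** Downstream global committer: once EndOfData arrives its input is finished. */
    static List<String> consume(List<Event> channel) {
        List<String> committed = new ArrayList<>();
        for (Event e : channel) {
            if (e.kind == Kind.END_OF_DATA) {
                break; // input finished: anything after this is never processed
            }
            committed.add(e.payload);
        }
        return committed;
    }

    /**
     * Upstream committer task: endInput, then the final checkpoint completes and the
     * committables are forwarded. deferEndOfData selects where EndOfData is broadcast.
     */
    static List<Event> upstream(boolean deferEndOfData) {
        List<Event> channel = new ArrayList<>();
        // endInput(): the final checkpoint's committables are not forwarded yet.
        if (!deferEndOfData) {
            channel.add(new Event(Kind.END_OF_DATA, null)); // eager: right after endInput
        }
        // notifyCheckpointComplete(final checkpoint): committables forwarded downstream.
        channel.add(new Event(Kind.RECORD, "committable-of-final-checkpoint"));
        if (deferEndOfData) {
            channel.add(new Event(Kind.END_OF_DATA, null)); // deferred: after the records
        }
        return channel;
    }

    public static void main(String[] args) {
        System.out.println("eager EndOfData:    " + consume(upstream(false)));
        System.out.println("deferred EndOfData: " + consume(upstream(true)));
    }
}
```

Running `main` prints an empty list for the eager ordering (the committable is silently dropped) and the single committable for the deferred ordering.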
## Brief change log
- New `@Internal` marker interface `EmitsRecordsOnFinalCheckpoint` in
`org.apache.flink.streaming.api.operators` (beside `BoundedOneInput`).
- `CommitterOperator` implements it and reports `true` only when it forwards
committables downstream and a final checkpoint will be taken (streaming mode
with checkpointing enabled). The condition is centralized in
`hasFinalCheckpoint()`, which `endInput` also uses.
- `StreamTask.endData` defers the downstream `EndOfData` broadcast when
checkpoints-after-tasks-finished is enabled and the chain contains such an
operator; `StreamTask.notifyCheckpointComplete` flushes the deferred broadcast
once the final checkpoint completes.
- Added `StreamTaskDeferredEndOfDataTest`.
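The opt-in check can be sketched as follows. The change log calls the interface a marker yet says the committer "returns `true`", so this sketch assumes a single boolean method; the method name `emitsRecordsOnFinalCheckpoint()`, the `CommitterLike` stand-in, and its constructor flags are hypothetical. It also assumes `hasFinalCheckpoint()` means "streaming mode with checkpointing enabled", with forwarding downstream checked separately.

```java
import java.util.List;

// Hypothetical sketch of the opt-in interface and the task-side check. Names follow the
// PR text; the method name and constructor parameters are assumptions.
class DeferralDecisionSketch {

    /** Opt-in for operators that may emit records when the final checkpoint completes. */
    interface EmitsRecordsOnFinalCheckpoint {
        boolean emitsRecordsOnFinalCheckpoint(); // assumed method name
    }

    /** Stand-in for CommitterOperator: only the opt-in condition is modelled. */
    static final class CommitterLike implements EmitsRecordsOnFinalCheckpoint {
        private final boolean forwardsDownstream; // a global committer follows
        private final boolean streamingMode;
        private final boolean checkpointingEnabled;

        CommitterLike(boolean forwardsDownstream, boolean streamingMode, boolean checkpointingEnabled) {
            this.forwardsDownstream = forwardsDownstream;
            this.streamingMode = streamingMode;
            this.checkpointingEnabled = checkpointingEnabled;
        }

        /** Assumed meaning: a final checkpoint will be taken (streaming + checkpointing). */
        boolean hasFinalCheckpoint() {
            return streamingMode && checkpointingEnabled;
        }

        @Override
        public boolean emitsRecordsOnFinalCheckpoint() {
            return forwardsDownstream && hasFinalCheckpoint();
        }
    }

    /** Task-side check: cheap short-circuit first, then one scan of the operator chain. */
    static boolean shouldDeferEndOfData(boolean checkpointsAfterTasksFinish, List<?> chain) {
        if (!checkpointsAfterTasksFinish) {
            return false;
        }
        for (Object operator : chain) {
            if (operator instanceof EmitsRecordsOnFinalCheckpoint
                    && ((EmitsRecordsOnFinalCheckpoint) operator).emitsRecordsOnFinalCheckpoint()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Object> sinkChain = List.of("writer", new CommitterLike(true, true, true));
        List<Object> plainChain = List.of("map", "filter");
        System.out.println(shouldDeferEndOfData(true, sinkChain));  // true: committer opts in
        System.out.println(shouldDeferEndOfData(true, plainChain)); // false: nothing opts in
        System.out.println(shouldDeferEndOfData(false, sinkChain)); // false: feature disabled
    }
}
```

Checking `instanceof` against the interface rather than a concrete committer class keeps the task-side check independent of the sink package, which matches the rationale in the reviewer notes.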
Notes for reviewers:
- **Why a marker interface** rather than `instanceof CommitterOperator`: the
deferral is a generic task-lifecycle concern, and `StreamTask` should not
depend on the sink package. `CommitterOperator` is the only in-tree implementor
today.
- **Non-sink tasks are unaffected**: `shouldDeferEndOfData()` short-circuits
unless checkpoints-after-tasks-finished is enabled and an operator in the chain
opts in. `endData` runs once per task and is not on a per-record path.
- **No new hang**: `afterInvoke` already gates termination on
`finalCheckpointCompleted` under the identical condition, so the deferral adds
no new termination dependency. The flush is gated on the final checkpoint
completing; an uncompleted/aborted checkpoint does not flush, and an early
flush is deliberately avoided because it would re-introduce the loss.
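The deferral and its gating can be modelled as a small state machine. This is a simplified sketch, not `StreamTask`'s code: it assumes checkpoint IDs increase monotonically, treats any checkpoint triggered after `endData` as a final-checkpoint candidate, and records outgoing events as strings in a list. The class `DeferredEndOfDataSketch` and its `triggerCheckpoint` and `output` methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a task that defers EndOfData until its final checkpoint completes.
// Hypothetical names; not StreamTask's real implementation.
class DeferredEndOfDataSketch {
    private final boolean deferEndOfData;                  // outcome of the opt-in check
    private final List<String> output = new ArrayList<>(); // events sent downstream, in order
    private boolean endOfDataPending;
    private long firstCheckpointAfterEndData = Long.MAX_VALUE;

    DeferredEndOfDataSketch(boolean deferEndOfData) {
        this.deferEndOfData = deferEndOfData;
    }

    /** End of input for the whole task. */
    void endData() {
        if (deferEndOfData) {
            endOfDataPending = true; // hold back until the final checkpoint completes
        } else {
            output.add("EndOfData"); // eager broadcast: the pre-fix ordering
        }
    }

    /** A checkpoint triggered after endData is a candidate final checkpoint. */
    void triggerCheckpoint(long checkpointId) {
        if (endOfDataPending) {
            firstCheckpointAfterEndData = Math.min(firstCheckpointAfterEndData, checkpointId);
        }
    }

    /** Operators emit on completion first; only then is a pending EndOfData flushed. */
    void notifyCheckpointComplete(long checkpointId) {
        output.add("committables@" + checkpointId);
        if (endOfDataPending && checkpointId >= firstCheckpointAfterEndData) {
            endOfDataPending = false;
            output.add("EndOfData");
        }
    }

    /** An aborted checkpoint never flushes: an early flush would reopen the loss window. */
    void notifyCheckpointAborted(long checkpointId) {
        // Intentionally empty: EndOfData stays pending for a later checkpoint.
    }

    List<String> output() {
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        DeferredEndOfDataSketch task = new DeferredEndOfDataSketch(true);
        task.endData();
        task.triggerCheckpoint(2);
        task.notifyCheckpointAborted(2); // nothing sent yet
        task.triggerCheckpoint(3);
        task.notifyCheckpointComplete(3);
        System.out.println(task.output()); // [committables@3, EndOfData]
    }
}
```

With deferral enabled, the aborted checkpoint leaves the output empty and the later completed checkpoint produces the committables followed by `EndOfData`. With deferral disabled, `EndOfData` precedes the committables, which is the ordering the fix removes.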
## Verifying this change
This change added tests and can be verified as follows:
- `StreamTaskDeferredEndOfDataTest` (new) deterministically verifies (a) the
final-checkpoint record is emitted *before* `EndOfData` downstream, and (b) the
deferred `EndOfData` is not flushed by an uncompleted/aborted final checkpoint
but is flushed (after the record) once a later one completes. Both assertions
fail without the fix.
- Existing `StreamTaskFinalCheckpointsTest`, `SinkV2CommitterOperatorTest`,
`SinkV2SinkWriterOperatorTest`, `CheckpointAfterAllTasksFinishedITCase`, and
`SinkV2ITCase` remain green.
- The end-to-end loss is inherently a cross-task scheduling race (committer
task vs global-committer task) and is not deterministically reproducible as a
plain ITCase; it was reproduced under controlled scheduling and confirmed
fixed. The new unit test guards the single-task invariant the fix establishes
(records emitted before `EndOfData`).
This change affects checkpoint/end-of-input coordination in `StreamTask`
(FLIP-147), so specialist review from a checkpointing/runtime maintainer is
requested.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (end-of-input /
final-checkpoint coordination)
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes (Claude Opus 4.8)
Generated-by: Claude Opus 4.8
--
This is an automated message from the Apache Git Service.