MartijnVisser opened a new pull request, #28476:
URL: https://github.com/apache/flink/pull/28476

   ## What is the purpose of the change
   
   SinkV2 jobs with a post-commit (global) committer can silently **lose the 
final checkpoint's committables under unaligned checkpoints** — an exactly-once 
data-loss bug. It surfaced as the intermittently failing 
`SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling` 
(FLINK-38614).
   
   Root cause: the `CommitterOperator` forwards the final checkpoint's 
committables to the downstream global committer only on 
`notifyCheckpointComplete`, which (per FLIP-147) runs *after* `endInput`. With 
unaligned checkpoints those committables can belong to the post-end-of-input 
final checkpoint. The task broadcasts `EndOfData` right after `endInput` 
(before that final `notifyCheckpointComplete`), so the downstream global 
committer finishes consuming its input before the committables arrive and 
silently drops them.
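
The ordering problem described above can be illustrated with a small toy model. This is not Flink code: `Downstream`, the event strings, and `run` are hypothetical stand-ins, and the sketch assumes only what the description states, namely that the downstream stops consuming once it has seen `EndOfData`.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of a downstream that stops reading after EndOfData. */
public class EndOfDataRaceSketch {

    /** Hypothetical stand-in for the global committer's input handling. */
    public static final class Downstream {
        public final List<String> received = new ArrayList<>();
        public final List<String> dropped = new ArrayList<>();
        private boolean finished;

        public void onEvent(String event) {
            if (finished) {
                dropped.add(event); // arrives after the input ended: silently lost
            } else if (event.equals("EndOfData")) {
                finished = true;
            } else {
                received.add(event);
            }
        }
    }

    /** Replays the upstream event order, with EndOfData sent eagerly or deferred. */
    public static Downstream run(boolean deferEndOfData) {
        Downstream downstream = new Downstream();
        downstream.onEvent("committable-cp1"); // forwarded by an earlier checkpoint
        if (!deferEndOfData) {
            downstream.onEvent("EndOfData"); // broadcast right after endInput
        }
        downstream.onEvent("committable-final"); // forwarded on the final checkpoint
        if (deferEndOfData) {
            downstream.onEvent("EndOfData"); // broadcast after the final checkpoint
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println("eager:    dropped=" + run(false).dropped);
        System.out.println("deferred: dropped=" + run(true).dropped);
    }
}
```

In the eager ordering the final committable lands in `dropped`; in the deferred ordering nothing is dropped.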
   
   This change defers the downstream `EndOfData` broadcast until the final 
checkpoint completes, but only for tasks whose operator chain emits records on 
the final checkpoint. Those records are therefore delivered before the 
downstream task finishes consuming its input.
   
   ## Brief change log
   
   - New `@Internal` marker interface `EmitsRecordsOnFinalCheckpoint` in 
`org.apache.flink.streaming.api.operators` (beside `BoundedOneInput`).
   - `CommitterOperator` implements it; it returns `true` only when it forwards 
downstream and a final checkpoint will be taken (streaming + checkpointing). 
The condition is centralized in `hasFinalCheckpoint()`, shared with `endInput`.
   - `StreamTask.endData` defers the downstream `EndOfData` broadcast when 
checkpoints-after-tasks-finished is enabled and the chain contains such an 
operator; `StreamTask.notifyCheckpointComplete` flushes the deferred broadcast 
once the final checkpoint completes.
   - Added `StreamTaskDeferredEndOfDataTest`.
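
As a rough sketch of how the pieces in this change log fit together, the toy model below defers `EndOfData` in `endData` and flushes it in `notifyCheckpointComplete`. It is a simplification under stated assumptions, not the code in this PR: `ToyTask`, `ToyCommitter`, `drainOnCheckpointComplete`, `notifyCheckpointAborted`, and the interface method name `emitsRecordsOnFinalCheckpoint()` are all hypothetical, and the real `StreamTask` signatures differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the deferral; names not quoted from the PR are hypothetical. */
public class DeferredEndOfDataSketch {

    /** Stand-in for the opt-in interface; the method name here is an assumption. */
    public interface EmitsRecordsOnFinalCheckpoint {
        boolean emitsRecordsOnFinalCheckpoint();
    }

    /** Minimal operator holding one record that it forwards when a checkpoint completes. */
    public static final class ToyCommitter implements EmitsRecordsOnFinalCheckpoint {
        private final boolean hasFinalCheckpoint;
        private boolean pending = true;

        public ToyCommitter(boolean hasFinalCheckpoint) {
            this.hasFinalCheckpoint = hasFinalCheckpoint;
        }

        @Override
        public boolean emitsRecordsOnFinalCheckpoint() {
            return hasFinalCheckpoint;
        }

        /** Returns the pending record once, or null if it was already forwarded. */
        String drainOnCheckpointComplete() {
            if (!pending) {
                return null;
            }
            pending = false;
            return "committable-final";
        }
    }

    public static final class ToyTask {
        public final List<String> output = new ArrayList<>();
        private final List<?> chain;
        private final boolean checkpointsAfterTasksFinish;
        private boolean endOfDataDeferred;

        public ToyTask(List<?> chain, boolean checkpointsAfterTasksFinish) {
            this.chain = chain;
            this.checkpointsAfterTasksFinish = checkpointsAfterTasksFinish;
        }

        /** Short-circuits unless the feature is enabled and some operator opts in. */
        public boolean shouldDeferEndOfData() {
            if (!checkpointsAfterTasksFinish) {
                return false;
            }
            for (Object op : chain) {
                if (op instanceof EmitsRecordsOnFinalCheckpoint
                        && ((EmitsRecordsOnFinalCheckpoint) op).emitsRecordsOnFinalCheckpoint()) {
                    return true;
                }
            }
            return false;
        }

        public void endData() {
            if (shouldDeferEndOfData()) {
                endOfDataDeferred = true; // hold the broadcast back
            } else {
                output.add("EndOfData");
            }
        }

        public void notifyCheckpointAborted() {
            // Deliberately no flush: an early EndOfData would re-introduce the loss.
        }

        public void notifyCheckpointComplete() {
            for (Object op : chain) {
                if (op instanceof ToyCommitter) {
                    String record = ((ToyCommitter) op).drainOnCheckpointComplete();
                    if (record != null) {
                        output.add(record); // operators emit first
                    }
                }
            }
            if (endOfDataDeferred) {
                endOfDataDeferred = false;
                output.add("EndOfData"); // then the deferred broadcast is flushed
            }
        }
    }
}
```

A task whose chain contains no opting-in operator takes the `else` branch in `endData` and behaves as before.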
   
   Notes for reviewers:
   - **Why a marker interface** rather than `instanceof CommitterOperator`: the 
deferral is a generic task-lifecycle concern, and `StreamTask` should not 
depend on the sink package. `CommitterOperator` is the only in-tree implementor 
today.
   - **Non-sink tasks are unaffected**: `shouldDeferEndOfData()` short-circuits 
unless checkpoints-after-tasks-finished is enabled and an operator opts in. 
`endData` runs once per task and is not on a per-record path.
   - **No new hang**: `afterInvoke` already gates termination on 
`finalCheckpointCompleted` under the identical condition, so the deferral adds 
no new termination dependency. The flush is gated on the final checkpoint 
completing; an uncompleted/aborted checkpoint does not flush, and an early 
flush is deliberately avoided because it would re-introduce the loss.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - `StreamTaskDeferredEndOfDataTest` (new) deterministically verifies (a) the 
final-checkpoint record is emitted *before* `EndOfData` downstream, and (b) the 
deferred `EndOfData` is not flushed by an uncompleted/aborted final checkpoint 
but is flushed (after the record) once a later one completes. Both assertions 
fail without the fix.
   - Existing `StreamTaskFinalCheckpointsTest`, `SinkV2CommitterOperatorTest`, 
`SinkV2SinkWriterOperatorTest`, `CheckpointAfterAllTasksFinishedITCase`, and 
`SinkV2ITCase` remain green.
   - The end-to-end loss is inherently a cross-task scheduling race (committer 
task vs global-committer task) and is not deterministically reproducible as a 
plain ITCase; it was reproduced under controlled scheduling and confirmed 
fixed. The new unit test guards the single-task invariant the fix establishes 
(records emitted before `EndOfData`).
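
The single-task invariant mentioned above can be phrased as a check over a captured output sequence. The helper below is only an illustrative sketch: `recordsPrecedeEndOfData` and the string events are hypothetical and do not reflect how `StreamTaskDeferredEndOfDataTest` is actually written.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative check of the ordering invariant; not taken from the PR's test. */
public class EndOfDataOrderingCheck {

    /** True only if "EndOfData" is present and is the very last event in the sequence. */
    public static boolean recordsPrecedeEndOfData(List<String> events) {
        int index = events.indexOf("EndOfData");
        return index >= 0 && index == events.size() - 1;
    }

    public static void main(String[] args) {
        // Fixed ordering: the final-checkpoint record is delivered first.
        System.out.println(recordsPrecedeEndOfData(
                Arrays.asList("committable-final", "EndOfData")));
        // Buggy ordering: a record trails EndOfData and would be dropped downstream.
        System.out.println(recordsPrecedeEndOfData(
                Arrays.asList("EndOfData", "committable-final")));
    }
}
```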
   
   This change affects checkpoint/end-of-input coordination in `StreamTask` 
(FLIP-147), so specialist review from a checkpointing/runtime maintainer is 
requested.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (end-of-input / 
final-checkpoint coordination)
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes (Claude Opus 4.8)
   
   Generated-by: Claude Opus 4.8
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
