Hi all,

I'd like to pick up FLINK-39639, a silent data loss bug where a split
assigned to a source operator that has already finished reading is accepted
but never processed. A second user independently hit this with a BigQuery
source in batch mode.

Root cause: SourceOperator.handleAddSplitsEvent has no guard for the
finished state. When the coordinator assigns a split after the operator has
reached END_OF_INPUT (DATA_FINISHED), the split is handed to the reader via
addSplits but emitNext no longer polls it, so it is never read. The
coordinator receives no error and treats the assignment as successful, so
in batch execution the records are silently dropped.
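
To make the failure mode concrete, here is a minimal toy model of the
sequence above. It is only a sketch: ToyUnguardedOperator and its methods
are hypothetical stand-ins, not Flink's SourceOperator, and splits are
plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: hypothetical class, NOT Flink's SourceOperator.
class ToyUnguardedOperator {
    enum Mode { READING, DATA_FINISHED }

    private Mode mode = Mode.READING;
    private final Queue<String> pendingSplits = new ArrayDeque<>();
    final List<String> emitted = new ArrayList<>();

    // Mirrors the unguarded handler: a split is accepted in any state.
    void handleAddSplits(List<String> splits) {
        pendingSplits.addAll(splits);
    }

    void markFinished() {
        mode = Mode.DATA_FINISHED;
    }

    // Once finished, the operator no longer polls the reader.
    void emitNext() {
        if (mode == Mode.DATA_FINISHED) {
            return;
        }
        String split = pendingSplits.poll();
        if (split != null) {
            emitted.add(split);
        }
    }

    int unreadSplits() {
        return pendingSplits.size();
    }
}

class ToyUnguardedDemo {
    public static void main(String[] args) {
        ToyUnguardedOperator op = new ToyUnguardedOperator();
        op.handleAddSplits(List.of("split-0"));
        op.emitNext();                          // split-0 is read normally
        op.markFinished();
        op.handleAddSplits(List.of("split-1")); // late assignment, no error raised
        op.emitNext();                          // no-op: split-1 is never read
        // prints: emitted=[split-0], unread=1
        System.out.println("emitted=" + op.emitted + ", unread=" + op.unreadSplits());
    }
}
```

The late split sits in the queue without any signal to the caller, which is
the silent drop described above.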

Proposed fix: Reject the assignment in DATA_FINISHED by failing the task in
handleAddSplitsEvent. This does not lose data: the split is recorded in the
tracker before the event is sent (SourceCoordinatorContext.assignSplits),
so on recovery SourceCoordinator.subtaskReset returns it to the
SplitEnumerator (subtaskReset -> getAndRemoveUncheckpointedAssignment ->
addSplitsBack) for reassignment. I have implemented this on current master
and validated it with new unit and end-to-end tests.
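
The intended interaction between the guard and recovery can be sketched
with a toy model. Everything below is a hypothetical illustration, not the
actual patch: ToyCoordinator and ToyGuardedOperator are made-up classes,
the exception type is an assumption, and the synchronous throw stands in
for the asynchronous task failure and restart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical classes, not Flink's implementation.
class ToyCoordinator {
    final List<String> enumeratorBacklog = new ArrayList<>();
    private final Map<Integer, List<String>> uncheckpointed = new HashMap<>();

    // The assignment is tracked before it is "sent" to the operator.
    void assignSplit(int subtask, String split, ToyGuardedOperator op) {
        uncheckpointed.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
        op.handleAddSplits(List.of(split));
    }

    // On reset, tracked splits are handed back for reassignment.
    void subtaskReset(int subtask) {
        List<String> back = uncheckpointed.remove(subtask);
        if (back != null) {
            enumeratorBacklog.addAll(back);
        }
    }
}

class ToyGuardedOperator {
    private boolean dataFinished;
    final List<String> accepted = new ArrayList<>();

    void markFinished() {
        dataFinished = true;
    }

    // The guard: reject instead of silently accepting a late split.
    void handleAddSplits(List<String> splits) {
        if (dataFinished) {
            throw new IllegalStateException(
                    "Received splits " + splits + " after DATA_FINISHED");
        }
        accepted.addAll(splits);
    }
}

class ToyGuardDemo {
    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        ToyGuardedOperator op = new ToyGuardedOperator();
        op.markFinished();
        try {
            coordinator.assignSplit(0, "split-1", op);
        } catch (IllegalStateException e) {
            coordinator.subtaskReset(0); // stands in for task failure and recovery
        }
        // prints: [split-1]
        System.out.println(coordinator.enumeratorBacklog);
    }
}
```

Because the split is tracked before the operator sees it, the failure
leaves it recoverable rather than lost.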

One open point: the guard changes the outcome of one existing test,
testSourceCheckpointLastUnaligned (FLINK-18906), which drives a source to
END_OF_INPUT with no splits and asserts that the records of a split added
afterwards are dropped. Since a real reader only reaches END_OF_INPUT after
no-more-splits, that drop looks like the same bug rather than intended
behavior. I left the same analysis on the JIRA.

Could a committer please review the approach, and if it looks reasonable,
assign the ticket to me so I can open a PR?

Thanks,
Spoorthi Basu
