Hi all,

I'd like to pick up FLINK-39639, a silent data-loss bug where a split assigned to a source operator that has already finished reading is accepted but never processed. A second user independently hit this from a BigQuery source in batch.

Root cause: SourceOperator.handleAddSplitsEvent has no guard for the finished state. When the coordinator assigns a split after the operator has reached END_OF_INPUT (DATA_FINISHED), the split is handed to the reader via addSplits, but emitNext no longer polls it, so it is never read. The coordinator receives no error and treats the assignment as successful, so in batch execution the records are silently dropped.

Proposed fix: reject the assignment in DATA_FINISHED by failing the task in handleAddSplitsEvent. This does not lose data: the split is recorded in the tracker before the event is sent (SourceCoordinatorContext.assignSplits), so on recovery SourceCoordinator.subtaskReset returns it to the SplitEnumerator (subtaskReset -> getAndRemoveUncheckpointedAssignment -> addSplitsBack) for reassignment.

I have implemented this on current master and validated it with new unit and end-to-end tests.

One open point: the guard changes one existing test, testSourceCheckpointLastUnaligned (FLINK-18906), which drives a source to END_OF_INPUT with no splits and asserts that a later split's records are dropped. Since a real reader only reaches END_OF_INPUT after no-more-splits, that drop looks like the same bug rather than intended behavior. I left the same analysis on the JIRA.

Could a committer please review the approach and, if it looks reasonable, assign the ticket to me so I can open a PR?

Thanks,
Spoorthi Basu
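
P.S. In case it helps the review, below is a small self-contained toy model of the failure mode and of the proposed guard. It is only a sketch and not the actual patch: FinishedSourceGuardSketch, ToySourceOperator, Mode, markFinished and the guardEnabled flag are all hypothetical names invented for illustration, and splits are modelled as plain strings. The only things borrowed from the description above are the ideas that a finished operator stops polling and that the guard fails the task instead of accepting the split.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model only; none of these types are Flink classes. */
final class FinishedSourceGuardSketch {

    /** Hypothetical stand-in for the operator's reading state. */
    enum Mode { READING, DATA_FINISHED }

    /** Hypothetical, heavily simplified stand-in for a source operator. */
    static final class ToySourceOperator {
        private final boolean guardEnabled;
        private final Queue<String> pendingSplits = new ArrayDeque<>();
        private final List<String> emitted = new ArrayList<>();
        private Mode mode = Mode.READING;

        ToySourceOperator(boolean guardEnabled) {
            this.guardEnabled = guardEnabled;
        }

        /** Models split assignment; with the guard, a finished operator fails loudly. */
        void handleAddSplits(List<String> splits) {
            if (guardEnabled && mode == Mode.DATA_FINISHED) {
                throw new IllegalStateException(
                        "Received splits " + splits + " after the source finished reading");
            }
            // Without the guard the split is accepted even though nobody will poll it.
            pendingSplits.addAll(splits);
        }

        /** Models the transition to the finished state (simplified: set directly). */
        void markFinished() {
            mode = Mode.DATA_FINISHED;
        }

        /** Models polling: once finished, pending splits are never looked at again. */
        boolean emitNext() {
            if (mode == Mode.DATA_FINISHED) {
                return false;
            }
            String split = pendingSplits.poll();
            if (split == null) {
                return false;
            }
            emitted.add(split);
            return true;
        }

        List<String> emitted() {
            return new ArrayList<>(emitted);
        }

        int pendingCount() {
            return pendingSplits.size();
        }
    }

    public static void main(String[] args) {
        // Unguarded: the late split is accepted and then silently ignored.
        ToySourceOperator unguarded = new ToySourceOperator(false);
        unguarded.handleAddSplits(List.of("split-0"));
        unguarded.emitNext();
        unguarded.markFinished();
        unguarded.handleAddSplits(List.of("split-1"));
        unguarded.emitNext();
        System.out.println("unguarded emitted=" + unguarded.emitted()
                + " stranded=" + unguarded.pendingCount());

        // Guarded: the late assignment is rejected so the failure becomes visible.
        ToySourceOperator guarded = new ToySourceOperator(true);
        guarded.markFinished();
        try {
            guarded.handleAddSplits(List.of("split-1"));
        } catch (IllegalStateException e) {
            System.out.println("guarded rejected: " + e.getMessage());
        }
    }
}
```

In the toy, the exception plays the role of failing the task. The return of the split to the enumerator on recovery is not modelled here.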
