[ https://issues.apache.org/jira/browse/FLINK-39639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083632#comment-18083632 ]

Calico Shop commented on FLINK-39639:
-------------------------------------

Just wanted to share that we ran into this as well; it caused silent row loss 
from a BigQuery source in a Flink batch application. Connector-side fix and a 
detailed walkthrough are here: 
https://github.com/GoogleCloudDataproc/flink-bigquery-connector/pull/305

> Split assignment accepted after source finished reading leads to silent data loss
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-39639
>                 URL: https://issues.apache.org/jira/browse/FLINK-39639
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 2.0.1
>            Reporter: Piotr Nowojski
>            Priority: Major
>
> h2. Summary
> When an {{OperatorCoordinator}} sends a split assignment ({{AddSplitEvent}}) to a
> {{SourceOperator}} that has concurrently finished reading (reached {{END_OF_INPUT}}),
> the event is silently accepted and the splits are added to the {{SourceReader}}, but
> they will never be processed. The coordinator receives no error signal, so it believes
> the assignment succeeded. This is silent data loss.
> h2. Root Cause
> There is a window between the source operator finishing and the mailbox being
> quiesced during which coordinator events are still accepted and processed:
> # {{SourceReader.pollNext()}} returns {{END_OF_INPUT}}.
> # {{StreamTask.processInput()}} calls {{endData(DRAIN)}} → {{operatorChain.finishOperators()}} → {{SourceOperator.finish()}}.
> # The mailbox loop suspends, but the mailbox remains in the {{OPEN}} state.
> # A coordinator event (split assignment) arrives via RPC and is queued into the mailbox via {{StreamTask.dispatchOperatorEvent()}}.
> # {{afterInvoke()}} runs a second mailbox loop ({{StreamTask.java}} line 1074) that processes the queued event.
> # {{SourceOperator.handleOperatorEvent()}} calls {{sourceReader.addSplits()}} with no guard on the operator's {{operatingMode}}, so the split is accepted even though the operator is in {{DATA_FINISHED}} mode.
>
> The only protection against late events is the {{RejectedExecutionException}} catch in
> {{StreamTask.dispatchOperatorEvent()}}, but that only fires after
> {{mailboxProcessor.prepareClose()}} (line 1085), which runs after the second mailbox
> loop has already drained.
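The sequence above can be replayed deterministically in a few lines. The sketch below is a hypothetical, single-threaded model; none of its classes or methods are Flink's. {{dispatchAddSplit}} stands in for {{StreamTask.dispatchOperatorEvent()}}, which rejects only once the mailbox is quiesced. {{runMainLoop}} plays the main loop up to {{finish()}}. {{afterInvoke}} runs the second mailbox drain before the {{prepareClose()}} step. The model shows that a split dispatched after {{finish()}} is accepted without any error and is never processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical single-threaded model of the late-split window (not Flink code). */
class LateSplitWindowModel {

    enum MailboxState { OPEN, QUIESCED }

    enum OperatingMode { READING, DATA_FINISHED }

    static final class Task {
        final Queue<Runnable> mailbox = new ArrayDeque<>();
        MailboxState mailboxState = MailboxState.OPEN;
        OperatingMode mode = OperatingMode.READING;
        final List<String> pendingSplits = new ArrayList<>();   // held by the reader
        final List<String> processedSplits = new ArrayList<>(); // actually read

        /** Models dispatchOperatorEvent(): rejects only after the mailbox is quiesced. */
        void dispatchAddSplit(String splitId) {
            if (mailboxState != MailboxState.OPEN) {
                throw new RejectedExecutionException("mailbox closed");
            }
            mailbox.add(() -> handleAddSplit(splitId));
        }

        /** Models handleOperatorEvent(): note there is no check on 'mode'. */
        void handleAddSplit(String splitId) {
            pendingSplits.add(splitId);
        }

        /** Models the main loop: read the assigned splits, then reach END_OF_INPUT. */
        void runMainLoop() {
            drainMailbox();
            processedSplits.addAll(pendingSplits);
            pendingSplits.clear();
            mode = OperatingMode.DATA_FINISHED; // endData(DRAIN) -> finish()
        }

        /** Models afterInvoke(): a second drain, and only then prepareClose(). */
        void afterInvoke() {
            drainMailbox();                       // late events are still handled here
            mailboxState = MailboxState.QUIESCED; // from now on dispatch is rejected
        }

        void drainMailbox() {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        task.dispatchAddSplit("0");
        task.runMainLoop();
        // Late assignment: the mailbox is still OPEN, so no exception reaches the sender.
        task.dispatchAddSplit("1");
        task.afterInvoke();
        System.out.println("processed=" + task.processedSplits + " stranded=" + task.pendingSplits);
        try {
            task.dispatchAddSplit("2"); // only now is a late event rejected
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Running {{main}} prints {{processed=[0] stranded=[1]}} followed by {{rejected: mailbox closed}}. Split {{1}} lands in the reader silently, while only split {{2}}, sent after the quiesce step, produces an error.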
> h2. Impact
> In batch (bounded) execution, if a {{SplitEnumerator}} assigns splits concurrently with a
> reader reaching end-of-input, the assigned splits are lost without any error, and the
> enumerator believes the assignment succeeded. This can occur when:
>  - Multiple subtasks finish at different times and the enumerator redistributes work.
>  - The enumerator performs lazy/deferred split discovery and assigns splits just as the reader finishes its current work.
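From the enumerator's side, the loss comes from bookkeeping. The sketch below is a hypothetical model, not Flink's {{SplitEnumerator}} API. It assumes an enumerator that drops a split from its pool as soon as the assignment call returns without an error. Under that assumption, a lazily discovered split that races with the reader finishing is neither read nor offered to anyone else, and the pool is empty, so a bounded job would end normally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of enumerator bookkeeping in a bounded job (not Flink code). */
class EnumeratorBookkeepingModel {

    static final class Reader {
        boolean finishedReading;
        final List<String> processed = new ArrayList<>();
        final List<String> stranded = new ArrayList<>();

        /** Accepts every split unconditionally, mirroring the missing guard. */
        void addSplit(String splitId) {
            if (finishedReading) {
                stranded.add(splitId); // accepted, but nothing will ever read it
            } else {
                processed.add(splitId);
            }
        }
    }

    static final class Enumerator {
        final Deque<String> unassigned = new ArrayDeque<>();

        /** A split leaves the pool as soon as addSplit() returns without throwing. */
        void assignNext(Reader reader) {
            String splitId = unassigned.poll();
            if (splitId != null) {
                reader.addSplit(splitId); // no error, so it is assumed delivered
            }
        }
    }

    public static void main(String[] args) {
        Enumerator enumerator = new Enumerator();
        Reader reader = new Reader();
        enumerator.unassigned.add("split-0");
        enumerator.assignNext(reader);
        reader.finishedReading = true;        // reader reaches END_OF_INPUT
        enumerator.unassigned.add("split-1"); // lazily discovered split
        enumerator.assignNext(reader);        // races with the reader finishing
        System.out.println("processed=" + reader.processed + " stranded=" + reader.stranded
                + " left in pool=" + enumerator.unassigned.size());
    }
}
```

Running {{main}} prints {{processed=[split-0] stranded=[split-1] left in pool=0}}. Nothing in the enumerator's state shows that {{split-1}} was never read.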
> h2. Reproducer
> A unit test using {{StreamTaskMailboxTestHarness}} demonstrates the issue 
> without any timing dependencies:
> {code:java}
> /**
>  * Verifies that a coordinator event (split assignment) dispatched after the source has finished
>  * reading (END_OF_INPUT) is silently accepted and the splits are added to the reader, even
>  * though they will never be processed. This demonstrates a race condition between coordinator
>  * event delivery and operator lifecycle.
>  */
> @Test
> void testSplitAssignmentAfterSourceFinished() throws Exception {
>     SplitAssignmentTrackingSource testSource =
>             new SplitAssignmentTrackingSource(Boundedness.BOUNDED);
>     SourceOperatorFactory<Integer> sourceOperatorFactory =
>             new SourceOperatorFactory<>(testSource, WatermarkStrategy.noWatermarks());
>     try (StreamTaskMailboxTestHarness<Integer> testHarness =
>             new StreamTaskMailboxTestHarnessBuilder<>(
>                             SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
>                     .setCollectNetworkEvents()
>                     .setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID)
>                     .build()) {
>         SplitAssignmentTrackingReader reader =
>                 (SplitAssignmentTrackingReader)
>                         ((SourceOperator) testHarness.getStreamTask().mainOperator)
>                                 .getSourceReader();
>
>         // Assign an initial split with some records so the source has work to do.
>         MockSourceSplit initialSplit = new MockSourceSplit(0, 0, 2);
>         initialSplit.addRecord(100);
>         initialSplit.addRecord(200);
>         AddSplitEvent<MockSourceSplit> initialAssignment =
>                 new AddSplitEvent<>(
>                         Collections.singletonList(initialSplit),
>                         new MockSourceSplitSerializer());
>         testHarness
>                 .getStreamTask()
>                 .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(initialAssignment));
>
>         // Process all records. The source will reach END_OF_INPUT, triggering endData(), which
>         // calls finish() on the operator. The mailbox loop then suspends.
>         reader.markAvailable();
>         testHarness.processAll();
>
>         // At this point:
>         // - The source reader has returned END_OF_INPUT
>         // - SourceOperator.finish() has been called
>         // - The mailbox is suspended but still OPEN (not quiesced)
>         assertThat(reader.hasFinishedReading).isTrue();
>         assertThat(reader.splitsAddedAfterFinish).isEmpty();
>
>         // Now simulate a coordinator sending a late split assignment. This can happen in
>         // practice when the enumerator assigns splits concurrently with the reader finishing.
>         MockSourceSplit lateSplit = new MockSourceSplit(1, 0);
>         lateSplit.addRecord(999);
>         AddSplitEvent<MockSourceSplit> lateAssignment =
>                 new AddSplitEvent<>(
>                         Collections.singletonList(lateSplit),
>                         new MockSourceSplitSerializer());
>         testHarness
>                 .getStreamTask()
>                 .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(lateAssignment));
>
>         // Drive the task through afterInvoke(), which runs a second mailbox loop that will
>         // process the queued split assignment event.
>         testHarness.finishProcessing();
>
>         // The late split was accepted by the reader even though the source had already
>         // finished reading. This split will never be processed: it represents lost data.
>         assertThat(reader.splitsAddedAfterFinish)
>                 .as(
>                         "Split assignment was accepted after the source finished reading, "
>                                 + "leading to potential data loss")
>                 .hasSize(1);
>         assertThat(reader.splitsAddedAfterFinish.get(0).splitId()).isEqualTo("1");
>     }
> }
>
> private static class SplitAssignmentTrackingSource extends MockSource {
>     private static final long serialVersionUID = 1L;
>
>     SplitAssignmentTrackingSource(Boundedness boundedness) {
>         super(boundedness, 1);
>     }
>
>     @Override
>     public SourceReader<Integer, MockSourceSplit> createReader(
>             SourceReaderContext readerContext) {
>         return new SplitAssignmentTrackingReader();
>     }
> }
>
> private static class SplitAssignmentTrackingReader extends MockSourceReader {
>     volatile boolean hasFinishedReading = false;
>     volatile boolean hasClosed = false;
>     final List<MockSourceSplit> splitsAddedAfterFinish = new ArrayList<>();
>     final List<MockSourceSplit> splitsAddedAfterClose = new ArrayList<>();
>
>     @Override
>     public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
>         InputStatus status = super.pollNext(sourceOutput);
>         if (status == InputStatus.END_OF_INPUT) {
>             hasFinishedReading = true;
>         }
>         return status;
>     }
>
>     @Override
>     public void close() throws Exception {
>         hasClosed = true;
>         super.close();
>     }
>
>     @Override
>     public void addSplits(List<MockSourceSplit> splits) {
>         // Record splits that arrive after END_OF_INPUT or after close().
>         if (hasFinishedReading) {
>             splitsAddedAfterFinish.addAll(splits);
>         }
>         if (hasClosed) {
>             splitsAddedAfterClose.addAll(splits);
>         }
>         super.addSplits(splits);
>     }
> }
> {code}
> Full test added in {{SourceOperatorStreamTaskTest}}.
> h2. Possible Fixes
>  - {{SourceOperator.handleOperatorEvent()}} could check {{operatingMode}} and reject events when in {{DATA_FINISHED}} (or any post-reading state). The coordinator would then receive an error and could handle it (e.g., reassign the split to another subtask).
>  - {{StreamTask.dispatchOperatorEvent()}} could check whether operators have been finished before enqueueing the event.
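Both proposed fixes can be tried out on a toy model. The sketch below is hypothetical and is not Flink code. {{dispatchAddSplit}} refuses to enqueue once operators are finished (the second fix). {{handleAddSplit}} rejects the event if reading has finished (the first fix). The retry loop in {{assign}} is an assumed enumerator reaction: the issue does not specify how the error would reach the enumerator. In this model, the dispatch-time check alone leaves a gap. An event enqueued just before {{finish()}} is still handled afterwards, and the handler-side check is what catches it. In a real task the RPC thread and the mailbox thread would presumably also have to agree on the "finished" flag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical sketch of the two proposed guards (not Flink code). */
class LateSplitFixSketch {

    enum OperatingMode { READING, DATA_FINISHED }

    static final class Subtask {
        final String name;
        OperatingMode mode = OperatingMode.READING;
        boolean operatorsFinished = false;
        final Deque<Runnable> mailbox = new ArrayDeque<>();
        final List<String> splits = new ArrayList<>();

        Subtask(String name) {
            this.name = name;
        }

        void finishReading() {
            mode = OperatingMode.DATA_FINISHED;
            operatorsFinished = true;
        }

        /** Second fix: refuse to enqueue events once the operators have been finished. */
        void dispatchAddSplit(String splitId) {
            if (operatorsFinished) {
                throw new RejectedExecutionException(name + " has already finished its operators");
            }
            mailbox.add(() -> handleAddSplit(splitId));
        }

        /** First fix: reject in the handler if reading has already finished. */
        void handleAddSplit(String splitId) {
            if (mode == OperatingMode.DATA_FINISHED) {
                throw new IllegalStateException(name + " rejected late split " + splitId);
            }
            splits.add(splitId);
        }

        void drainMailbox() {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
        }
    }

    /** Assumed enumerator reaction: try the next subtask when an assignment is rejected. */
    static Subtask assign(String splitId, List<Subtask> candidates) {
        for (Subtask candidate : candidates) {
            try {
                candidate.dispatchAddSplit(splitId);
                return candidate;
            } catch (RejectedExecutionException e) {
                // This subtask is done; keep the split and offer it to the next one.
            }
        }
        throw new IllegalStateException("no subtask could take split " + splitId);
    }

    public static void main(String[] args) {
        Subtask first = new Subtask("subtask-0");
        Subtask second = new Subtask("subtask-1");
        first.finishReading();
        Subtask chosen = assign("late-split", List.of(first, second));
        second.drainMailbox();
        System.out.println(chosen.name + " holds " + second.splits);
    }
}
```

Running {{main}} prints {{subtask-1 holds [late-split]}}. The split is rerouted instead of being stranded on the finished subtask.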



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
