[
https://issues.apache.org/jira/browse/FLINK-39639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083632#comment-18083632
]
Calico Shop commented on FLINK-39639:
-------------------------------------
We ran into this as well: it caused silent row loss from a BigQuery source in
a Flink batch application. A connector-side fix and a detailed walkthrough are
here:
https://github.com/GoogleCloudDataproc/flink-bigquery-connector/pull/305
> Split assignment accepted after source finished reading leads to silent data
> loss
> ---------------------------------------------------------------------------------
>
> Key: FLINK-39639
> URL: https://issues.apache.org/jira/browse/FLINK-39639
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 2.0.1
> Reporter: Piotr Nowojski
> Priority: Major
>
> h2. Summary
> When an {{OperatorCoordinator}} sends a split assignment ({{AddSplitEvent}})
> to a {{SourceOperator}} that has concurrently finished reading (reached
> {{END_OF_INPUT}}), the event is silently accepted and the splits are added to
> the {{SourceReader}}, but they will never be processed. The coordinator
> receives no error signal, so it believes the assignment succeeded. This is
> silent data loss.
> h2. Root Cause
> There is a window between the source operator finishing and the mailbox being
> quiesced where coordinator events are still accepted and processed:
> 1. {{SourceReader.pollNext()}} returns {{END_OF_INPUT}}.
> 2. {{StreamTask.processInput()}} calls {{endData(DRAIN)}} →
> {{operatorChain.finishOperators()}} → {{SourceOperator.finish()}}.
> 3. The mailbox loop suspends, but the mailbox remains in {{OPEN}} state.
> 4. A coordinator event (split assignment) arrives via RPC and is queued into
> the mailbox via {{StreamTask.dispatchOperatorEvent()}}.
> 5. {{afterInvoke()}} runs a second mailbox loop ({{StreamTask.java}} line
> 1074) that processes the queued event.
> 6. {{SourceOperator.handleOperatorEvent()}} calls {{sourceReader.addSplits()}}
> with no guard on the operator's {{operatingMode}}, so the split is accepted
> even though the operator is in {{DATA_FINISHED}} mode.
>
> The only protection against late events is the {{RejectedExecutionException}}
> catch in {{StreamTask.dispatchOperatorEvent()}}, but that only fires after
> {{mailboxProcessor.prepareClose()}} (line 1085), which happens after the
> second mailbox loop has already drained.
> h2. Impact
> In batch (bounded) execution, if a {{SplitEnumerator}} assigns splits
> concurrently with a reader reaching end-of-input, the assigned splits are
> lost without any error. The enumerator believes
> the assignment succeeded. This can occur when:
> - Multiple subtasks finish at different times and the enumerator
> redistributes work
> - The enumerator performs lazy/deferred split discovery and assigns splits
> just as the reader finishes its current work
> h2. Reproducer
> A unit test using {{StreamTaskMailboxTestHarness}} demonstrates the issue
> without any timing dependencies:
> {code:java}
> /**
>  * Verifies that a coordinator event (split assignment) dispatched after the source has finished
>  * reading (END_OF_INPUT) is silently accepted and the splits are added to the reader, even
>  * though they will never be processed, demonstrating a race condition between coordinator event
>  * delivery and operator lifecycle.
>  */
> @Test
> void testSplitAssignmentAfterSourceFinished() throws Exception {
>     SplitAssignmentTrackingSource testSource =
>             new SplitAssignmentTrackingSource(Boundedness.BOUNDED);
>     SourceOperatorFactory<Integer> sourceOperatorFactory =
>             new SourceOperatorFactory<>(testSource, WatermarkStrategy.noWatermarks());
>
>     try (StreamTaskMailboxTestHarness<Integer> testHarness =
>             new StreamTaskMailboxTestHarnessBuilder<>(
>                             SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
>                     .setCollectNetworkEvents()
>                     .setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID)
>                     .build()) {
>         SplitAssignmentTrackingReader reader =
>                 (SplitAssignmentTrackingReader)
>                         ((SourceOperator) testHarness.getStreamTask().mainOperator)
>                                 .getSourceReader();
>
>         // Assign an initial split with some records so the source has work to do.
>         MockSourceSplit initialSplit = new MockSourceSplit(0, 0, 2);
>         initialSplit.addRecord(100);
>         initialSplit.addRecord(200);
>         AddSplitEvent<MockSourceSplit> initialAssignment =
>                 new AddSplitEvent<>(
>                         Collections.singletonList(initialSplit),
>                         new MockSourceSplitSerializer());
>         testHarness
>                 .getStreamTask()
>                 .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(initialAssignment));
>
>         // Process all records. The source will reach END_OF_INPUT, triggering endData() which
>         // calls finish() on the operator. The mailbox loop then suspends.
>         reader.markAvailable();
>         testHarness.processAll();
>
>         // At this point:
>         // - The source reader has returned END_OF_INPUT
>         // - SourceOperator.finish() has been called
>         // - The mailbox is suspended but still OPEN (not quiesced)
>         assertThat(reader.hasFinishedReading).isTrue();
>         assertThat(reader.splitsAddedAfterFinish).isEmpty();
>
>         // Now simulate a coordinator sending a late split assignment. This can happen in
>         // practice when the enumerator assigns splits concurrently with the reader finishing.
>         MockSourceSplit lateSplit = new MockSourceSplit(1, 0);
>         lateSplit.addRecord(999);
>         AddSplitEvent<MockSourceSplit> lateAssignment =
>                 new AddSplitEvent<>(
>                         Collections.singletonList(lateSplit),
>                         new MockSourceSplitSerializer());
>         testHarness
>                 .getStreamTask()
>                 .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(lateAssignment));
>
>         // Drive the task through afterInvoke(), which runs a second mailbox loop that will
>         // process the queued split assignment event.
>         testHarness.finishProcessing();
>
>         // The late split was accepted by the reader even though the source had already
>         // finished reading. This split will never be processed: it represents lost data.
>         assertThat(reader.splitsAddedAfterFinish)
>                 .as(
>                         "Split assignment was accepted after the source finished reading, "
>                                 + "leading to potential data loss")
>                 .hasSize(1);
>         assertThat(reader.splitsAddedAfterFinish.get(0).splitId()).isEqualTo("1");
>     }
> }
>
> private static class SplitAssignmentTrackingSource extends MockSource {
>     private static final long serialVersionUID = 1L;
>
>     SplitAssignmentTrackingSource(Boundedness boundedness) {
>         super(boundedness, 1);
>     }
>
>     @Override
>     public SourceReader<Integer, MockSourceSplit> createReader(
>             SourceReaderContext readerContext) {
>         return new SplitAssignmentTrackingReader();
>     }
> }
>
> private static class SplitAssignmentTrackingReader extends MockSourceReader {
>     volatile boolean hasFinishedReading = false;
>     volatile boolean hasClosed = false;
>     final List<MockSourceSplit> splitsAddedAfterFinish = new ArrayList<>();
>     final List<MockSourceSplit> splitsAddedAfterClose = new ArrayList<>();
>
>     @Override
>     public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
>         InputStatus status = super.pollNext(sourceOutput);
>         if (status == InputStatus.END_OF_INPUT) {
>             hasFinishedReading = true;
>         }
>         return status;
>     }
>
>     @Override
>     public void close() throws Exception {
>         hasClosed = true;
>         super.close();
>     }
>
>     @Override
>     public void addSplits(List<MockSourceSplit> splits) {
>         // Record splits that arrive after END_OF_INPUT or after close() before delegating.
>         if (hasFinishedReading) {
>             splitsAddedAfterFinish.addAll(splits);
>         }
>         if (hasClosed) {
>             splitsAddedAfterClose.addAll(splits);
>         }
>         super.addSplits(splits);
>     }
> }{code}
> Full test added in {{SourceOperatorStreamTaskTest}}.
> h2. Possible Fixes
> - {{SourceOperator.handleOperatorEvent()}} could check {{operatingMode}} and
> reject events when in {{DATA_FINISHED}} (or any post-reading state). The
> coordinator would then receive an error and
> could handle it (e.g., reassign the split to another subtask).
> - {{StreamTask.dispatchOperatorEvent()}} could check whether operators have
> been finished before enqueueing the event.
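
For readers who want to see the first suggested fix in isolation, here is a minimal, Flink-free sketch of the sequence described under Root Cause, run once without and once with a guard on the operating mode. Everything in it is a hypothetical stand-in ({{ModelOperator}}, {{ModelMailbox}}, {{replay}}, and the choice of {{IllegalStateException}} are assumptions for illustration); it is not the actual {{SourceOperator}} code and not necessarily how the fix would be implemented in Flink or in the connector PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified model of the race. None of these types are Flink classes.
final class LateSplitGuardSketch {

    enum OperatingMode { READING, DATA_FINISHED }

    /** Stand-in for the source operator: tracks its mode and the splits it accepted. */
    static final class ModelOperator {
        final boolean guarded;
        final List<String> acceptedSplits = new ArrayList<>();
        OperatingMode mode = OperatingMode.READING;

        ModelOperator(boolean guarded) {
            this.guarded = guarded;
        }

        void finish() {
            mode = OperatingMode.DATA_FINISHED;
        }

        void handleAddSplit(String splitId) {
            // Hypothetical guard: reject assignments once reading has finished,
            // so the sender sees a failure instead of a silent accept.
            if (guarded && mode == OperatingMode.DATA_FINISHED) {
                throw new IllegalStateException(
                        "Split " + splitId + " assigned after the source finished reading");
            }
            acceptedSplits.add(splitId);
        }
    }

    /** Stand-in for the task mailbox: events stay queued until a loop drains them. */
    static final class ModelMailbox {
        private final Queue<Runnable> mails = new ArrayDeque<>();

        void put(Runnable mail) {
            mails.add(mail);
        }

        void drain() {
            Runnable mail;
            while ((mail = mails.poll()) != null) {
                mail.run();
            }
        }
    }

    /** Replays the sequence and returns the ids of splits accepted after finish(). */
    static List<String> replay(boolean guarded) {
        ModelOperator operator = new ModelOperator(guarded);
        ModelMailbox mailbox = new ModelMailbox();

        mailbox.put(() -> operator.handleAddSplit("0"));
        mailbox.drain();   // first mailbox loop: the initial split is handled
        operator.finish(); // END_OF_INPUT reached, finish() called

        int acceptedBeforeFinish = operator.acceptedSplits.size();
        mailbox.put(() -> operator.handleAddSplit("1")); // late event, mailbox still open
        mailbox.drain();   // second mailbox loop, as in afterInvoke()

        return new ArrayList<>(
                operator.acceptedSplits.subList(
                        acceptedBeforeFinish, operator.acceptedSplits.size()));
    }

    public static void main(String[] args) {
        System.out.println("unguarded, accepted after finish: " + replay(false));
        try {
            replay(true);
        } catch (IllegalStateException e) {
            System.out.println("guarded: " + e.getMessage());
        }
    }
}
```

In the unguarded run the late split "1" ends up in the accepted list with nothing left to read it, which mirrors the reproducer above. In the guarded run the second drain fails loudly instead. In a real fix that failure would still need to reach the coordinator so it can reassign the split.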