[
https://issues.apache.org/jira/browse/FLINK-39639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088105#comment-18088105
]
Spoorthi Basu commented on FLINK-39639:
---------------------------------------
Hi [~pnowojski], I reproduced this on current master with the test from the
description. The root cause is that {{handleAddSplitsEvent}} has no
{{DATA_FINISHED}} guard: a late {{AddSplitEvent}} still hands its splits to
{{sourceReader.addSplits()}}, but those splits are never polled, since
{{emitNext}} no longer drives the reader. The coordinator gets no error signal
and records the assignment as successful, which is the silent loss.
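
To make the failure mode concrete, here is a toy model of it. This is not Flink code: {{ToyOperator}}, {{ToyReader}} and the string split IDs are hypothetical stand-ins, and the only thing carried over from the real operator is the shape of the problem (an unguarded add-splits path plus an {{emitNext}} that stops polling once the mode is {{DATA_FINISHED}}).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model, not Flink code: every name below is a hypothetical stand-in. */
class LateSplitLossSketch {

    enum OperatingMode { READING, DATA_FINISHED }

    /** Stand-in for a source reader: holds splits until someone polls them. */
    static final class ToyReader {
        final Queue<String> pendingSplits = new ArrayDeque<>();
        final List<String> emitted = new ArrayList<>();

        void addSplits(List<String> splits) {
            pendingSplits.addAll(splits);
        }

        /** Emits one pending split; returns false when nothing is left. */
        boolean pollNext() {
            String split = pendingSplits.poll();
            if (split == null) {
                return false;
            }
            emitted.add(split);
            return true;
        }
    }

    /** Stand-in for the operator: no mode check before handing splits to the reader. */
    static final class ToyOperator {
        final ToyReader reader = new ToyReader();
        OperatingMode mode = OperatingMode.READING;

        void handleAddSplitsEvent(List<String> splits) {
            reader.addSplits(splits); // unguarded: accepted in any mode
        }

        void emitNext() {
            if (mode == OperatingMode.DATA_FINISHED) {
                return; // the reader is no longer driven
            }
            if (!reader.pollNext()) {
                mode = OperatingMode.DATA_FINISHED;
            }
        }
    }

    public static void main(String[] args) {
        ToyOperator op = new ToyOperator();
        op.handleAddSplitsEvent(List.of("split-0"));
        op.emitNext(); // emits split-0
        op.emitNext(); // nothing left, so the operator switches to DATA_FINISHED
        op.handleAddSplitsEvent(List.of("split-1")); // late assignment, still accepted
        op.emitNext(); // no-op: split-1 is never polled
        System.out.println("emitted=" + op.reader.emitted
                + " stranded=" + op.reader.pendingSplits);
    }
}
```

In this model the late split ends up in {{pendingSplits}} and never reaches {{emitted}}, and nothing tells the caller of {{handleAddSplitsEvent}} that anything went wrong.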
My fix rejects the assignment in {{DATA_FINISHED}} by failing the task. This
doesn't lose data: the split is recorded in the assignment tracker before the
event is sent ({{SourceCoordinatorContext.assignSplits}}), so failover
returns it via {{subtaskReset}} -> {{getAndRemoveUncheckpointedAssignment}} ->
{{addSplitsBack}} and it's reassigned.
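
A minimal sketch of why rejecting is safe, again as a toy model rather than the actual patch. {{ToySubtask}}, {{ToyCoordinator}}, the {{uncheckpointed}} map and the {{unassigned}} list are hypothetical names. The sketch also collapses the asynchronous task failure and failover into a synchronous exception, which is an assumption made purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model, not Flink code: every name below is a hypothetical stand-in. */
class RejectAndReassignSketch {

    enum OperatingMode { READING, DATA_FINISHED }

    /** Stand-in for the source subtask, with the proposed guard. */
    static final class ToySubtask {
        OperatingMode mode = OperatingMode.READING;
        final List<String> splits = new ArrayList<>();

        void handleAddSplitsEvent(List<String> newSplits) {
            if (mode == OperatingMode.DATA_FINISHED) {
                // Guard: fail loudly instead of silently accepting the splits.
                throw new IllegalStateException(
                        "Received splits " + newSplits + " after the source finished reading");
            }
            splits.addAll(newSplits);
        }
    }

    /** Stand-in for the coordinator side: tracks assignments before sending them. */
    static final class ToyCoordinator {
        final Map<Integer, List<String>> uncheckpointed = new HashMap<>();
        final List<String> unassigned = new ArrayList<>();

        void assignSplits(int subtaskId, List<String> splits, ToySubtask target) {
            // The assignment is recorded first, then the event is sent.
            uncheckpointed.computeIfAbsent(subtaskId, k -> new ArrayList<>()).addAll(splits);
            try {
                target.handleAddSplitsEvent(splits);
            } catch (IllegalStateException taskFailure) {
                // Assumption: in the real system this arrives asynchronously via failover.
                subtaskReset(subtaskId);
            }
        }

        /** On reset, every tracked but uncheckpointed split goes back to the pool. */
        void subtaskReset(int subtaskId) {
            List<String> back = uncheckpointed.remove(subtaskId);
            if (back != null) {
                unassigned.addAll(back);
            }
        }
    }

    public static void main(String[] args) {
        ToySubtask finished = new ToySubtask();
        finished.mode = OperatingMode.DATA_FINISHED;
        ToySubtask running = new ToySubtask();
        ToyCoordinator coordinator = new ToyCoordinator();

        coordinator.assignSplits(0, List.of("split-1"), finished); // rejected
        System.out.println("returned to the pool: " + coordinator.unassigned);

        List<String> retry = new ArrayList<>(coordinator.unassigned);
        coordinator.unassigned.clear();
        coordinator.assignSplits(1, retry, running); // reassigned elsewhere
        System.out.println("running subtask now owns: " + running.splits);
    }
}
```

The point of the model is the ordering: because the split is tracked before the event goes out, a rejection can only move it back to the pool, never drop it.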
It does change one existing test, {{testSourceCheckpointLastUnaligned}} from
FLINK-18906: it drives the chained {{MockSource}} to {{END_OF_INPUT}} (no
splits, so the reader finishes immediately) and asserts that a later split's
records are dropped. A real reader only reaches {{END_OF_INPUT}} after
no-more-splits, so that drop looks like the mock surfacing this same bug rather
than intended behavior, in which case the test should expect the rejection.
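
The difference between the two end-of-input conditions can be sketched as follows. This is a toy model, not {{MockSourceReader}} or any real reader: {{ToyReader}} and its {{finishWhenIdle}} flag are hypothetical, and they only contrast "finish as soon as there is nothing to read" with "finish only after no-more-splits has been signalled".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model, not Flink code: every name below is a hypothetical stand-in. */
class EndOfInputConditionSketch {

    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    static final class ToyReader {
        private final boolean finishWhenIdle; // true models the mock-like behaviour
        private final Deque<Integer> records = new ArrayDeque<>();
        private boolean noMoreSplits;

        ToyReader(boolean finishWhenIdle) {
            this.finishWhenIdle = finishWhenIdle;
        }

        void addSplit(List<Integer> splitRecords) {
            records.addAll(splitRecords);
        }

        void notifyNoMoreSplits() {
            noMoreSplits = true;
        }

        InputStatus pollNext(List<Integer> output) {
            Integer record = records.poll();
            if (record != null) {
                output.add(record);
                return InputStatus.MORE_AVAILABLE;
            }
            // An idle reader finishes only if it is mock-like or was told no more splits come.
            if (finishWhenIdle || noMoreSplits) {
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        ToyReader mockLike = new ToyReader(true);
        ToyReader realLike = new ToyReader(false);
        System.out.println("mock-like, no splits: " + mockLike.pollNext(out));
        System.out.println("real-like, no splits: " + realLike.pollNext(out));
        realLike.notifyNoMoreSplits();
        System.out.println("real-like, after no-more-splits: " + realLike.pollNext(out));
    }
}
```

Under this model only the mock-like reader can be finished while a later split is still on its way, which is the situation the existing test relies on.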
Before I change it: is the post-finish drop ever intentional, or is rejecting
at {{DATA_FINISHED}} the right call across the board?
Happy to open a PR once that's settled.
> Split assignment accepted after source finished reading leads to silent data
> loss
> ---------------------------------------------------------------------------------
>
> Key: FLINK-39639
> URL: https://issues.apache.org/jira/browse/FLINK-39639
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 2.0.1
> Reporter: Piotr Nowojski
> Priority: Major
>
> h2. Summary
> When an {{OperatorCoordinator}} sends a split assignment
> ({{AddSplitEvent}}) to a {{SourceOperator}} that has concurrently
> finished reading (reached {{END_OF_INPUT}}), the event is silently
> accepted and the splits are added to the {{SourceReader}}, but they will
> never be processed. The coordinator receives no error signal, so it believes
> the assignment succeeded. This is silent data loss.
> h2. Root Cause
> There is a window between the source operator finishing and the mailbox being
> quiesced where coordinator events are still accepted and processed:
> 1. {{SourceReader.pollNext()}} returns {{END_OF_INPUT}}.
> 2. {{StreamTask.processInput()}} calls {{endData(DRAIN)}} →
>    {{operatorChain.finishOperators()}} → {{SourceOperator.finish()}}.
> 3. The mailbox loop suspends, but the mailbox remains in {{OPEN}} state.
> 4. A coordinator event (split assignment) arrives via RPC and is queued into
>    the mailbox via {{StreamTask.dispatchOperatorEvent()}}.
> 5. {{afterInvoke()}} runs a second mailbox loop ({{StreamTask.java}} line
>    1074) that processes the queued event.
> 6. {{SourceOperator.handleOperatorEvent()}} calls {{sourceReader.addSplits()}}
>    with no guard on the operator's {{operatingMode}}; the split is accepted
>    even though the operator is in {{DATA_FINISHED}} mode.
>
> The only protection against late events is the {{RejectedExecutionException}}
> catch in {{StreamTask.dispatchOperatorEvent()}}, but that only fires
> after {{mailboxProcessor.prepareClose()}} (line 1085), which happens after
> the second mailbox loop has already drained.
> h2. Impact
> In batch (bounded) execution, if a {{SplitEnumerator}} assigns splits
> concurrently with a reader reaching end-of-input, the assigned splits are
> lost without any error. The enumerator believes
> the assignment succeeded. This can occur when:
> - Multiple subtasks finish at different times and the enumerator
> redistributes work
> - The enumerator performs lazy/deferred split discovery and assigns splits
> just as the reader finishes its current work
> h2. Reproducer
> A unit test using {{StreamTaskMailboxTestHarness}} demonstrates the issue
> without any timing dependencies:
> {code:java}
> /**
>  * Verifies that a coordinator event (split assignment) dispatched after the source has finished
>  * reading (END_OF_INPUT) is silently accepted and the splits are added to the reader, even
>  * though they will never be processed, demonstrating a race condition between coordinator event
>  * delivery and operator lifecycle.
>  */
> @Test
> void testSplitAssignmentAfterSourceFinished() throws Exception {
>     SplitAssignmentTrackingSource testSource =
>             new SplitAssignmentTrackingSource(Boundedness.BOUNDED);
>     SourceOperatorFactory<Integer> sourceOperatorFactory =
>             new SourceOperatorFactory<>(testSource, WatermarkStrategy.noWatermarks());
>
>     try (StreamTaskMailboxTestHarness<Integer> testHarness =
>             new StreamTaskMailboxTestHarnessBuilder<>(
>                             SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
>                     .setCollectNetworkEvents()
>                     .setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID)
>                     .build()) {
>         SplitAssignmentTrackingReader reader =
>                 (SplitAssignmentTrackingReader)
>                         ((SourceOperator) testHarness.getStreamTask().mainOperator)
>                                 .getSourceReader();
>
>         // Assign an initial split with some records so the source has work to do.
>         MockSourceSplit initialSplit = new MockSourceSplit(0, 0, 2);
>         initialSplit.addRecord(100);
>         initialSplit.addRecord(200);
>         AddSplitEvent<MockSourceSplit> initialAssignment =
>                 new AddSplitEvent<>(
>                         Collections.singletonList(initialSplit),
>                         new MockSourceSplitSerializer());
>         testHarness
>                 .getStreamTask()
>                 .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(initialAssignment));
>
>         // Process all records. The source will reach END_OF_INPUT, triggering endData() which
>         // calls finish() on the operator. The mailbox loop then suspends.
>         reader.markAvailable();
>         testHarness.processAll();
>
>         // At this point:
>         // - The source reader has returned END_OF_INPUT
>         // - SourceOperator.finish() has been called
>         // - The mailbox is suspended but still OPEN (not quiesced)
>         assertThat(reader.hasFinishedReading).isTrue();
>         assertThat(reader.splitsAddedAfterFinish).isEmpty();
>
>         // Now simulate a coordinator sending a late split assignment. This can happen in
>         // practice when the enumerator assigns splits concurrently with the reader finishing.
>         MockSourceSplit lateSplit = new MockSourceSplit(1, 0);
>         lateSplit.addRecord(999);
>         AddSplitEvent<MockSourceSplit> lateAssignment =
>                 new AddSplitEvent<>(
>                         Collections.singletonList(lateSplit),
>                         new MockSourceSplitSerializer());
>         testHarness
>                 .getStreamTask()
>                 .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(lateAssignment));
>
>         // Drive the task through afterInvoke(), which runs a second mailbox loop that will
>         // process the queued split assignment event.
>         testHarness.finishProcessing();
>
>         // The late split was accepted by the reader even though the source had already
>         // finished reading. This split will never be processed; it represents lost data.
>         assertThat(reader.splitsAddedAfterFinish)
>                 .as(
>                         "Split assignment was accepted after the source finished reading, "
>                                 + "leading to potential data loss")
>                 .hasSize(1);
>         assertThat(reader.splitsAddedAfterFinish.get(0).splitId()).isEqualTo("1");
>     }
> }
>
> private static class SplitAssignmentTrackingSource extends MockSource {
>     private static final long serialVersionUID = 1L;
>
>     SplitAssignmentTrackingSource(Boundedness boundedness) {
>         super(boundedness, 1);
>     }
>
>     @Override
>     public SourceReader<Integer, MockSourceSplit> createReader(
>             SourceReaderContext readerContext) {
>         return new SplitAssignmentTrackingReader();
>     }
> }
>
> private static class SplitAssignmentTrackingReader extends MockSourceReader {
>     volatile boolean hasFinishedReading = false;
>     volatile boolean hasClosed = false;
>     final List<MockSourceSplit> splitsAddedAfterFinish = new ArrayList<>();
>     final List<MockSourceSplit> splitsAddedAfterClose = new ArrayList<>();
>
>     @Override
>     public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
>         InputStatus status = super.pollNext(sourceOutput);
>         if (status == InputStatus.END_OF_INPUT) {
>             hasFinishedReading = true;
>         }
>         return status;
>     }
>
>     @Override
>     public void close() throws Exception {
>         hasClosed = true;
>         super.close();
>     }
>
>     @Override
>     public void addSplits(List<MockSourceSplit> splits) {
>         if (hasFinishedReading) {
>             splitsAddedAfterFinish.addAll(splits);
>         }
>         if (hasClosed) {
>             splitsAddedAfterClose.addAll(splits);
>         }
>         super.addSplits(splits);
>     }
> }
> {code}
> Full test added in {{SourceOperatorStreamTaskTest}}.
> h2. Possible Fixes
> - {{SourceOperator.handleOperatorEvent()}} could check {{operatingMode}} and
> reject events when in {{DATA_FINISHED}} (or any post-reading state). The
> coordinator would then receive an error and
> could handle it (e.g., reassign the split to another subtask).
> - {{StreamTask.dispatchOperatorEvent()}} could check whether operators have
> been finished before enqueueing the event.