[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218676#comment-17218676 ]
Kezhu Wang commented on FLINK-19717:
------------------------------------
[~rmetzger] Thanks for the response. Let's wait for their decision.
[~sewen] [~becket_qin] I think we can solve this by moving {{exceptionHandler}}
from {{ThrowableCatchingRunnable}} into {{SplitFetcher}}. That lets us
rearrange the 'happens-before' relationship between error-setting and
fetcher-removal inside {{SplitFetcher}} itself, without interference from an
extra wrapping structure. I have created [a branch in my clone|https://github.com/kezhuw/flink/tree/test-case-source-reader-end-of-input-caused-by-split-reader-exception]
to demonstrate the test case and a possible fix. Commit
[3c65b2d|https://github.com/kezhuw/flink/commit/3c65b2d8dddd3c18c787fd590ece17304110e3fc]
modifies {{SourceReaderBaseTest.testExceptionInSplitReader}} so that it fails, and
[e2e383|https://github.com/kezhuw/flink/commit/e2e3832c7a4c726405111fb9198e6737417877d5]
fixes it. Do this and the above analysis make sense to you? If so, could you
assign this issue to me?
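The sketch below illustrates the ordering this proposal relies on. It rests on stated assumptions and does not reproduce the code in the linked commits. {{ManagerSketch}}, {{FetcherSketch}}, {{ReaderSketch}} and {{SketchStatus}} are hypothetical stand-ins, not Flink classes. The sketch also assumes that the reader re-checks errors once it observes that every fetcher is gone. The failing fetcher records its exception before removing itself. A reader that then sees no fetchers left is therefore guaranteed to see the error too, and it throws instead of reporting END_OF_INPUT.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Toy stand-in for InputStatus; not the Flink enum.
enum SketchStatus { NOTHING_AVAILABLE, END_OF_INPUT }

// Hypothetical model of the fetcher manager's shared state (not Flink code).
class ManagerSketch {
    final AtomicReference<Throwable> uncaughtFetcherException = new AtomicReference<>();
    private final Set<Integer> fetchers = new HashSet<>();

    synchronized void addFetcher(int id) { fetchers.add(id); }

    // Plays the role of the fetcher's shutdown hook.
    synchronized void removeFetcher(int id) { fetchers.remove(id); }

    synchronized boolean allFetchersGone() { return fetchers.isEmpty(); }

    void checkErrors() {
        Throwable t = uncaughtFetcherException.get();
        if (t != null) {
            throw new RuntimeException("One or more fetchers have encountered exception", t);
        }
    }
}

// A fetcher that owns its exception handling, so the ordering is explicit.
class FetcherSketch implements Runnable {
    private final int id;
    private final ManagerSketch manager;
    private final Runnable fetch; // stands in for SplitReader.fetch()

    FetcherSketch(int id, ManagerSketch manager, Runnable fetch) {
        this.id = id;
        this.manager = manager;
        this.fetch = fetch;
        manager.addFetcher(id);
    }

    @Override
    public void run() {
        try {
            fetch.run();
        } catch (Throwable t) {
            // 1) Record the error first ...
            manager.uncaughtFetcherException.compareAndSet(null, t);
        } finally {
            // 2) ... then remove the fetcher. The synchronized removal publishes
            // the error to any thread that later sees allFetchersGone() == true.
            manager.removeFetcher(id);
        }
    }
}

class ReaderSketch {
    // Reader side: after observing that no fetchers are left, re-check errors
    // before reporting END_OF_INPUT (an assumption of this sketch).
    static SketchStatus finishedOrAvailableLater(ManagerSketch manager, boolean noMoreSplits) {
        if (!(noMoreSplits && manager.allFetchersGone())) {
            return SketchStatus.NOTHING_AVAILABLE;
        }
        manager.checkErrors();
        return SketchStatus.END_OF_INPUT;
    }
}
```

Suppose you run a fetcher whose fetch throws, then call {{ReaderSketch.finishedOrAvailableLater(manager, true)}}. The call throws the wrapped exception rather than returning {{END_OF_INPUT}}. A fetcher that finishes cleanly still yields {{END_OF_INPUT}}.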
> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -----------------------------------------------------------------------------
>
> Key: FLINK-19717
> URL: https://issues.apache.org/jira/browse/FLINK-19717
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.12.0
> Reporter: Kezhu Wang
> Priority: Major
>
> Here is the execution flow I have in mind:
> 1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After it
> executes {{splitFetcherManager.checkErrors()}} but before
> {{elementsQueue.poll()}}, the {{SplitFetcher}} gets its chance to run.
> 2. The {{SplitFetcher}} terminates due to an exception from {{SplitReader.fetch}}.
> {{SplitFetcher.shutdownHook}} removes this exceptional fetcher from
> {{SplitFetcherManager}}.
> 3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no
> elements in the queue, {{elementsQueue}} is reset to unavailable.
> 4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we
> enter {{SourceReaderBase.finishedOrAvailableLater}}. If the exceptional
> fetcher was the last alive fetcher, then
> {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to
> {{InputStatus.END_OF_INPUT}}.
> 5. {{StreamTask}} terminates itself due to {{InputStatus.END_OF_INPUT}}, as if
> the input had finished normally.
> Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}}, which
> fails at a rate of about 1/2.
> {code:java}
> @Test
> public void testExceptionInSplitReader() throws Exception {
>     expectedException.expect(RuntimeException.class);
>     expectedException.expectMessage("One or more fetchers have encountered exception");
>     final String errMsg = "Testing Exception";
>     FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
>             new FutureCompletingBlockingQueue<>();
>     // We have to handle split changes first, otherwise fetch will not be called.
>     try (MockSourceReader reader =
>             new MockSourceReader(
>                     elementsQueue,
>                     () ->
>                             new SplitReader<int[], MockSourceSplit>() {
>                                 @Override
>                                 public RecordsWithSplitIds<int[]> fetch() {
>                                     throw new RuntimeException(errMsg);
>                                 }
>
>                                 @Override
>                                 public void handleSplitsChanges(
>                                         SplitsChange<MockSourceSplit> splitsChanges) {}
>
>                                 @Override
>                                 public void wakeUp() {}
>                             },
>                     getConfig(),
>                     null)) {
>         ValidatingSourceOutput output = new ValidatingSourceOutput();
>         reader.addSplits(
>                 Collections.singletonList(
>                         getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)));
>         reader.handleSourceEvents(new NoMoreSplitsEvent());
>         // This is not a real infinite loop; it is supposed to throw an exception after some polls.
>         while (true) {
>             InputStatus inputStatus = reader.pollNext(output);
>             assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
>             // Add a sleep to avoid a tight loop.
>             Thread.sleep(0);
>         }
>     }
> }
> {code}
> This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from
> the existing one in three places:
> 1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets
> {{SourceReaderBase.noMoreSplitsAssignment}} to true.
> 2. An added assertion checks that {{reader.pollNext}} never returns
> {{InputStatus.END_OF_INPUT}}.
> 3. {{Thread.sleep(1)}} is changed to {{Thread.sleep(0)}}, which raises the failure
> rate from roughly 1/200 to roughly 1/2.
> See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for
> initial discussion.
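The five steps of the execution flow can be replayed on a single thread against a toy model. All names in the replay, such as {{RaceReplay}} and {{ReplayStatus}}, are hypothetical, and none of it is Flink code. The replay assumes the pre-fix ordering described in the comment above. In that ordering, the fetcher's shutdown hook removes the fetcher before the exception handler in {{ThrowableCatchingRunnable}} records the error. The fetcher's failure is executed by hand between {{checkErrors()}} and {{poll()}}, rather than depending on thread scheduling. {{finishedOrAvailableLater()}} here is a simplified version of the real method.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Toy stand-in for InputStatus; not the Flink enum.
enum ReplayStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

// Hypothetical single-threaded model of reader + fetcher-manager state.
class RaceReplay {
    final AtomicReference<Throwable> uncaughtFetcherException = new AtomicReference<>();
    final Set<Integer> aliveFetchers = new HashSet<>();
    final Queue<int[]> elementsQueue = new ArrayDeque<>();
    boolean noMoreSplitsAssignment = true; // what NoMoreSplitsEvent sets in the test

    void checkErrors() {
        Throwable t = uncaughtFetcherException.get();
        if (t != null) {
            throw new RuntimeException("One or more fetchers have encountered exception", t);
        }
    }

    // Simplified: no error re-check before reporting END_OF_INPUT.
    ReplayStatus finishedOrAvailableLater() {
        if (noMoreSplitsAssignment && aliveFetchers.isEmpty()) {
            return elementsQueue.isEmpty() ? ReplayStatus.END_OF_INPUT : ReplayStatus.MORE_AVAILABLE;
        }
        return ReplayStatus.NOTHING_AVAILABLE;
    }

    // Replays steps 1-5 in the order described in the issue.
    static ReplayStatus replay() {
        RaceReplay r = new RaceReplay();
        r.aliveFetchers.add(0);

        // Step 1: the mailbox thread runs checkErrors(); no error is recorded yet.
        r.checkErrors();
        // Step 2: fetch() throws and the shutdown hook removes the fetcher.
        RuntimeException failure = new RuntimeException("Testing Exception");
        r.aliveFetchers.remove(0);
        // Step 3: the mailbox thread polls the queue and gets nothing.
        int[] next = r.elementsQueue.poll();
        // Step 4: with no elements, the reader falls through to finishedOrAvailableLater().
        ReplayStatus status = next == null ? r.finishedOrAvailableLater() : ReplayStatus.MORE_AVAILABLE;
        // Only now does the handler outside the fetcher record the error: too late.
        r.uncaughtFetcherException.compareAndSet(null, failure);
        // Step 5: the task would treat this status as END_OF_INPUT and finish.
        return status;
    }
}
```

{{RaceReplay.replay()}} returns {{ReplayStatus.END_OF_INPUT}} even though a fetcher failed. This is the outcome the revised test's {{assertNotEquals}} catches intermittently.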
--
This message was sent by Atlassian Jira
(v8.3.4#803005)