[ https://issues.apache.org/jira/browse/FLINK-35924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiangjie Qin closed FLINK-35924.
--------------------------------
Resolution: Implemented
Merged to master: a15bf58da5442deeb07ac2a1795a961a0ec75561
> Improve the SourceReaderBase to support RecordsWithSplitIds sharing the
> internal buffer of the SplitReader.
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35924
> URL: https://issues.apache.org/jira/browse/FLINK-35924
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Common
> Affects Versions: 1.20.0
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Recently, we saw corrupted {{RecordsWithSplitIds}} in one of our Iceberg
> source implementations. The problem was caused by the following sequence:
> # The element queue of the {{SourceReaderBase}} is empty, and the current
> fetch has been drained.
> # The main thread invokes {{SourceReaderBase.pollNext()}}, sees nothing
> to consume, and goes into the {{SourceReaderBase.finishedOrAvailableLater()}}
> call.
> # A {{SplitFetcher}} thread has just finished consuming a split and has put
> the last batch of records into the element queue. The {{SplitFetcher}} then
> becomes idle, i.e. it has no assigned splits and no pending tasks.
> # The main thread invokes
> {{SplitFetcherManager.maybeShutdownFinishedFetchers()}} to shut down idle
> fetchers, so the {{SplitFetcher}} from step (3) is shut down. As a result,
> its {{SplitReader}} is closed as well.
> # The main thread then sees a non-empty element queue containing the last
> batch, which was put there by the {{SplitFetcher}} that has just been shut down.
> # The main thread invokes {{SourceReaderBase.pollNext()}} again to process
> that batch, but receives an exception, because the records contained in the
> batch were released when the {{SplitReader}} was closed in step (4).
> The problem is that the {{SourceReaderBase}} implementation assumes that
> once a batch of records (i.e. a {{RecordsWithSplitIds}}) is generated,
> the records remain available even after the {{SplitReader}} that generated
> them is closed.
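The failure in steps (3) to (6) can be replayed deterministically on a single thread. This is a hypothetical sketch, not Flink code: {{BufferOwningReader}} stands in for a {{SplitReader}} that hands out views over its own buffer, {{BatchView}} stands in for a zero-copy {{RecordsWithSplitIds}}, and an {{ArrayDeque}} stands in for the element queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class RaceReplay {
    /** Replays steps (3) to (6) sequentially and returns what the main thread observes. */
    static String replay() {
        Queue<BatchView> elementQueue = new ArrayDeque<>();
        BufferOwningReader reader = new BufferOwningReader();
        elementQueue.add(reader.fetch());      // (3) fetcher enqueues its last batch
        reader.close();                        // (4) idle fetcher shut down, reader closed
        BatchView batch = elementQueue.poll(); // (5) main thread finds the batch
        try {
            batch.get(0);                      // (6) main thread reads a record
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}

// Hypothetical reader that owns the memory its batches point into.
final class BufferOwningReader implements AutoCloseable {
    private final int[] buffer = new int[4];
    private boolean released = false;

    BatchView fetch() {
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = i * 10;
        }
        return new BatchView(this); // no copy: the batch reads the reader's buffer
    }

    int read(int index) {
        if (released) {
            throw new IllegalStateException("buffer released by close()");
        }
        return buffer[index];
    }

    @Override
    public void close() {
        released = true; // a real reader might free or pool the buffer here
    }
}

// Hypothetical zero-copy batch: valid only while its owning reader is open.
final class BatchView {
    private final BufferOwningReader owner;

    BatchView(BufferOwningReader owner) {
        this.owner = owner;
    }

    int get(int index) {
        return owner.read(index);
    }
}
```

Running {{RaceReplay}} prints the exception message rather than a record, which mirrors the corruption described in the issue.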
> This assumption forces some connector implementations to copy the records
> fetched by the {{SplitReader}}, which introduces additional overhead.
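For contrast, a batch that takes a defensive copy of the reader's buffer keeps working after the buffer is released or reused, at the cost of one extra allocation and one array copy per batch. {{CopiedBatch}} and the {{int[]}} buffer are illustrative assumptions, not Flink APIs.

```java
import java.util.Arrays;

final class CopyDemo {
    static int lastRecordAfterReuse() {
        int[] readerBuffer = {1, 2, 3};
        CopiedBatch batch = new CopiedBatch(readerBuffer, readerBuffer.length);
        Arrays.fill(readerBuffer, -1); // the reader closes or reuses its buffer
        return batch.get(2);           // unaffected, because the batch owns a copy
    }

    public static void main(String[] args) {
        System.out.println(lastRecordAfterReuse());
    }
}

// Hypothetical batch that copies records out of a reader-owned buffer.
final class CopiedBatch {
    private final int[] records;

    CopiedBatch(int[] readerBuffer, int count) {
        // One extra allocation and copy per batch: the overhead the issue wants to avoid.
        this.records = Arrays.copyOf(readerBuffer, count);
    }

    int get(int index) {
        return records[index];
    }
}
```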
> This patch improves the {{SourceReaderBase}} to ensure that a
> {{SplitReader}} is not closed until all the records it has emitted have
> been processed.
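One way to provide this guarantee is to count the batches a reader has emitted but the main thread has not yet finished, and to defer the actual close until that count drops to zero. This sketch assumes that approach for illustration only; it is not necessarily how the merged patch works. {{RefCountedReader}}, {{Batch}}, {{requestClose()}} and {{onBatchRecycled()}} are hypothetical names, although {{Batch.recycle()}} plays a role similar to {{RecordsWithSplitIds.recycle()}}. It also assumes no new batches are emitted after a close has been requested, which holds for an idle fetcher.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reader whose close is deferred until every emitted batch is recycled.
final class RefCountedReader {
    private final AtomicInteger outstandingBatches = new AtomicInteger();
    private volatile boolean closeRequested = false;
    private volatile boolean closed = false;

    Batch emitBatch() {
        outstandingBatches.incrementAndGet();
        return new Batch(this);
    }

    /** Called when the fetcher goes idle and would normally close the reader. */
    void requestClose() {
        closeRequested = true;
        maybeClose();
    }

    /** Called once the main thread has fully consumed a batch. */
    void onBatchRecycled() {
        outstandingBatches.decrementAndGet();
        maybeClose();
    }

    private synchronized void maybeClose() {
        if (closeRequested && !closed && outstandingBatches.get() == 0) {
            closed = true; // buffers would be released only at this point
        }
    }

    boolean isClosed() {
        return closed;
    }
}

// Hypothetical batch handle; recycle() is idempotent and called by the main thread.
final class Batch {
    private final RefCountedReader owner;
    private boolean recycled = false;

    Batch(RefCountedReader owner) {
        this.owner = owner;
    }

    void recycle() {
        if (!recycled) {
            recycled = true;
            owner.onBatchRecycled();
        }
    }
}

final class DeferredCloseDemo {
    /** Returns {closed while the last batch is pending, closed after it is recycled}. */
    static boolean[] run() {
        RefCountedReader reader = new RefCountedReader();
        Batch last = reader.emitBatch(); // step (3): last batch enqueued
        reader.requestClose();           // step (4): fetcher shutdown requested
        boolean closedWhilePending = reader.isClosed();
        last.recycle();                  // step (6) done: batch fully processed
        return new boolean[] {closedWhilePending, reader.isClosed()};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]);
    }
}
```

With this ordering the shutdown request in step (4) no longer invalidates the batch; the reader closes only after the main thread recycles it.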
> There is also a somewhat orthogonal problem: the {{SplitFetcher}}s are
> closed too aggressively. In most cases we have seen, a dedicated
> {{SplitFetcher}} is created to handle a split and is closed right after
> that split is finished, before the next split arrives. I'll create a
> separate patch that introduces a short timeout before a
> {{SplitFetcher}} thread is considered idle.
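The follow-up idea can be sketched as a tracker that reports a fetcher as idle only after it has had no assigned splits and no pending tasks for a full timeout, so a split arriving shortly after the previous one reuses the same fetcher. {{IdleTracker}}, its methods, and the 100 ms value are hypothetical; the issue specifies neither the timeout nor how it would be wired into {{SplitFetcherManager}}. Time is passed in explicitly so the logic can be checked without sleeping.

```java
import java.time.Duration;
import java.util.Arrays;

// Hypothetical idle tracker: idle only after continuously having no work for idleTimeout.
final class IdleTracker {
    private final long idleTimeoutMillis;
    private long idleSinceMillis = -1; // -1 means the fetcher currently has work

    IdleTracker(Duration idleTimeout) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
    }

    /** Records whether the fetcher has splits or pending tasks at time nowMillis. */
    void observe(boolean hasWork, long nowMillis) {
        if (hasWork) {
            idleSinceMillis = -1;
        } else if (idleSinceMillis < 0) {
            idleSinceMillis = nowMillis; // start of the current idle period
        }
    }

    /** True only if the fetcher has been idle for at least the full timeout. */
    boolean shouldShutDown(long nowMillis) {
        return idleSinceMillis >= 0 && nowMillis - idleSinceMillis >= idleTimeoutMillis;
    }
}

final class IdleDemo {
    /** Returns {shut down at 50 ms, shut down at 200 ms, shut down at 220 ms}. */
    static boolean[] run() {
        IdleTracker tracker = new IdleTracker(Duration.ofMillis(100));
        tracker.observe(false, 0);   // split finished, no work left
        boolean early = tracker.shouldShutDown(50);
        tracker.observe(true, 60);   // next split arrives before the timeout
        tracker.observe(false, 120); // that split is finished too
        boolean beforeTimeout = tracker.shouldShutDown(200); // idle for 80 ms
        boolean afterTimeout = tracker.shouldShutDown(220);  // idle for 100 ms
        return new boolean[] {early, beforeTimeout, afterTimeout};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```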
--
This message was sent by Atlassian Jira
(v8.20.10#820010)