[
https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-33360:
-----------------------------------
Labels: pull-request-available (was: )
> HybridSource fails to clear the previous round's state when switching
> sources, leading to data loss
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-33360
> URL: https://issues.apache.org/jira/browse/FLINK-33360
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Affects Versions: 1.16.2, 1.17.1
> Reporter: Feng Jiajie
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.3
>
>
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
> {code:java}
> // track readers that have finished processing for current enumerator
> finishedReaders.add(subtaskId);
> if (finishedReaders.size() == context.currentParallelism()) {
>     LOG.debug("All readers finished, ready to switch enumerator!");
>     if (currentSourceIndex + 1 < sources.size()) {
>         switchEnumerator();
>         // switch all readers prior to sending split assignments
>         for (int i = 0; i < context.currentParallelism(); i++) {
>             sendSwitchSourceEvent(i, currentSourceIndex);
>         }
>     }
> }
> {code}
> I think that *finishedReaders* is used to keep track of all the subtask IDs
> that have finished reading the current source. Therefore, *finishedReaders*
> should be cleared in the *switchEnumerator* function.
> If it is not cleared, it still contains every subtask ID from the previous
> source. While the next source is being read, as soon as any SourceReader
> reports a *SourceReaderFinishedEvent* (while other SourceReaders may still be
> reading in parallel), the condition
> *finishedReaders.size() == context.currentParallelism()* is already satisfied.
> This triggers {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), which sends
> a *SwitchSourceEvent* to all SourceReaders.
> If a SourceReader receives a SwitchSourceEvent before it finishes reading the
> previous source, it will execute {*}currentReader.close(){*}, and some data
> may not be fully read, resulting in partial data loss in the source.
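The stale-set behaviour described above can be reproduced with a small standalone model. This is only a sketch, not Flink's actual class: `FinishedReadersSketch`, its constructor parameters, and the `clearOnSwitch` flag are hypothetical names introduced for illustration, and the size check and source-index check from the quoted snippet are folded into one condition. It assumes a parallelism of 2 and three chained sources, so that a second switch is possible.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the enumerator's bookkeeping; hypothetical, not Flink code. */
class FinishedReadersSketch {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final int sourceCount;
    private final boolean clearOnSwitch; // hypothetical flag: true models the proposed fix
    private int currentSourceIndex = 0;

    FinishedReadersSketch(int parallelism, int sourceCount, boolean clearOnSwitch) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
        this.clearOnSwitch = clearOnSwitch;
    }

    /** Mirrors the quoted check; returns true if this event triggers a source switch. */
    boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex + 1 < sourceCount) {
            switchEnumerator();
            return true;
        }
        return false;
    }

    private void switchEnumerator() {
        currentSourceIndex++;
        if (clearOnSwitch) {
            // Proposed fix: forget the readers that finished the previous source.
            finishedReaders.clear();
        }
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        for (boolean clear : new boolean[] {false, true}) {
            FinishedReadersSketch e = new FinishedReadersSketch(2, 3, clear);
            e.onReaderFinished(0);
            e.onReaderFinished(1); // both subtasks finished source 0: legitimate switch
            // Only subtask 0 has finished source 1; subtask 1 is still reading.
            boolean premature = e.onReaderFinished(0);
            System.out.println("clearOnSwitch=" + clear + " prematureSwitch=" + premature);
        }
    }
}
```

Without the clear, the single finish event from subtask 0 is enough to trigger the second switch, because the set still holds both IDs from the first source. With the clear, the model waits until both subtasks report again.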