[
https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17770650#comment-17770650
]
Robert Hoyt commented on FLINK-33170:
-------------------------------------
If someone can point me to Flink's policies on submitting a PR, I can
take a shot at fixing this. Otherwise, if someone else wants to tackle it, iirc
the fix was to clear the {{finishedReaders}} set [near the
end|https://github.com/apache/flink/blob/b25b57c55d903e4fdd2b666de49c90bfbad8fa99/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L305]
of the {{switchEnumerator}} method.
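To make the idea concrete, here is a minimal sketch of the switching logic from the issue description quoted below. It is a simplified stand-in, not the actual {{HybridSourceSplitEnumerator}} code: the class name {{HybridSwitchSketch}}, the {{clearOnSwitch}} toggle, and the {{currentSourceIndex}} field are hypothetical and exist only to compare the behaviour with and without the clear.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified stand-in for the switching logic; NOT the real HybridSourceSplitEnumerator. */
final class HybridSwitchSketch {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int sourceCount;
    private final int parallelism;
    // Hypothetical toggle: false mimics the behaviour described in the issue, true applies the fix.
    private final boolean clearOnSwitch;
    private int currentSourceIndex = 0;

    HybridSwitchSketch(int sourceCount, int parallelism, boolean clearOnSwitch) {
        this.sourceCount = sourceCount;
        this.parallelism = parallelism;
        this.clearOnSwitch = clearOnSwitch;
    }

    /** Records that a subtask finished the current source; returns true if this triggered a switch. */
    boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        // Only switch if every subtask has reported and a next source exists.
        if (finishedReaders.size() == parallelism && currentSourceIndex < sourceCount - 1) {
            switchEnumerator();
            return true;
        }
        return false;
    }

    private void switchEnumerator() {
        currentSourceIndex++;
        if (clearOnSwitch) {
            // The proposed fix: forget which readers finished the previous source.
            finishedReaders.clear();
        }
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        // Three sources, two subtasks each, as in the concrete example.
        for (boolean clear : new boolean[] {false, true}) {
            HybridSwitchSketch sketch = new HybridSwitchSketch(3, 2, clear);
            sketch.onReaderFinished(0); // subtask 0 finishes the first source
            sketch.onReaderFinished(1); // subtask 1 finishes the first source: switch
            // Only subtask 1 has finished the second source at this point.
            boolean switchedEarly = sketch.onReaderFinished(1);
            System.out.println("clearOnSwitch=" + clear
                    + " switchedEarly=" + switchedEarly
                    + " currentSourceIndex=" + sketch.currentSourceIndex());
        }
        // prints:
        // clearOnSwitch=false switchedEarly=true currentSourceIndex=2
        // clearOnSwitch=true switchedEarly=false currentSourceIndex=1
    }
}
```

Without the clear, the stale entries from the first source make the size check pass as soon as one subtask finishes the second source. With the clear, the sketch waits until both subtasks have reported before moving on.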
> HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+
> sources
> ------------------------------------------------------------------------------------
>
> Key: FLINK-33170
> URL: https://issues.apache.org/jira/browse/FLINK-33170
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.15.4, 1.16.2, 1.17.1, 1.19.0, 1.18.1
> Reporter: Robert Hoyt
> Priority: Critical
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> Possibly related to FLINK-27916.
>
> In all versions since 1.15.x there's a subtle bug in
> {{HybridSourceSplitEnumerator}} when determining if it's time to move on to
> the next source:
> {code:java}
> finishedReaders.add(subtaskId);
> if (finishedReaders.size() == context.currentParallelism()) {
>     // move on to the next source if it exists
> }
> {code}
> This snippet is correct, but when changing to the next source,
> {{finishedReaders}} is never cleared. So when processing the second source,
> the {{finishedReaders.size()}} check will return true when the _first_
> subtask finishes.** The hybrid source moves on to the next source if one
> exists, so any unsent records in other subtasks will get dropped.
>
> ** if each of the sources in the hybrid source has the same parallelism. If
> any source except the last has lower parallelism then I suspect that the
> source will never move on: it's impossible for {{finishedReaders.size()}} to
> shrink.
>
> Concrete example with three sources, two subtasks each:
> # subtask 0 finishes with the first source. {{finishedReaders}} has size 1
> # subtask 1 finishes with the first source. {{finishedReaders}} now has size 2,
> so the hybrid source moves on to the second source
> # subtask 1 finishes with the second source. {{finishedReaders.add(1)}}
> doesn't change the set; {{finishedReaders}} still has size 2. So the hybrid
> source moves on to the third source.
> # subtask 0 wasn't finished with the second source, but receives the
> notification to move on. Any unsent records are lost. *Data loss!*
> # this continues until the last source. The hybrid source doesn't switch once
> it is at the last source, so the race condition in step 3 can't happen there
>
> So step 3 is a race condition that drops records nondeterministically
> for every source except the first and the last.
> In production this issue caused the loss of GBs to TBs of data depending on
> the sources. We fixed it in a private fork by clearing the
> {{finishedReaders}} set when changing to the next source.