[ https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771610#comment-17771610 ]
Martijn Visser commented on FLINK-33170:
----------------------------------------
[~robertjira] Have a look at
https://flink.apache.org/how-to-contribute/overview/ and let me know if you
run into any issues.
> HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+
> sources
> ------------------------------------------------------------------------------------
>
> Key: FLINK-33170
> URL: https://issues.apache.org/jira/browse/FLINK-33170
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.15.4, 1.16.2, 1.17.1, 1.19.0, 1.18.1
> Reporter: Robert Hoyt
> Priority: Critical
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> Possibly related to FLINK-27916.
> Priority is set to Critical because this issue can cause major data loss: in
> our experience, on the order of GBs to TBs.
>
> In all versions since 1.15.x there's a subtle bug in
> {{HybridSourceSplitEnumerator}} when determining if it's time to move on to
> the next source:
> {code:java}
> finishedReaders.add(subtaskId);
> if (finishedReaders.size() == context.currentParallelism()) {
>     // move on to the next source if it exists
> }
> {code}
> This check is correct on its own, but {{finishedReaders}} is never cleared
> when the enumerator switches to the next source. So while the second source is
> being processed, the {{finishedReaders.size()}} check returns true as soon as
> the _first_ subtask finishes.** The hybrid source then moves on to the next
> source, if one exists, and any records that the other {{numSubtasks - 1}}
> subtasks have not yet read and sent are dropped.
>
> {{**}} This assumes every source in the hybrid source has the same
> parallelism. If any source other than the last has lower parallelism, I
> suspect the hybrid source will never move on, because
> {{finishedReaders.size()}} can never shrink.
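To make the mechanism concrete, here is a minimal Java sketch of the switch-over check as described above. {{NonClearingSwitchModel}} and its methods are hypothetical names: this is a simplified model that keeps only the {{finishedReaders}} bookkeeping, not Flink's {{HybridSourceSplitEnumerator}}, and {{parallelism}} stands in for {{context.currentParallelism()}}.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the switch-over check described in this
// report. It is NOT Flink's HybridSourceSplitEnumerator.
class NonClearingSwitchModel {
    private final int parallelism; // stands in for context.currentParallelism()
    private final int numSources;
    private final Set<Integer> finishedReaders = new HashSet<>();
    private int currentSourceIndex = 0;

    NonClearingSwitchModel(int parallelism, int numSources) {
        this.parallelism = parallelism;
        this.numSources = numSources;
    }

    /** A reader reports it finished the current source; returns true if the model switched. */
    boolean readerFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex + 1 < numSources) {
            currentSourceIndex++;
            // As reported: finishedReaders is NOT cleared when switching.
            return true;
        }
        return false;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }
}
```

With parallelism 2 and three sources, once subtasks 0 and 1 have finished the first source, a single {{readerFinished(1)}} for the second source is enough to move the model on to the third source.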
>
> Concrete example with three sources, two subtasks each:
> # Subtask 0 finishes with the first source. {{finishedReaders}} has size 1.
> # Subtask 1 finishes with the first source. {{finishedReaders}} now has size
> 2, so the hybrid source moves on to the second source.
> # Subtask 1 finishes with the second source. {{finishedReaders.add(1)}}
> doesn't change the set, so {{finishedReaders}} still has size 2 and the hybrid
> source moves on to the third source.
> # Subtask 0 wasn't finished with the second source, but receives the
> notification to move on. Any unsent records are lost. *Data loss!*
> # This repeats up to the last source. The hybrid source never switches away
> from the last source, so the race condition in step 3 cannot happen there.
>
> Step 3 is therefore the race condition: it drops records nondeterministically
> from every source except the first and the last.
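The concrete example above can be replayed with a small round-based simulation. This is a hypothetical sketch: the class name, the per-subtask record counts, and the per-round read speeds are assumptions for illustration, and a record counts as "dropped" if it is still unread on the old source at the moment of a switch. It uses the same non-clearing check as the snippet in this report.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical round-based simulation of the concrete example, using the
// non-clearing check described in this report (not Flink's actual code).
class DroppedRecordsSimulation {

    /**
     * records[s][t]: records subtask t must read from source s.
     * speeds[t]: records subtask t reads per round (must be > 0).
     * Returns how many records were still unread when the enumerator switched sources.
     */
    static long droppedRecords(long[][] records, int[] speeds) {
        int numSources = records.length;
        int parallelism = speeds.length;
        Set<Integer> finishedReaders = new HashSet<>(); // never cleared, as reported
        int current = 0;
        long[] remaining = records[0].clone();
        boolean[] reported = new boolean[parallelism];
        long dropped = 0;

        while (true) {
            for (int t = 0; t < parallelism; t++) {
                remaining[t] -= Math.min(speeds[t], remaining[t]);
                if (remaining[t] > 0 || reported[t]) {
                    continue;
                }
                // Subtask t reports that it finished the current source.
                reported[t] = true;
                finishedReaders.add(t);
                if (finishedReaders.size() == parallelism && current + 1 < numSources) {
                    // Switching sources abandons every record not yet read.
                    for (long r : remaining) {
                        dropped += r;
                    }
                    current++;
                    remaining = records[current].clone();
                    reported = new boolean[parallelism];
                }
            }
            if (current == numSources - 1 && allRead(remaining)) {
                return dropped;
            }
        }
    }

    private static boolean allRead(long[] remaining) {
        for (long r : remaining) {
            if (r > 0) {
                return false;
            }
        }
        return true;
    }
}
```

With three sources, 10 records per subtask per source, and subtask 1 reading twice as fast as subtask 0, this returns 6: subtask 0 still had 6 records of the second source left when the premature switch happened. With only two sources, the same workload loses nothing.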
> In production, this issue caused the loss of GBs to TBs of data when a hybrid
> source had the following:
> * 3-5 underlying sources, each of which should emit 100 GB to 10 TB worth of
> records
> * all sources had the same number of splits, around 64-256
>
> We fixed it in a private fork by clearing the {{finishedReaders}} set when
> changing to the next source.
>
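A minimal sketch of the fix described above, under the same simplified model. The actual patch from the private fork isn't shown in this report; {{ClearingSwitchModel}} is a hypothetical name, and the only change from the non-clearing check is the {{finishedReaders.clear()}} call on each switch.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simplified model with the described fix applied: finishedReaders
// is cleared whenever the model moves on to the next source.
class ClearingSwitchModel {
    private final int parallelism; // stands in for context.currentParallelism()
    private final int numSources;
    private final Set<Integer> finishedReaders = new HashSet<>();
    private int currentSourceIndex = 0;

    ClearingSwitchModel(int parallelism, int numSources) {
        this.parallelism = parallelism;
        this.numSources = numSources;
    }

    /** A reader reports it finished the current source; returns true if the model switched. */
    boolean readerFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex + 1 < numSources) {
            currentSourceIndex++;
            // The fix: start counting finished readers from zero for the new source.
            finishedReaders.clear();
            return true;
        }
        return false;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }
}
```

Clearing the set keeps {{finishedReaders}} limited to subtasks that finished the *current* source, so every switch again waits for all subtasks.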
> Existing tests don't catch this data race because, as far as I understand
> them, they:
> * use two mock sources, whereas this bug manifests with 3+ sources
> * use sources with parallelism 1, whereas this bug manifests when the sources
> have parallelism > 1
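A regression test would need to cover the configurations the existing tests miss. The sketch below is hypothetical (not an existing Flink test or helper): it replays finish events against the non-clearing check and reports whether a switch fires before every subtask has actually finished the current source.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical test-style check (not an existing Flink test): does the
// non-clearing switch-over check fire before every subtask finished a source?
class PrematureSwitchCheck {

    static boolean hasPrematureSwitch(int numSources, int parallelism) {
        Set<Integer> finishedReaders = new HashSet<>(); // never cleared, as reported
        // Only sources that have a successor can trigger a switch.
        for (int source = 0; source < numSources - 1; source++) {
            // Subtasks that really finished this source so far.
            Set<Integer> actuallyFinished = new HashSet<>();
            for (int subtask = 0; subtask < parallelism; subtask++) {
                actuallyFinished.add(subtask);
                finishedReaders.add(subtask);
                if (finishedReaders.size() == parallelism) {
                    // The enumerator would switch to source + 1 at this point.
                    if (actuallyFinished.size() < parallelism) {
                        return true;
                    }
                    break;
                }
            }
        }
        return false;
    }
}
```

Under this model, the check only fires with three or more sources and parallelism greater than 1, matching the two gaps listed above.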
--
This message was sent by Atlassian Jira
(v8.20.10#820010)