[ 
https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771610#comment-17771610
 ] 

Martijn Visser commented on FLINK-33170:
----------------------------------------

[~robertjira] Have a look at 
https://flink.apache.org/how-to-contribute/overview/ - If you run into issues, 
let me know

> HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ 
> sources
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-33170
>                 URL: https://issues.apache.org/jira/browse/FLINK-33170
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.15.4, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>            Reporter: Robert Hoyt
>            Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Possibly related to FLINK-27916.
> Priority is labeled Critical because this issue can cause major data loss, in 
> our experience on the order of GBs to TBs.
>  
> In all versions since 1.15.x there's a subtle bug in 
> {{HybridSourceSplitEnumerator}} when determining if it's time to move on to 
> the next source:
> {code:java}
> finishedReaders.add(subtaskId);
> if (finishedReaders.size() == context.currentParallelism()) {
>   // move on to the next source if it exists
> {code}
> This snippet is correct on its own, but {{finishedReaders}} is never cleared 
> when changing to the next source. So while processing the second source, the 
> {{finishedReaders.size()}} check already returns true as soon as the _first_ 
> subtask finishes.** The hybrid source then moves on to the next source if one 
> exists, so any records still waiting to be read and sent by the other 
> {{numSubtasks - 1}} subtasks are dropped.
>  
> {{**}} assuming each of the sources in the hybrid source runs at the same 
> parallelism. If any source except the last has lower parallelism, I suspect 
> the hybrid source will never move on at all, since it's impossible for 
> {{finishedReaders.size()}} to shrink.
>  
> Concrete example with three sources, two subtasks each:
>  # subtask 0 finishes with the first source. {{finishedReaders}} has size 1
>  # subtask 1 finishes with the first source. {{finishedReaders}} has size 2 
> now, and moves on to the second source
>  # subtask 1 finishes with the second source. {{finishedReaders.add(1)}} 
> doesn't change the never-cleared set; {{finishedReaders}} still has size 2, 
> so the hybrid source immediately moves on to the third source.
>  # subtask 0 isn't finished with the second source yet, but receives the 
> notification to move on. Any unsent records are lost. *Data loss!*
>  # this continues until the last source. The hybrid source never changes over 
> once it is on the last source, so the race condition from step 3 cannot 
> happen there.
>  
> So step 3 is the race condition: records are dropped nondeterministically for 
> every source except the first and the last.
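>  
> To see the same arithmetic in isolation, here is a tiny self-contained 
> simulation of the never-cleared set (a toy model with made-up names, not the 
> real enumerator code) for three sources with two subtasks each:
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
> 
> // Toy model of the switch-over check; class and variable names are made up.
> public class StaleFinishedReadersDemo {
>   public static void main(String[] args) {
>     int parallelism = 2;
>     Set<Integer> finishedReaders = new HashSet<>(); // never cleared, as in the bug
>     int currentSource = 0;
> 
>     // first source: subtask 0 finishes, then subtask 1 -> legitimate switch
>     for (int subtask : new int[] {0, 1}) {
>       finishedReaders.add(subtask);
>       if (finishedReaders.size() == parallelism) {
>         currentSource++;
>         System.out.println("switched to source " + currentSource);
>       }
>     }
> 
>     // second source: only subtask 1 has finished, but the stale set already
>     // has size 2, so the check passes and the enumerator switches again
>     finishedReaders.add(1);
>     if (finishedReaders.size() == parallelism) {
>       currentSource++;
>       System.out.println("premature switch to source " + currentSource
>           + " while subtask 0 still has records -> data loss");
>     }
>   }
> }
> {code}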
> In production this issue caused the loss of GBs to TBs of data when a hybrid 
> source had the following:
>  * 3-5 underlying sources, each expected to emit 100 GB to 10 TB worth of 
> records
>  * the same number of splits for every source, around 64-256
> We fixed it in a private fork by clearing the {{finishedReaders}} set when 
> changing to the next source.
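>  
> A minimal sketch of that fix, assuming the set is cleared right where the 
> switch-over is decided (the surrounding enumerator code is elided, as in the 
> snippet above):
> {code:java}
> finishedReaders.add(subtaskId);
> if (finishedReaders.size() == context.currentParallelism()) {
>   // reset the per-source bookkeeping so the next source has to wait for
>   // all of its own readers before another switch-over can happen
>   finishedReaders.clear();
>   // move on to the next source if it exists
> }
> {code}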
> Existing tests don't catch this data race because, as far as I understand 
> them, they:
>  * use only two mock sources, whereas this bug manifests with 3+ sources
>  * run the sources with parallelism 1, whereas this bug manifests when the 
> sources have parallelism > 1
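>  
> For reference, a repro shouldn't need anything exotic: a bounded hybrid source 
> with three underlying sources running at parallelism > 1 can hit the race. A 
> rough sketch (using {{NumberSequenceSource}} purely as a stand-in for real 
> sources, not an actual test from the Flink suite; whether the race fires on a 
> given run depends on timing):
> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
> import org.apache.flink.connector.base.source.hybrid.HybridSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class HybridSourceDropRepro {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(2); // the bug needs parallelism > 1
> 
>     // three bounded sources -> two switch-overs; the second one is the racy one
>     HybridSource<Long> hybrid =
>         HybridSource.builder(new NumberSequenceSource(0, 9_999))
>             .addSource(new NumberSequenceSource(10_000, 19_999))
>             .addSource(new NumberSequenceSource(20_000, 29_999))
>             .build();
> 
>     int count = env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "hybrid")
>         .executeAndCollect(50_000)
>         .size();
>     // all 30,000 records should arrive; a smaller count means records were dropped
>     System.out.println("records read: " + count);
>   }
> }
> {code}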



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
