[
https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784240#comment-17784240
]
Varun Narayanan Chakravarthy commented on FLINK-33402:
------------------------------------------------------
Created a PR: https://github.com/apache/flink/pull/23687
> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in
> Data Loss
> ------------------------------------------------------------------------------------
>
> Key: FLINK-33402
> URL: https://issues.apache.org/jira/browse/FLINK-33402
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Affects Versions: 1.16.1
> Environment: Apache Flink 1.16.1
> Mac OSX, Linux etc.
> Reporter: Varun Narayanan Chakravarthy
> Priority: Critical
> Labels: pull-request-available
> Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
> Original Estimate: 2h
> Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using the Hybrid Source. We are
> reading from a series of ~100 concrete File Sources, all chained together
> using the Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code: the
> Hybrid Source switches to the next source before the current source is
> complete, and likewise for the Hybrid Source readers. I have also shared a
> patch file that fixes the issue.
> From the logs:
> *Task Manager logs:*
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229 position=null]
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [0000000154]
> 2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [0000000154]
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader received NoMoreSplits event.
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing Source Reader.
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 0
> 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher 0 exited.
> 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing.
> This split (ID 0000000229) was assigned to the reader, but, as the logs
> show, it was added to the reader after the NoMoreSplits event and is NOT read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO
> o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote
> split to requesting host '10': Optional[FileSourceSplit:
> s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229
> position=null]
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split
> to subtask 0 : FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)
> hosts=[localhost] ID=0000000229 position=null
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO
> o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source:
> parquet-source received split request from parallel task 1 (#0)
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] DEBUG
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest
> subtask=1 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 1 (on
> host '10.4.168.40') is requesting a file source split
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO
> o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote
> split to requesting host '10': Optional[FileSourceSplit:
> s3://REDACTED/part-0-13127.snappy [0, 88108) hosts=[localhost] ID=0000000045
> position=null]
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split
> to subtask 1 : FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108)
> hosts=[localhost] ID=0000000045 position=null
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO
> o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source:
> parquet-source received split request from parallel task 0 (#0)
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] DEBUG
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest
> subtask=0 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 0 (on
> host '10.4.192.125') is requesting a file source split
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - No more splits
> available for subtask 0
> 2023-10-10 17:46:24.049 [SourceCoordinator-Source: parquet-source] INFO
> o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source:
> parquet-source received split request from parallel task 1 (#0)
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] DEBUG
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest
> subtask=1 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 1 (on
> host '10.4.168.40') is requesting a file source split
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - No more splits
> available for subtask 1
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG
> o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source:
> parquet-source received custom event from parallel task 1 (#0):
> SourceReaderFinishedEvent{sourceIndex=11}
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSourceEvent
> SourceReaderFinishedEvent{sourceIndex=11} subtask=1 pendingSplits={}
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - All readers
> finished, ready to switch enumerator!
> The assigned split is never processed.
> I traced the race condition to the HybridSourceSplitEnumerator and the
> HybridSourceSplitReader; there are races on both the enumerator and the
> reader side. The attached patch ensures that the switch from one source to
> another, and from one reader to another, is atomic with respect to the rest
> of the code: all sections of the code that use the currentReader or
> currentEnumerator are read-locked, and the code that performs the
> reader/enumerator switch is write-locked. This guarantees that no other
> function executes while the reader/enumerator switch occurs.
> Applying just the fixes to HybridSourceSplitEnumerator resolves the majority
> of the data loss, but not all of it; for complete correctness, fixes are
> needed in both locations. Additionally, the current readers also need to be
> reset before proceeding.
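> The locking scheme described above can be sketched as follows. This is an
> illustrative sketch only: the class and member names are hypothetical and do
> not correspond to the real Flink classes or to the attached patch.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of read/write-locked source switching; names are
// illustrative and do not match Flink internals or the attached patch.
public class SwitchingReaderSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private String currentReader = "FileSourceReader[sourceIndex=11]";

    // Every operation that uses the current reader takes the read lock, so
    // such operations may run concurrently with one another...
    public String pollNext() {
        lock.readLock().lock();
        try {
            return currentReader;
        } finally {
            lock.readLock().unlock();
        }
    }

    // ...while the switch to the next reader takes the write lock, making the
    // switch atomic with respect to every read-locked section: no reader use
    // can interleave with the swap.
    public void switchTo(String nextReader) {
        lock.writeLock().lock();
        try {
            currentReader = nextReader;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        SwitchingReaderSketch reader = new SwitchingReaderSketch();
        reader.pollNext();                                    // read-locked use
        reader.switchTo("KafkaSourceReader[sourceIndex=12]"); // write-locked switch
    }
}
```

> The same pattern applies on the enumerator side, with the enumerator switch
> taking the write lock and handleSplitRequest-style paths taking the read lock.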
> With these fixes applied, our team, running Flink at a scale of 1B+
> records/hour with 180 Task Managers, did not see any data loss. There was
> also no noticeable performance impact from the read-write mutexes and the
> added concurrency control.
> Additionally, integer comparison of boxed objects needs to use `equals`;
> reference comparison (`==`) does not work for values above 127. This
> [issue|https://www.mail-archive.com/[email protected]/msg647008.html]
> has been reported before by another user.
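> A minimal, self-contained illustration of that boxed-Integer pitfall (the
> values here are arbitrary): Java caches Integer boxes only in the range
> -128 to 127, so `==` compares object references and silently breaks for
> larger values, while `equals` compares the wrapped int and is always correct.

```java
public class IntegerCompareDemo {
    public static void main(String[] args) {
        Integer small1 = 127, small2 = 127;
        Integer big1 = 128, big2 = 128;

        // Inside the cached range -128..127, both variables point at the same
        // cached object, so reference comparison happens to work.
        System.out.println(small1 == small2);   // true

        // At 128 and above, boxing creates distinct objects on a default JVM,
        // so == compares references and yields false.
        System.out.println(big1 == big2);       // false on a default JVM

        // equals compares the wrapped values and is always correct.
        System.out.println(big1.equals(big2));  // true
    }
}
```

> (The cache upper bound can be raised with -XX:AutoBoxCacheMax, which is why
> the `==` form can appear to work in some environments and fail in others.)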
> If the above fixes are valid, please let me know. I would be happy to create
> a branch and PR against the repo. I have completed and signed the individual
> CLA and will be emailing it soon.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)