[
https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Varun Narayanan Chakravarthy updated FLINK-33402:
-------------------------------------------------
Description:
Hello Team,
I noticed data loss when using the Hybrid Source. We are reading from a
series of ~100 concrete File Sources, all chained together using the Hybrid
Source.
The issue stems from a race condition in the Flink Hybrid Source code: the
Hybrid Source switches to the next source before the current source is
complete, and the same happens for the Hybrid Source readers. I have also
shared a patch file that fixes the issue.
From the logs:
*Task Manager logs:*
2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229 position=null]
2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [0000000154]
2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [0000000154]
2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader received NoMoreSplits event.
2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing Source Reader.
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 0
2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher 0 exited.
2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
We identified that the data from `s3://REDACTED/part-1-13189.snappy` is
missing. This split (ID 0000000229) was assigned to the reader, but as the
logs show, it was added to the reader only after the NoMoreSplits event and
was never read.
*Job Manager logs:*
2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO
o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split
to requesting host '10': Optional[FileSourceSplit:
s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229
position=null]
2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split
to subtask 0 : FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)
hosts=[localhost] ID=0000000229 position=null
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO
o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source:
parquet-source received split request from parallel task 1 (#0)
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] DEBUG
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest
subtask=1 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 1 (on
host '10.4.168.40') is requesting a file source split
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO
o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split
to requesting host '10': Optional[FileSourceSplit:
s3://REDACTED/part-0-13127.snappy [0, 88108) hosts=[localhost] ID=0000000045
position=null]
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split
to subtask 1 : FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108)
hosts=[localhost] ID=0000000045 position=null
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO
o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source:
parquet-source received split request from parallel task 0 (#0)
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] DEBUG
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest
subtask=0 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 0 (on
host '10.4.192.125') is requesting a file source split
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - No more splits
available for subtask 0
2023-10-10 17:46:24.049 [SourceCoordinator-Source: parquet-source] INFO
o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source:
parquet-source received split request from parallel task 1 (#0)
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] DEBUG
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest
subtask=1 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 1 (on
host '10.4.168.40') is requesting a file source split
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - No more splits
available for subtask 1
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG
o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source:
parquet-source received custom event from parallel task 1 (#0):
SourceReaderFinishedEvent{sourceIndex=11}
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSourceEvent
SourceReaderFinishedEvent{sourceIndex=11} subtask=1 pendingSplits={}
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - All readers finished,
ready to switch enumerator!
The assigned split is never processed.
I traced the race condition to HybridSourceSplitEnumerator and
HybridSourceSplitReader.
There are races on both the enumerator (source) side and the reader side.
The attached patch ensures that the switch from one source to another, and
from one reader to another, happens atomically with respect to the rest of
the code. All sections of code that use the currentReader or
currentEnumerator are read-locked, and the code that performs the
reader/enumerator switch is write-locked. This ensures that no other
function executes while the reader/enumerator switch occurs.
Applying just the fixes to HybridSourceSplitEnumerator resolves the majority
of the data loss, but not all of it; for complete correctness, fixes are
needed in both places. Additionally, the current reader also needs to be
reset before proceeding.
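To illustrate the locking scheme described above: the following is a minimal, hypothetical sketch (class and field names are mine, not the actual Flink classes in the patch) of guarding the current-reader reference with a java.util.concurrent read-write lock, so that anything using the reader excludes the switch, and the switch excludes everything else.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the read-/write-locking used in the patch; the real
// fix applies the same pattern inside HybridSourceReader and
// HybridSourceSplitEnumerator.
class GuardedSwitch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private String currentReader = "reader-11";

    // Every operation that touches currentReader takes the read lock, so it
    // can never interleave with an in-progress switch.
    String pollCurrent() {
        lock.readLock().lock();
        try {
            return currentReader;
        } finally {
            lock.readLock().unlock();
        }
    }

    // The switch takes the write lock: it waits for all in-flight read-locked
    // operations to drain and blocks new ones until the swap is complete.
    void switchTo(String nextReader) {
        lock.writeLock().lock();
        try {
            currentReader = nextReader;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        GuardedSwitch g = new GuardedSwitch();
        System.out.println(g.pollCurrent()); // reader-11
        g.switchTo("reader-12");
        System.out.println(g.pollCurrent()); // reader-12
    }
}
```

Since readers take only the shared read lock, concurrent split processing is unaffected; only the (rare) source switch serializes, which matches the observation that the locks had no noticeable performance impact.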
With these fixes applied, our team, running Flink at a scale of 1B+
records/hour with 180 Task Managers, saw no data loss. There was also no
noticeable performance impact from the read-write locks and concurrency
control.
Additionally, integer comparison of boxed Integer objects needs to use
`equals`; otherwise it does not work for values above 127, since the JVM
only caches small Integer instances. This
[issue|https://www.mail-archive.com/[email protected]/msg647008.html] has
been reported before, by another user.
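The boxing pitfall is easy to demonstrate in plain Java: autoboxing goes through `Integer.valueOf`, which the language spec only requires to cache values in [-128, 127], so `==` happens to work for small values and silently fails above 127 (this matters for e.g. subtask or source indexes once they exceed 127).

```java
public class IntegerCompare {
    public static void main(String[] args) {
        Integer small1 = 127, small2 = 127;
        // Within the Integer.valueOf cache range, both boxes are the same
        // object, so reference comparison happens to succeed.
        System.out.println(small1 == small2); // true

        Integer big1 = 128, big2 = 128;
        // Above the cache range, each boxing creates a fresh object on a
        // default JVM, so == compares distinct references and fails.
        System.out.println(big1 == big2); // false on default JVM settings

        // equals compares the values and is correct regardless of caching.
        System.out.println(big1.equals(big2)); // true
    }
}
```

This is why tests with small parallelism can pass while a large deployment (sourceIndex or subtask above 127) breaks.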
If the above fixes look valid, please let me know; I would be happy to
create a branch and open a PR against the repo. I have completed and signed
the individual CLA and will be emailing it soon.
> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in
> Data Loss
> ------------------------------------------------------------------------------------
>
> Key: FLINK-33402
> URL: https://issues.apache.org/jira/browse/FLINK-33402
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Affects Versions: 1.16.1
> Environment: Apache Flink 1.16.1
> macOS, Linux, etc.
> Reporter: Varun Narayanan Chakravarthy
> Priority: Blocker
> Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
> Original Estimate: 2h
> Remaining Estimate: 2h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)