[ 
https://issues.apache.org/jira/browse/FLINK-30334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-30334:
----------------------------
    Fix Version/s: 1.16.1

> SourceCoordinator error splitRequest check cause HybridSource loss of data 
> and hang
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-30334
>                 URL: https://issues.apache.org/jira/browse/FLINK-30334
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Ran Tao
>            Assignee: Ran Tao
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.16.1
>
>
> Consider a HybridSource that chains two filesystem sources: source A reads
> a.csv and source B reads b.csv. Even in this simple case, the job hangs after
> switching to the second source, with the following log:
> 10802 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
> Source: hybrid_source[1] received split request from parallel task 0 (#0)
> 10802 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - 
> Subtask 0 (on host '') is requesting a file source split
> 10803 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - 
> Assigning split to non-localized request: Optional[FileSourceSplit: 
> [file:/Users/xxx/a.csv|file:///Users/xxx/a.csv] [0, 49) (no host info) 
> ID=0000000001 position=null]
> 10808 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - 
> Assigned split to subtask 0 : FileSourceSplit: 
> [file:/Users/xxx/a.csv|file:///Users/xxx/a.csv] [0, 49) (no host info) 
> ID=0000000001 position=null
> 10816 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Adding 
> splits subtask=0 sourceIndex=0 
> currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971
>  [HybridSourceSplit
> {sourceIndex=0, splitId=0000000001}
> ]
> 10817 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
> split(s) to reader: [FileSourceSplit: 
> [file:/Users/xxx/a.csv|file:///Users/chucheng/TMP/a.csv] [0, 49) (no host 
> info) ID=0000000001 position=null]
> 10822 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] 
> (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Starting split fetcher 0
> 10864 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] 
> (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished reading from splits [0000000001]
> +I[hello_a, flink, 1]
> +I[hello_a, hadoop, 2]
> +I[hello_a, world, 3]
> 10866 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
> reading split(s) [0000000001]
> 10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] 
> - Closing splitFetcher 0 because it is idle.
> 10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Shutting down split fetcher 0
> 10868 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] 
> (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split 
> fetcher 0 exited.
> 10869 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
> Source: hybrid_source[1] received split request from parallel task 0 (#0)
> 10870 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - 
> Subtask 0 (on host '') is requesting a file source split
> 10872 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No 
> more splits available for subtask 0
> 10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader 
> received NoMoreSplits event.
> 10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - End of 
> input subtask=0 sourceIndex=0 
> org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971
> StaticFileSplitEnumerator:org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator@69906bb9
> 10874 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator [] 
> - Starting enumerator for sourceIndex=1
> 10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Switch 
> source event: subtask=0 sourceIndex=1 
> source=org.apache.flink.connector.file.src.FileSource@12ef574f
> 10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing 
> Source Reader.
> 10882 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
> Source: hybrid_source[1] received split request from parallel task 0 (#0)
> *The job does not read data from the next source and hangs at 'received split
> request from parallel task'.*
>  
> The reason is that in 1.16 and master, the latest code adds a
> context.hasNoMoreSplits check before calling enumerator.handleSplitRequest. We
> understand the intent of the comment, which is to reduce unnecessary split
> requests, but the check does not consider HybridSource. When a subtask has no
> more splits, a HybridSource switches to the next source, and the check ignores
> this case. Once the first source finishes reading, the context marks the
> subtask as having no more splits, so the later check blocks every split
> request and no splits from the next source can be assigned. Flink 1.15
> behaves correctly.
>  
> *SourceCoordinator*
>  
> {code:java}
> private void handleRequestSplitEvent(int subtask, int attemptNumber, 
> RequestSplitEvent event) {
>     LOG.info(
>             "Source {} received split request from parallel task {} (#{})",
>             operatorName,
>             subtask,
>             attemptNumber);
>     // request splits from the enumerator only if the enumerator has
>     // un-assigned splits
>     // this helps to reduce unnecessary split requests to the enumerator
>     if (!context.hasNoMoreSplits(subtask)) {
>         enumerator.handleSplitRequest(subtask, event.hostName());
>     }
> }
> {code}
> The `context.hasNoMoreSplits` check in SourceCoordinator prevents the subtask
> from reading the remaining child sources of the hybrid source.
>  
> *SourceCoordinatorContext*
>  
> {code:java}
> boolean hasNoMoreSplits(int subtaskIndex) {
>     return subtaskHasNoMoreSplits[subtaskIndex];
> }
> @Override
> public void signalNoMoreSplits(int subtask) {
>     checkSubtaskIndex(subtask);
>     // Ensure the split assignment is done by the coordinator executor.
>     callInCoordinatorThread(
>             () -> {
>                 subtaskHasNoMoreSplits[subtask] = true;
>                 signalNoMoreSplitsToAttempts(subtask);
>                 return null; // void return value
>             },
>             "Failed to send 'NoMoreSplits' to reader " + subtask);
> }
> {code}
> The context sets the subtask's noMoreSplits flag to true once the current
> source is done, without considering the hybrid case.
>  
>  
> In Flink 1.15, SourceCoordinator forwards every split request to the
> enumerator without this check:
>  
> {code:java}
> public void handleEventFromOperator(int subtask, OperatorEvent event) {
>     runInEventLoop(
>             () -> {
>                 if (event instanceof RequestSplitEvent) {
>                     LOG.info(
>                             "Source {} received split request from parallel task {}",
>                             operatorName,
>                             subtask);
>                     enumerator.handleSplitRequest(
>                             subtask, ((RequestSplitEvent) event).hostName());
>                 }
> {code}
>  
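
The interaction described in the issue can be reproduced with a small toy
model. The sketch below is not Flink code and not the fix that was merged: the
class SplitRequestGateDemo, the nested ToyHybridEnumerator, and the method
run(...) are hypothetical names invented for illustration. It only assumes what
the report states: a per-subtask "no more splits" flag is set when the first
child source is exhausted, and split requests are dropped while that flag is
set.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of the split-request gate described in the issue; not Flink code. */
public class SplitRequestGateDemo {

    /** Hypothetical stand-in for a hybrid enumerator over two child sources. */
    static final class ToyHybridEnumerator {
        private final List<Deque<String>> sources = new ArrayList<>();
        private int sourceIndex = 0;

        ToyHybridEnumerator() {
            sources.add(new ArrayDeque<>(List.of("a.csv"))); // child source A
            sources.add(new ArrayDeque<>(List.of("b.csv"))); // child source B
        }

        /** Returns the next split, or null if the current child source is exhausted. */
        String handleSplitRequest() {
            return sources.get(sourceIndex).poll();
        }

        /** Moves to the next child source; returns false if there is none. */
        boolean switchToNextSource() {
            if (sourceIndex + 1 >= sources.size()) {
                return false;
            }
            sourceIndex++;
            return true;
        }
    }

    /** Returns the splits assigned to a single subtask, with or without the gate. */
    public static List<String> run(boolean gateOnNoMoreSplits) {
        ToyHybridEnumerator enumerator = new ToyHybridEnumerator();
        boolean subtaskHasNoMoreSplits = false; // models the per-subtask flag in the context
        List<String> assigned = new ArrayList<>();

        // Each iteration models one split request; the bound keeps the model finite.
        for (int request = 0; request < 10; request++) {
            if (gateOnNoMoreSplits && subtaskHasNoMoreSplits) {
                // The request is dropped, so the enumerator never sees it.
                // In a real job the reader would wait forever; here we just stop.
                break;
            }
            String split = enumerator.handleSplitRequest();
            if (split != null) {
                assigned.add(split);
            } else {
                subtaskHasNoMoreSplits = true; // models signalNoMoreSplits(subtask)
                if (!enumerator.switchToNextSource()) {
                    break; // all child sources are done
                }
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        System.out.println("gated:   " + run(true));
        System.out.println("ungated: " + run(false));
    }
}
```

With the gate enabled, the model assigns only a.csv, because the request that
follows the source switch is dropped. Without the gate, which is the 1.15
behavior shown above, both a.csv and b.csv are assigned.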



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
