[
https://issues.apache.org/jira/browse/FLINK-30334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ran Tao updated FLINK-30334:
----------------------------
Description:
If we use a hybrid source, for example filesystem source A reading a.csv and
filesystem source B reading b.csv, it is a very simple case, but it hangs on the
second source with:
10802 [SourceCoordinator-Source: hybrid_source[1]] INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
Source: hybrid_source[1] received split request from parallel task 0 (#0)
10802 [SourceCoordinator-Source: hybrid_source[1]] INFO
org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask
0 (on host '') is requesting a file source split
10803 [SourceCoordinator-Source: hybrid_source[1]] INFO
org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] -
Assigning split to non-localized request: Optional[FileSourceSplit:
file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null]
10808 [SourceCoordinator-Source: hybrid_source[1]] INFO
org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] -
Assigned split to subtask 0 : FileSourceSplit: file:/Users/xxx/a.csv [0, 49)
(no host info) ID=0000000001 position=null
10816 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO
org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Adding
splits subtask=0 sourceIndex=0
currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971
[HybridSourceSplit{sourceIndex=0, splitId=0000000001}]
10817 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding
split(s) to reader: [FileSourceSplit: file:/Users/xxx/a.csv [0, 49) (no host info)
ID=0000000001 position=null]
10822 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2]
(1/1)#0] INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
Starting split fetcher 0
10864 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2]
(1/1)#0] INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
Finished reading from splits [0000000001]
+I[hello_a, flink, 1]
+I[hello_a, hadoop, 2]
+I[hello_a, world, 3]
10866 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished
reading split(s) [0000000001]
10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] -
Closing splitFetcher 0 because it is idle.
10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
Shutting down split fetcher 0
10868 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2]
(1/1)#0] INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split
fetcher 0 exited.
10869 [SourceCoordinator-Source: hybrid_source[1]] INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
Source: hybrid_source[1] received split request from parallel task 0 (#0)
10870 [SourceCoordinator-Source: hybrid_source[1]] INFO
org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask
0 (on host '') is requesting a file source split
10872 [SourceCoordinator-Source: hybrid_source[1]] INFO
org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No more
splits available for subtask 0
10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader
received NoMoreSplits event.
10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO
org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - End of
input subtask=0 sourceIndex=0
org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971
StaticFileSplitEnumerator:org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator@69906bb9
10874 [SourceCoordinator-Source: hybrid_source[1]] INFO
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator [] -
Starting enumerator for sourceIndex=1
10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO
org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Switch
source event: subtask=0 sourceIndex=1
source=org.apache.flink.connector.file.src.FileSource@12ef574f
10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing
Source Reader.
10882 [SourceCoordinator-Source: hybrid_source[1]] INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
Source: hybrid_source[1] received split request from parallel task 0 (#0)
*The job does not read the next source's data and hangs at 'received split
request from parallel task'.*
The reason is that in 1.16 and master, the latest code adds a
context.hasNoMoreSplits check before calling enumerator.handleSplitRequest. We
understand from the code comments that this is meant to reduce unnecessary split
requests, but it does not consider the HybridSource situation. When a subtask
receives NoMoreSplits, the hybrid source switches to the next child source; the
context, however, has already marked the subtask with the noMoreSplits status
once the first source finishes, so the later check prevents any splits from the
subsequent sources from being assigned. Flink 1.15 behaves correctly.
*SourceCoordinator*
{code:java}
private void handleRequestSplitEvent(int subtask, int attemptNumber, RequestSplitEvent event) {
    LOG.info(
            "Source {} received split request from parallel task {} (#{})",
            operatorName,
            subtask,
            attemptNumber);
    // request splits from the enumerator only if the enumerator has un-assigned splits
    // this helps to reduce unnecessary split requests to the enumerator
    if (!context.hasNoMoreSplits(subtask)) {
        enumerator.handleSplitRequest(subtask, event.hostName());
    }
}
{code}
The `context.hasNoMoreSplits` check in SourceCoordinator prevents the subtask
from ever requesting splits from the hybrid source's other child sources.
SourceCoordinatorContext
{code:java}
boolean hasNoMoreSplits(int subtaskIndex) {
    return subtaskHasNoMoreSplits[subtaskIndex];
}

@Override
public void signalNoMoreSplits(int subtask) {
    checkSubtaskIndex(subtask);
    // Ensure the split assignment is done by the coordinator executor.
    callInCoordinatorThread(
            () -> {
                subtaskHasNoMoreSplits[subtask] = true;
                signalNoMoreSplitsToAttempts(subtask);
                return null; // void return value
            },
            "Failed to send 'NoMoreSplits' to reader " + subtask);
}
{code}
The context sets noMoreSplits to true once a source is done, without considering
the hybrid situation.
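The stale-flag interaction can be illustrated with a minimal, self-contained model (class and method names below are hypothetical, not Flink code): once the flag is set for a subtask, every later split request is short-circuited, even though a freshly started enumerator for the next child source could serve it.
{code:java}
// Minimal model of the FLINK-30334 short-circuit (hypothetical names, not Flink APIs).
public class MiniCoordinator {
    private final boolean[] subtaskHasNoMoreSplits;

    public MiniCoordinator(int parallelism) {
        this.subtaskHasNoMoreSplits = new boolean[parallelism];
    }

    // Mirrors signalNoMoreSplits: the flag is set once and never cleared.
    public void signalNoMoreSplits(int subtask) {
        subtaskHasNoMoreSplits[subtask] = true;
    }

    // Mirrors the 1.16 handleRequestSplitEvent guard: a set flag drops the request.
    public boolean handleSplitRequest(int subtask) {
        // In 1.16+ the request never reaches the enumerator of the next source -> hang.
        return !subtaskHasNoMoreSplits[subtask];
    }

    public static void main(String[] args) {
        MiniCoordinator c = new MiniCoordinator(1);
        System.out.println(c.handleSplitRequest(0)); // first source: request served
        c.signalNoMoreSplits(0);                     // first source exhausted
        // HybridSource switches to source B, which has splits, but:
        System.out.println(c.handleSplitRequest(0)); // request dropped
    }
}
{code}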
1.15
{code:java}
public void handleEventFromOperator(int subtask, OperatorEvent event) {
    runInEventLoop(
            () -> {
                if (event instanceof RequestSplitEvent) {
                    LOG.info(
                            "Source {} received split request from parallel task {}",
                            operatorName,
                            subtask);
                    enumerator.handleSplitRequest(
                            subtask, ((RequestSplitEvent) event).hostName());
                }
{code}
> SourceCoordinator error splitRequest check cause HybridSource hang
> ------------------------------------------------------------------
>
> Key: FLINK-30334
> URL: https://issues.apache.org/jira/browse/FLINK-30334
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.16.0, 1.17.0
> Reporter: Ran Tao
> Priority: Critical
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)