[
https://issues.apache.org/jira/browse/FLINK-31006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17696257#comment-17696257
]
Ran Tao edited comment on FLINK-31006 at 3/3/23 6:18 PM:
---------------------------------------------------------
[~syhily] [~renqs] Hi, guys. I think Qingsheng's response on GitHub
(https://github.com/apache/flink/pull/21909) is right. I ran into another
problem and found this case:
1. The Kafka source runs in bounded mode.
2. The enumerator starts and noMoreNewPartitionSplits is set to true, then the
initial partition discovery is triggered.
3. As the partition discovery runs asynchronously in the worker thread, it's
possible that a reader registers on the enumerator before the partition
discovery finishes.
Suppose we set `noMoreNewPartitionSplits = true;` up front when partition
discovery is disabled. Because *context.callAsync*, which fetches the
partitions, is an asynchronous call, a reader that registers before the call
has finished will quit early, which means the bounded source is never consumed.
However, we expect the reader to read the initial partitions, consume them, and
only then quit.
So I think the PR in its current form cannot work.
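To make the ordering concrete, here is a minimal, deterministic sketch of the three steps above. It is a toy model, not the Flink KafkaSourceEnumerator: the class ToyEnumerator, its fields, and the method splitsReadWhenReaderRegistersFirst are all hypothetical names for illustration, and a manually completed CompletableFuture stands in for *context.callAsync*.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Toy model of the race described above; NOT the real Flink enumerator. */
class BoundedEnumeratorRaceSketch {

    /** Hypothetical stand-in for a bounded-mode split enumerator. */
    static class ToyEnumerator {
        private final boolean setFlagOnStart;
        private boolean noMoreNewPartitionSplits = false;
        private boolean readerRegistered = false;
        private boolean readerSignalledNoMoreSplits = false;
        private final List<String> pendingSplits = new ArrayList<>();
        final List<String> assignedSplits = new ArrayList<>();

        ToyEnumerator(boolean setFlagOnStart) {
            this.setFlagOnStart = setFlagOnStart;
        }

        /** Step 2: optionally set the flag, then trigger async discovery. */
        void start(CompletableFuture<List<String>> discovery) {
            if (setFlagOnStart) {
                noMoreNewPartitionSplits = true; // the variant under discussion
            }
            discovery.thenAccept(this::onDiscoveryFinished);
        }

        /** Callback that runs once the asynchronous discovery completes. */
        private void onDiscoveryFinished(List<String> partitions) {
            noMoreNewPartitionSplits = true;
            pendingSplits.addAll(partitions);
            if (readerRegistered) {
                assignPending();
            }
        }

        /** Step 3: a reader registers, possibly before discovery finished. */
        void addReader() {
            readerRegistered = true;
            assignPending();
        }

        private void assignPending() {
            if (readerSignalledNoMoreSplits) {
                return; // the reader has already been told to finish
            }
            assignedSplits.addAll(pendingSplits);
            pendingSplits.clear();
            if (noMoreNewPartitionSplits) {
                readerSignalledNoMoreSplits = true; // reader quits after this
            }
        }
    }

    /** Returns how many splits the reader receives if it registers first. */
    static int splitsReadWhenReaderRegistersFirst(boolean setFlagOnStart) {
        CompletableFuture<List<String>> discovery = new CompletableFuture<>();
        ToyEnumerator enumerator = new ToyEnumerator(setFlagOnStart);
        enumerator.start(discovery);
        enumerator.addReader(); // registers before discovery completes
        discovery.complete(Arrays.asList("topic-0", "topic-1"));
        return enumerator.assignedSplits.size();
    }

    public static void main(String[] args) {
        System.out.println(splitsReadWhenReaderRegistersFirst(true));
        System.out.println(splitsReadWhenReaderRegistersFirst(false));
    }
}
```

In this model, setting the flag before discovery finishes lets the early reader be told there are no more splits while the pending list is still empty, so it never receives the discovered partitions. Setting the flag only in the discovery callback avoids that in the toy model.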
> job is not finished when using pipeline mode to run bounded source like
> kafka/pulsar
> ------------------------------------------------------------------------------------
>
> Key: FLINK-31006
> URL: https://issues.apache.org/jira/browse/FLINK-31006
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Connectors / Pulsar
> Affects Versions: 1.17.0
> Reporter: jackylau
> Assignee: jackylau
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2023-02-10-13-20-52-890.png,
> image-2023-02-10-13-23-38-430.png, image-2023-02-10-13-24-46-929.png,
> image-2023-03-04-01-04-18-658.png, image-2023-03-04-01-05-25-335.png,
> image-2023-03-04-01-07-04-927.png, image-2023-03-04-01-07-36-168.png,
> image-2023-03-04-01-08-29-042.png, image-2023-03-04-01-09-24-199.png
>
>
> When I run failover tests, such as killing the JM/TM, while using pipeline
> mode to run a bounded source like Kafka, I found that the job does not finish
> even after every partition's data has been consumed.
>
> After digging into the code, I found that this logic is not run when the JM
> recovers: the partition info has not changed, so noMoreNewPartitionSplits is
> not set to true, and then this does not run:
>
> !image-2023-02-10-13-23-38-430.png!
>
> !image-2023-02-10-13-24-46-929.png!
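
The recovery problem in the quoted report can be modelled roughly as follows. This is a sketch under assumptions, not the code from the screenshots or from Flink: RecoveredEnumeratorSketch and its methods are hypothetical names, and the only behaviour taken from the report is that noMoreNewPartitionSplits is set on a path that is skipped when the discovered partitions have not changed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Toy model of the recovery path; NOT the actual Flink enumerator code. */
class RecoveredEnumeratorSketch {
    private final Set<String> knownPartitions;
    private boolean noMoreNewPartitionSplits = false;

    /** restoredPartitions models the partitions restored from a checkpoint. */
    RecoveredEnumeratorSketch(Set<String> restoredPartitions) {
        this.knownPartitions = new HashSet<>(restoredPartitions);
    }

    /** Handles the result of one partition discovery round in bounded mode. */
    void onPartitionsDiscovered(Set<String> fetchedPartitions) {
        Set<String> added = new HashSet<>(fetchedPartitions);
        added.removeAll(knownPartitions);
        if (added.isEmpty()) {
            return; // nothing changed, so the flag below is never reached
        }
        knownPartitions.addAll(added);
        noMoreNewPartitionSplits = true;
    }

    /** Hypothetical helper: true if readers could be told to finish. */
    static boolean canFinishAfterDiscovery(Set<String> restored, Set<String> fetched) {
        RecoveredEnumeratorSketch enumerator = new RecoveredEnumeratorSketch(restored);
        enumerator.onPartitionsDiscovered(fetched);
        return enumerator.noMoreNewPartitionSplits;
    }

    public static void main(String[] args) {
        Set<String> partitions = new HashSet<>(Arrays.asList("topic-0", "topic-1"));
        // Fresh start: nothing restored, so discovery reports new partitions.
        System.out.println(canFinishAfterDiscovery(Collections.emptySet(), partitions));
        // After JM recovery: the same partitions are already known.
        System.out.println(canFinishAfterDiscovery(partitions, partitions));
    }
}
```

In this model a fresh start sets the flag, while a recovered enumerator that rediscovers the same partitions returns early and never sets it, which matches the symptom of a bounded job that does not finish.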