[ https://issues.apache.org/jira/browse/FLINK-26938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515113#comment-17515113 ]
Yuan Zhu commented on FLINK-26938:
----------------------------------
I have reproduced it locally.
I found that switchedSources in HybridSourceSplitEnumerator only adds the latest
source, in start().
But if the Flink parallelism is greater than the number of Kafka partitions,
some source readers can end up with currentSourceIndex 0 while others have 1.
So when the readers restore from the savepoint (where the actual current index
in HybridSourceSplitEnumerator is 1), a reader at index 0 fetches its source
from switchedSources in HybridSourceSplitEnumerator. This causes an NPE because
source 0 is absent.
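The failure mode described above can be sketched with a small, hypothetical model. `SwitchedSourcesModel`, its methods, and the source names are illustrative assumptions, not Flink's HybridSourceSplitEnumerator API; only the idea of a switchedSources lookup keyed by source index, and the error text, come from this report. The "register every index up to the current one" variant is one assumed direction for a fix, not necessarily the change adopted in Flink.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of the reported NPE. NOT Flink code.
class SwitchedSourcesModel {
    // Source index -> placeholder source name; stands in for switchedSources.
    private final Map<Integer, String> switchedSources = new HashMap<>();
    private final String[] sources;

    SwitchedSourcesModel(String... sources) {
        this.sources = sources;
    }

    // Behaviour described in the comment: start() registers only the latest source.
    void startRegisteringLatestOnly(int currentSourceIndex) {
        switchedSources.put(currentSourceIndex, sources[currentSourceIndex]);
    }

    // Assumed alternative: register every source up to the current index, so a
    // reader restored at an older index can still resolve its source.
    void startRegisteringAllUpTo(int currentSourceIndex) {
        for (int i = 0; i <= currentSourceIndex; i++) {
            switchedSources.put(i, sources[i]);
        }
    }

    // Lookup made for a restored reader's currentSourceIndex; throws an NPE
    // with the reported message when the index was never registered.
    String sourceFor(int readerSourceIndex) {
        return Objects.requireNonNull(
                switchedSources.get(readerSourceIndex),
                "Source for index=" + readerSourceIndex + " not available");
    }

    public static void main(String[] args) {
        // Enumerator restored at index 1; readers restored at indexes 0 and 1.
        int[] readerIndexes = {0, 1};

        SwitchedSourcesModel latestOnly = new SwitchedSourcesModel("source-0", "kafka-source");
        latestOnly.startRegisteringLatestOnly(1);
        try {
            for (int idx : readerIndexes) {
                latestOnly.sourceFor(idx);
            }
        } catch (NullPointerException e) {
            System.out.println("latest-only: " + e.getMessage());
        }

        SwitchedSourcesModel allUpTo = new SwitchedSourcesModel("source-0", "kafka-source");
        allUpTo.startRegisteringAllUpTo(1);
        for (int idx : readerIndexes) {
            System.out.println("all-up-to: reader index " + idx + " -> " + allUpTo.sourceFor(idx));
        }
    }
}
```

With only the latest source registered, the reader restored at index 0 hits the same "Source for index=0 not available" NPE; with all earlier indexes registered, both readers resolve a source.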
> HybridSource recovery from savepoint fails When flink parallelism is greater
> than the number of Kafka partitions
> ----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-26938
> URL: https://issues.apache.org/jira/browse/FLINK-26938
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.0
> Environment: Flink 1.14.0
> Reporter: 文报
> Priority: Major
> Attachments: HybridSourceTest.java, image-2022-03-31-13-36-45-686.png
>
>
> HybridSource recovery from savepoint fails when Flink parallelism is greater
> than the number of Kafka partitions.
> First test
> Flink job before savepoint: Flink parallelism = 16, Kafka partitions = 3
> Flink job after savepoint:
> case 1: Flink parallelism = 16, Kafka partitions = 3. HybridSource recovery
> from savepoint fails:
> !image-2022-03-31-13-36-45-686.png!
> case 2: Flink parallelism = 3, Kafka partitions = 3. HybridSource recovery
> from savepoint succeeds.
> case 3: Flink parallelism = 8, Kafka partitions = 3. HybridSource recovery
> from savepoint fails with the same NullPointerException: Source for index=0
> not available
> case 4: Flink parallelism = 4, Kafka partitions = 3. HybridSource recovery
> from savepoint fails with the same NullPointerException: Source for index=0
> not available
> case 5: Flink parallelism = 1, Kafka partitions = 3. HybridSource recovery
> from savepoint succeeds.
> Second test
> Flink job before savepoint: Flink parallelism = 3, Kafka partitions = 3
> Flink job after savepoint:
> case 1: Flink parallelism = 3, Kafka partitions = 3. HybridSource recovery
> from savepoint succeeds.
> case 2: Flink parallelism = 1, Kafka partitions = 3. HybridSource recovery
> from savepoint succeeds.
> case 3: Flink parallelism = 4, Kafka partitions = 3. HybridSource recovery
> from savepoint fails with the same NullPointerException: Source for index=0
> not available
> For the specific code, see the attached test code HybridSourceTest.java.
>
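As a quick consistency check, the eight restore attempts reported above can be restated as data and compared against the condition in the issue title. The `ReportedCases` class and its field names are hypothetical; the numbers and outcomes are copied from the two tests. The predicate is evaluated on the parallelism of the restored job, which is an interpretation: in these reports, every failure has a restore parallelism above 3 and every success has 3 or less. Eight cases are consistent with the title, not a proof of the rule.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical restatement of the reported restore attempts (data from the issue).
class ReportedCases {
    static final class Case {
        final String label;
        final int parallelismBefore;
        final int parallelismAfter;
        final int kafkaPartitions;
        final boolean failed;

        Case(String label, int before, int after, int partitions, boolean failed) {
            this.label = label;
            this.parallelismBefore = before;
            this.parallelismAfter = after;
            this.kafkaPartitions = partitions;
            this.failed = failed;
        }
    }

    static final List<Case> CASES = Arrays.asList(
            new Case("first test, case 1", 16, 16, 3, true),
            new Case("first test, case 2", 16, 3, 3, false),
            new Case("first test, case 3", 16, 8, 3, true),
            new Case("first test, case 4", 16, 4, 3, true),
            new Case("first test, case 5", 16, 1, 3, false),
            new Case("second test, case 1", 3, 3, 3, false),
            new Case("second test, case 2", 3, 1, 3, false),
            new Case("second test, case 3", 3, 4, 3, true));

    // Condition from the issue title, applied to the restored job's parallelism.
    static boolean predictsFailure(Case c) {
        return c.parallelismAfter > c.kafkaPartitions;
    }

    public static void main(String[] args) {
        for (Case c : CASES) {
            System.out.println(c.label + ": before=" + c.parallelismBefore
                    + " after=" + c.parallelismAfter
                    + " partitions=" + c.kafkaPartitions
                    + " failed=" + c.failed
                    + " predicted=" + predictsFailure(c));
        }
        long mismatches = CASES.stream()
                .filter(c -> predictsFailure(c) != c.failed)
                .count();
        System.out.println("mismatches: " + mismatches);
    }
}
```

Running `main` lists each case and reports `mismatches: 0` for the data above.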
--
This message was sent by Atlassian Jira
(v8.20.1#820001)