[
https://issues.apache.org/jira/browse/FLINK-22702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403713#comment-17403713
]
Hang Ruan commented on FLINK-22702:
-----------------------------------
This issue is related to https://issues.apache.org/jira/browse/FLINK-22198.
When splits are added to _KafkaPartitionSplitReader_, the split reader removes
completed partitions, i.e. those whose current offset has already reached or
passed the stopping offset, in the _removeEmptySplits_ method. In this test
case, all assigned partitions are removed there.
The reason is that the timestamps of the test data are more than 7 days earlier
than the time at which the test runs. These messages are deleted, and the
beginning offset of the partition moves to the end offset, which causes the
split reader to treat the partition as completed and remove it. With no
partitions left assigned, the consumer's poll fails with the
IllegalStateException shown in the stack trace below.
We should change the timestamp of the test data to null or to a recent
timestamp.
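To make the failure mode above concrete, here is a minimal, self-contained sketch. It is a simplified model and not the actual Flink or Kafka code: the class EmptySplitSketch, the Split type, the helper names, and the per-record retention rule are all assumptions made for illustration. It only mirrors the idea that a split whose starting offset has reached its stopping offset is dropped, and that polling with nothing assigned throws.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the behaviour described in the comment above.
// All names here are hypothetical; this is NOT the real Flink or Kafka code.
class EmptySplitSketch {
    // Assumed retention period of 7 days, as mentioned in the comment.
    static final long RETENTION_MS = 7L * 24 * 60 * 60 * 1000;

    // Hypothetical stand-in for a Kafka partition split.
    static final class Split {
        final String partition;
        final long startingOffset;
        final long stoppingOffset;

        Split(String partition, long startingOffset, long stoppingOffset) {
            this.partition = partition;
            this.startingOffset = startingOffset;
            this.stoppingOffset = stoppingOffset;
        }
    }

    // Models the broker dropping expired records from the head of the log:
    // returns the first offset whose record is still within retention.
    static long beginningOffsetAfterRetention(long[] timestamps, long nowMs) {
        long offset = 0;
        for (long ts : timestamps) {
            if (nowMs - ts <= RETENTION_MS) {
                break;
            }
            offset++;
        }
        return offset;
    }

    // Builds a bounded split: start at the beginning offset, stop at the end offset.
    static Split splitFor(String partition, long[] timestamps, long nowMs) {
        long start = beginningOffsetAfterRetention(timestamps, nowMs);
        long stop = timestamps.length;
        return new Split(partition, start, stop);
    }

    // Keeps only splits that still have records to read before the stopping offset.
    static List<Split> removeEmptySplits(List<Split> assigned) {
        List<Split> remaining = new ArrayList<>();
        for (Split s : assigned) {
            if (s.startingOffset < s.stoppingOffset) {
                remaining.add(s);
            }
        }
        return remaining;
    }

    // Models the consumer refusing to poll when nothing is assigned.
    static void poll(List<Split> assigned) {
        if (assigned.isEmpty()) {
            throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long tenDaysAgo = now - 10L * 24 * 60 * 60 * 1000;

        // Test data with timestamps older than the retention period.
        List<Split> stale = new ArrayList<>();
        stale.add(splitFor("topic-0", new long[] {tenDaysAgo, tenDaysAgo}, now));
        List<Split> remaining = removeEmptySplits(stale);
        System.out.println("splits left with stale data: " + remaining.size());
        try {
            poll(remaining);
        } catch (IllegalStateException e) {
            System.out.println("poll failed: " + e.getMessage());
        }

        // Test data with recent timestamps survives retention.
        List<Split> fresh = new ArrayList<>();
        fresh.add(splitFor("topic-0", new long[] {now, now}, now));
        System.out.println("splits left with fresh data: " + removeEmptySplits(fresh).size());
    }
}
```

In this model, a record produced with a null timestamp behaves like the "fresh" case, since the Kafka producer assigns the current time when no timestamp is given; that is why either of the two proposed changes would keep the partitions from being treated as completed.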
> KafkaSourceITCase.testRedundantParallelism failed
> -------------------------------------------------
>
> Key: FLINK-22702
> URL: https://issues.apache.org/jira/browse/FLINK-22702
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0, 1.12.3
> Reporter: Guowei Ma
> Assignee: Qingsheng Ren
> Priority: Critical
> Labels: test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18107&view=logs&j=1fc6e7bf-633c-5081-c32a-9dea24b05730&t=80a658d1-f7f6-5d93-2758-53ac19fd5b19&l=6847
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
> at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
> ... 6 more
> {code}