[ 
https://issues.apache.org/jira/browse/FLINK-22702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403713#comment-17403713
 ] 

Hang Ruan commented on FLINK-22702:
-----------------------------------

This issue is related to https://issues.apache.org/jira/browse/FLINK-22198.

When add splits to _KafkaPartitionSplitReader_, the split reader will delete 
the completed partitions whose offset is over the stopping offset in the 
_removeEmptySplits_ method.  All assigned partitions are deleted here in the 
test case.

The reason is that the timestamp of the test data is more than 7 days earlier 
than the test time. These messages are deleted ,and the beginning offset of the 
partition is set to the end, which causes  the split reader judge the partition 
as a completed partition and delete it.

We should change the timestamp of the test data to null or a recent timestamp.

> KafkaSourceITCase.testRedundantParallelism failed
> -------------------------------------------------
>
>                 Key: FLINK-22702
>                 URL: https://issues.apache.org/jira/browse/FLINK-22702
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0, 1.12.3
>            Reporter: Guowei Ma
>            Assignee: Qingsheng Ren
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18107&view=logs&j=1fc6e7bf-633c-5081-c32a-9dea24b05730&t=80a658d1-f7f6-5d93-2758-53ac19fd5b19&l=6847
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
>       at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
>       at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
>       at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more
> Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any 
> topics or assigned any partitions
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>       at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
>       ... 6 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to