[ 
https://issues.apache.org/jira/browse/FLINK-33001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760601#comment-17760601
 ] 

Martijn Visser commented on FLINK-33001:
----------------------------------------

[~abdul] Can you please verify this with the externalized version of the Kafka 
connector, since it also includes bug fixes for some other issues? See 
https://flink.apache.org/downloads/#apache-flink-kafka-connector-300

> KafkaSource in batch mode failing with exception if topic partition is empty
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-33001
>                 URL: https://issues.apache.org/jira/browse/FLINK-33001
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.12.7, 1.14.6, 1.17.1
>            Reporter: Abdul
>            Priority: Major
>
> If the Kafka topic is empty, a job that reads it in batch mode fails with an 
> exception. This bug was reported earlier as 
> https://issues.apache.org/jira/browse/FLINK-27041 and was supposedly fixed, 
> but the exception still occurs. We tried to backport that fix, but it still 
> doesn't work. 
>  * The problem occurs when the logger for class 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader is 
> set to the DEBUG level
>  * The same problem occurs in other versions of Flink, at least in the 
> 1.15 release branch and tag release-1.15.4
>  * The same problem also occurs in Flink 1.17.1 and 1.14
>  
> The minimal code to reproduce this is: 
>  
> {code:java}
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> KafkaSource<String> kafkaSource = KafkaSource
>         .<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setTopics("test_topic")
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.latest())
>         .build();
> DataStream<String> stream = env.fromSource(
>         kafkaSource,
>         WatermarkStrategy.noWatermarks(),
>         "Kafka Source"
> );
> stream.print();
> env.execute("Flink KafkaSource test job");
> {code}
> This produces the following exception: 
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
>         at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
>         at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
>         at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
>         at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
>         at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>         at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
>         at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         ... 1 more
> Caused by: java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
>         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1737)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1704)
>         at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.maybeLogSplitChangesHandlingResult(KafkaPartitionSplitReader.java:375)
>         at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:232)
>         at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:49)
>         at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
>         ... 6 more
> {code}
>  
> The only *workaround* that works right now is to change the log level for 
> this class from DEBUG to INFO: 
>  
> {code:java}
> logger.KafkaPartitionSplitReader.name = org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
> logger.KafkaPartitionSplitReader.level = INFO
> {code}
> It is strange that the exception above no longer occurs once the log level 
> is changed. 
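
One possible reading of the stack trace above: the failing `KafkaConsumer.position()` call is made from `maybeLogSplitChangesHandlingResult`, which, judging by its name, may query positions only for logging purposes. The sketch below is a simplified model of that idea using only the Java standard library. It is not Flink's or Kafka's actual code. `EmptySplitDebugSketch`, `FakeConsumer`, and the `debugEnabled` flag are hypothetical names, and the assumption that an empty partition is unassigned before the logging step runs is not confirmed by the report.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EmptySplitDebugSketch {

    /** Hypothetical stand-in for a Kafka consumer; NOT the real KafkaConsumer API. */
    static class FakeConsumer {
        private final Set<String> assigned = new HashSet<>();

        /** Replaces the current assignment with the given partitions. */
        void assign(List<String> partitions) {
            assigned.clear();
            assigned.addAll(partitions);
        }

        /** Returns a dummy position, or throws if the partition is not assigned. */
        long position(String partition) {
            if (!assigned.contains(partition)) {
                // Same message as in the reported stack trace.
                throw new IllegalStateException(
                        "You can only check the position for partitions assigned to this consumer.");
            }
            return 0L;
        }
    }

    /**
     * Simplified model of handling a split change for one empty partition.
     * Returns null on success, or the exception message on failure.
     */
    public static String handleSplitsChanges(boolean debugEnabled) {
        FakeConsumer consumer = new FakeConsumer();
        String emptyPartition = "test_topic-0";

        consumer.assign(Collections.singletonList(emptyPartition));
        // Assumption: an empty split has nothing to read, so it is unassigned again.
        consumer.assign(Collections.<String>emptyList());

        try {
            if (debugEnabled) {
                // The debug-only logging step still asks for the removed partition's position.
                System.out.println("position=" + consumer.position(emptyPartition));
            }
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("INFO : " + handleSplitsChanges(false));
        System.out.println("DEBUG: " + handleSplitsChanges(true));
    }
}
```

Here `handleSplitsChanges(false)` models the INFO setting and `handleSplitsChanges(true)` models the DEBUG setting. Under the stated assumptions, only the DEBUG path reaches the `position()` call, which would be consistent with the workaround described above.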



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
