[
https://issues.apache.org/jira/browse/FLINK-33001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760601#comment-17760601
]
Martijn Visser commented on FLINK-33001:
----------------------------------------
[~abdul] Can you please verify this with the externalized version of the Kafka
connector, given that it also contains fixes for several other bugs? See
https://flink.apache.org/downloads/#apache-flink-kafka-connector-300
> KafkaSource in batch mode failing with exception if topic partition is empty
> ----------------------------------------------------------------------------
>
> Key: FLINK-33001
> URL: https://issues.apache.org/jira/browse/FLINK-33001
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.12.7, 1.14.6, 1.17.1
> Reporter: Abdul
> Priority: Major
>
> If a Kafka topic is empty and the job runs in batch mode, an exception is
> thrown while processing it. This bug was supposedly fixed, but the exception
> still occurs. The original bug was reported as
> https://issues.apache.org/jira/browse/FLINK-27041
> We tried to backport the fix, but the problem persists.
> * The problem occurs only when the logger for class
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader is
> set to the DEBUG level
> * The same problem occurs in other versions of Flink, at least in the 1.15
> release branch and tag release-1.15.4
> * The same problem also occurs in Flink 1.17.1 and 1.14
>
> The minimal code to reproduce this is:
>
> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> // Bounded source over a topic that exists but contains no records.
> KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setTopics("test_topic")
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.latest())
>         .build();
>
> DataStream<String> stream = env.fromSource(
>         kafkaSource,
>         WatermarkStrategy.noWatermarks(),
>         "Kafka Source");
>
> stream.print();
> env.execute("Flink KafkaSource test job");
> {code}
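>
> For completeness, the topic referenced above must exist but contain no
> records. Below is a small sketch using the Kafka AdminClient to create such
> an empty topic; the topic name and bootstrap server are taken from the
> snippet above, while the partition and replication counts are arbitrary
> choices for a local test:
>
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.NewTopic;
>
> Properties props = new Properties();
> props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>
> try (Admin admin = Admin.create(props)) {
>     // One partition, replication factor 1: enough to reproduce locally.
>     // The topic is created empty; no records are ever produced to it.
>     admin.createTopics(Collections.singleton(new NewTopic("test_topic", 1, (short) 1)))
>          .all()
>          .get();
> }
> {code}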
> With the empty topic in place, running the job produces this exception:
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1737)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1704)
>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.maybeLogSplitChangesHandlingResult(KafkaPartitionSplitReader.java:375)
>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:232)
>     at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:49)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
>     ... 6 more
> {code}
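>
> The stack trace suggests the failure comes from the DEBUG-only logging path:
> maybeLogSplitChangesHandlingResult calls KafkaConsumer#position() for the
> handled splits, and for a partition the consumer is not (or no longer)
> assigned, that call is illegal. A minimal, self-contained sketch of that
> KafkaConsumer behavior (the topic, partition, server, and group id here are
> placeholders, not taken from the Flink code):
>
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(ConsumerConfig.GROUP_ID_CONFIG, "position-demo");
>
> try (KafkaConsumer<String, String> consumer =
>         new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
>     // test_topic-0 was never assigned to this consumer, so asking for its
>     // position throws: IllegalStateException: You can only check the
>     // position for partitions assigned to this consumer.
>     consumer.position(new TopicPartition("test_topic", 0));
> }
> {code}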
>
> The only *workaround* that works right now is to raise the log level for
> this class from DEBUG to INFO:
>
> {code}
> logger.KafkaPartitionSplitReader.name = org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
> logger.KafkaPartitionSplitReader.level = INFO
> {code}
> It is strange that merely changing the log level makes the above exception
> disappear.
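>
> If editing the logging configuration file is not an option, the same effect
> can presumably be achieved programmatically. This is an untested sketch that
> assumes Log4j 2 is the backing implementation and that log4j-core is on the
> classpath:
>
> {code:java}
> import org.apache.logging.log4j.Level;
> import org.apache.logging.log4j.core.config.Configurator;
>
> // Raise only this logger to INFO before the job starts, so the DEBUG-only
> // code path in KafkaPartitionSplitReader is never taken.
> Configurator.setLevel(
>         "org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader",
>         Level.INFO);
> {code}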
--
This message was sent by Atlassian Jira
(v8.20.10#820010)