[ 
https://issues.apache.org/jira/browse/FLINK-33001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Qin updated FLINK-33001:
----------------------------
    Description: 
In batch mode, KafkaSource fails with an exception when the Kafka topic is 
empty. This bug was supposedly fixed, but the exception still occurs. The 
original bug was reported as 
https://issues.apache.org/jira/browse/FLINK-27041

We tried to backport that fix, but the exception still occurs. 
 * The problem occurs when the logger for class 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader 
is set to DEBUG level
 * The same problem occurs in other versions of Flink, at least in the 
1.15 release branch and tag release-1.15.4
 * The same problem also occurs in Flink 1.17.1 and 1.14

 

The minimal code to reproduce this is:

{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

KafkaSource<String> kafkaSource = KafkaSource
        .<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("test_topic")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.latest())
        .build();

DataStream<String> stream = env.fromSource(
        kafkaSource,
        WatermarkStrategy.noWatermarks(),
        "Kafka Source"
);
stream.print();

env.execute("Flink KafkaSource test job");
{code}
This produces the following exception: 
{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1737)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1704)
        at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.maybeLogSplitChangesHandlingResult(KafkaPartitionSplitReader.java:375)
        at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:232)
        at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:49)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
        ... 6 more
{code}
 

The only *workaround* that works right now is to raise the log level of this 
class from DEBUG to INFO. 

{code:java}
logger.KafkaPartitionSplitReader.name = org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
logger.KafkaPartitionSplitReader.level = INFO
{code}
It is strange that merely changing the log level makes the above exception 
go away. 
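
The stack trace suggests one possible explanation for why the log level matters: the exception is thrown from maybeLogSplitChangesHandlingResult, which appears to run only when DEBUG logging is enabled and which calls KafkaConsumer.position() for a partition that is not assigned to the consumer. The sketch below models that hypothesis in plain Java without any Flink or Kafka classes. FakeConsumer, the class name EmptySplitDebugLogSketch, and the assumption that empty splits are unassigned before the debug message is built are all illustrative and are not taken from the connector source.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class EmptySplitDebugLogSketch {

    /** Hypothetical stand-in for the consumer: position() rejects unassigned partitions. */
    static class FakeConsumer {
        final Set<String> assigned = new HashSet<>();

        long position(String partition) {
            if (!assigned.contains(partition)) {
                throw new IllegalStateException(
                        "You can only check the position for partitions assigned to this consumer.");
            }
            return 0L;
        }
    }

    /**
     * Models split handling under the stated assumption: assign all new splits,
     * unassign the empty ones, then (only with debug logging on) query the
     * position of every new split. Each value is {startingOffset, stoppingOffset}.
     * Returns the number of partitions that remain assigned.
     */
    static int handleSplitsChanges(Map<String, long[]> splits, boolean debugEnabled) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.assigned.addAll(splits.keySet());

        // A split is empty when its starting offset has already reached the stopping offset.
        splits.forEach((partition, range) -> {
            if (range[0] >= range[1]) {
                consumer.assigned.remove(partition);
            }
        });

        if (debugEnabled) {
            // The debug path walks all new splits, including the ones just unassigned.
            for (String partition : splits.keySet()) {
                consumer.position(partition);
            }
        }
        return consumer.assigned.size();
    }

    public static void main(String[] args) {
        // One partition of an empty topic: starting and stopping offsets are both 0.
        Map<String, long[]> emptyTopic =
                Collections.singletonMap("test_topic-0", new long[] {0L, 0L});

        System.out.println("INFO level, assigned partitions: "
                + handleSplitsChanges(emptyTopic, false));
        try {
            handleSplitsChanges(emptyTopic, true);
        } catch (IllegalStateException e) {
            System.out.println("DEBUG level failed: " + e.getMessage());
        }
    }
}
```

If this model matches the real code path, a fix would presumably have to skip unassigned partitions when building the debug message, rather than change how empty splits are handled.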

  was:
In batch mode, KafkaSource fails with an exception when the Kafka topic is 
empty. This bug was supposedly fixed, but the exception still occurs. The 
original bug was reported as 
https://issues.apache.org/jira/browse/FLINK-27041

We tried to backport that fix, but the exception still occurs. 
 * The problem occurs when the logger for class 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader 
is set to DEBUG level
 * The same problem occurs in other versions of Flink, at least in the 
1.15 release branch and tag release-1.15.4
 * The same problem also occurs in Flink 1.7.1 and 1.14

 

The minimal code to reproduce this is:

{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

KafkaSource<String> kafkaSource = KafkaSource
        .<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("test_topic")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.latest())
        .build();

DataStream<String> stream = env.fromSource(
        kafkaSource,
        WatermarkStrategy.noWatermarks(),
        "Kafka Source"
);
stream.print();

env.execute("Flink KafkaSource test job");
{code}
This produces the following exception: 
{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1737)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1704)
        at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.maybeLogSplitChangesHandlingResult(KafkaPartitionSplitReader.java:375)
        at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:232)
        at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:49)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
        ... 6 more
{code}
 

The only *workaround* that works right now is to raise the log level of this 
class from DEBUG to INFO. 

{code:java}
logger.KafkaPartitionSplitReader.name = org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
logger.KafkaPartitionSplitReader.level = INFO
{code}
It is strange that merely changing the log level makes the above exception 
go away. 


> KafkaSource in batch mode failing with exception if topic partition is empty
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-33001
>                 URL: https://issues.apache.org/jira/browse/FLINK-33001
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.12.7, 1.14.6, 1.17.1
>            Reporter: Abdul
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
