Hi Paimon Community:

Currently, if the user does not set scan.startup.mode or 
properties.auto.offset.reset, the Kafka source defaults to group-offsets, 
which causes the following exception:

2025-02-12 16:52:37,248 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: Kafka Source -> Parse -> Side Output -> (Schema 
Evolution, Case-insensitive Convert -> Writer : orders_table -> Compact 
Coordinator: orders_table) (1/1)#0 
(a8f049d07ffb31c14b67745b764bf8ef_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from RUNNING to FAILED with failure cause:
java.lang.RuntimeException: One or more fetchers have encountered exception
      at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
 ~[flink-connector-files-1.19.1.jar:1.19.1]
      at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
 ~[flink-connector-files-1.19.1.jar:1.19.1]
      at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
 ~[flink-connector-files-1.19.1.jar:1.19.1]
      at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
 ~[flink-dist-1.19.1.jar:1.19.1]
      at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist-1.19.1.jar:1.19.1]
      at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.19.1.jar:1.19.1]
      at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
 ~[flink-dist-1.19.1.jar:1.19.1]
      at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[flink-dist-1.19.1.jar:1.19.1]
      at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
 ~[flink-dist-1.19.1.jar:1.19.1]
      at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) 
~[flink-dist-1.19.1.jar:1.19.1]
      at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 ~[flink-dist-1.19.1.jar:1.19.1]
      at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
[flink-dist-1.19.1.jar:1.19.1]
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) 
[flink-dist-1.19.1.jar:1.19.1]
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
[flink-dist-1.19.1.jar:1.19.1]
      at java.lang.Thread.run(Thread.java:750) [?:1.8.0_432]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
      at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
 ~[flink-connector-files-1.19.1.jar:1.19.1]
      at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
 ~[flink-connector-files-1.19.1.jar:1.19.1]
      at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_432]
      at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_432]
      at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_432]
      at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_432]
      ... 1 more
Caused by: 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
 Undefined offset with no reset policy for partitions: [test_kafka_offset-0]
      at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:712)
 ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
      at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2461)
 ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
      at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1758)
 ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
      at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
 ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
      at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.lambda$removeEmptySplits$4(KafkaPartitionSplitReader.java:375)
 ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
      at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.retryOnWakeup(KafkaPartitionSplitReader.java:481)
 ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
      at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:374)
 ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
      at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:224)
 ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
      at 
org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
 ~[flink-connector-files-1.19.1.jar:1.19.1]
      at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
 ~[flink-connector-files-1.19.1.jar:1.19.1]
      at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
 ~[flink-connector-files-1.19.1.jar:1.19.1]
      at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_432]
      at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_432]
      at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_432]
      at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_432]
      ... 1 more

Instead, we would expect the following behavior:

Situation 1
If the user does not set --kafka_conf scan.startup.mode, then earliest-offset 
should be used by default.

Situation 2
If the user does not set --kafka_conf properties.auto.offset.reset, then 
earliest should be used by default.
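Until the defaults change, the exception can be avoided by setting the startup mode explicitly when launching the sync job. The sketch below is a hypothetical invocation of the kafka_sync_table action; the warehouse path, database, table, broker address, and jar version are placeholders, not values from this report.

```shell
# Workaround sketch (placeholder paths/names): pass scan.startup.mode
# explicitly so the consumer never falls back to group-offsets.
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-<version>.jar \
    kafka_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table orders_table \
    --kafka_conf properties.bootstrap.servers=127.0.0.1:9092 \
    --kafka_conf topic=test_kafka_offset \
    --kafka_conf value.format=canal-json \
    --kafka_conf scan.startup.mode=earliest-offset
```

Alternatively, setting --kafka_conf properties.auto.offset.reset=earliest gives the consumer a reset policy even when group-offsets is used.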



Best,
Jeff Yang
