Hi Paimon Community,

Currently, if the user does not set scan.startup.mode or properties.auto.offset.reset, the Kafka source defaults to group-offsets, which causes the following exception:
2025-02-12 16:52:37,248 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Kafka Source -> Parse -> Side Output -> (Schema Evolution, Case-insensitive Convert -> Writer : orders_table -> Compact Coordinator: orders_table) (1/1)#0 (a8f049d07ffb31c14b67745b764bf8ef_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause:
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist-1.19.1.jar:1.19.1]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_432]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_432]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_432]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_432]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_432]
    ... 1 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test_kafka_offset-0]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:712) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2461) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1758) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.lambda$removeEmptySplits$4(KafkaPartitionSplitReader.java:375) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.retryOnWakeup(KafkaPartitionSplitReader.java:481) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:374) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:224) ~[flink-sql-connector-kafka-3.2.0-1.19.jar:3.2.0-1.19]
    at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.19.1.jar:1.19.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_432]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_432]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_432]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_432]
    ... 1 more

But we expect the following behavior instead:

Situation 1: If the user does not set --kafka_conf scan.startup.mode, then earliest-offset should be used by default.

Situation 2: If the user does not set --kafka_conf properties.auto.offset.reset, then earliest (the Kafka default value) should be used by default.

Best,
Jeff Yang
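For reference, until the default changes, the exception can be avoided by setting the startup mode explicitly. A minimal sketch of such an invocation, assuming the paimon-flink-action jar's kafka_sync_table action (the jar path, warehouse path, database/table names, broker address, group id, and format below are hypothetical placeholders, not taken from the report):

```shell
# Hypothetical workaround sketch: pass scan.startup.mode explicitly via
# --kafka_conf so the source never falls back to group-offsets with no
# committed offset and no reset policy.
# All paths, names, and addresses here are placeholders.
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-<version>.jar \
    kafka_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database my_db \
    --table orders_table \
    --kafka_conf properties.bootstrap.servers=localhost:9092 \
    --kafka_conf topic=test_kafka_offset \
    --kafka_conf properties.group.id=my_group \
    --kafka_conf value.format=canal-json \
    --kafka_conf scan.startup.mode=earliest-offset
```

With scan.startup.mode set, the source resolves starting positions itself and never relies on a committed group offset that may not exist.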