[https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860130#comment-17860130]
Muhammet Orazov commented on FLINK-35565:
-----------------------------------------
The solution of https://issues.apache.org/jira/browse/FLINK-34470 could also
work for this issue.
> Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
> --------------------------------------------------------------------------
>
> Key: FLINK-35565
> URL: https://issues.apache.org/jira/browse/FLINK-35565
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: kafka-3.1.0
> Environment: This is reproduced on a *Flink 1.18.1* with the latest
> Kafka connector 3.1.0-1.18 on a session cluster.
> Reporter: Naci Simsek
> Priority: Major
> Attachments: image-2024-06-11-11-19-09-889.png,
> taskmanager_localhost_54489-ac092a_log.txt
>
>
> h2. Summary
> Flink batch job gets into an infinite fetch loop and cannot gracefully
> finish if the connected Kafka topic is empty and the starting offset configured
> in the Flink job is lower than the current start/end offset of the topic.
> See below for details:
> h2. How to reproduce
> A Flink +*batch*+ job, which acts as a {*}KafkaSource{*}, consumes events
> from a Kafka topic.
> The Kafka topic is empty; there are no events, and its start/end offset is *15*:
> !image-2024-06-11-11-19-09-889.png|width=895,height=256!
>
> The Flink job uses a *specific starting offset* value, which is +*less*+ than the
> current offset of the topic/partition.
> In the code below, it is set to *4*:
>
> {code:java}
> package naci.grpId;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.kafka.common.TopicPartition;
>
> import java.util.HashMap;
> import java.util.Map;
>
> public class KafkaSource_Print {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>         // Define the specific starting offsets for the partitions
>         Map<TopicPartition, Long> specificOffsets = new HashMap<>();
>         specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // start from offset 4 for partition 0
>
>         KafkaSource<String> kafkaSource = KafkaSource
>                 .<String>builder()
>                 .setBootstrapServers("localhost:9093") // make sure the port is correct
>                 .setTopics("topic_test")
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
>                 .setBounded(OffsetsInitializer.latest())
>                 .build();
>
>         DataStream<String> stream = env.fromSource(
>                 kafkaSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "Kafka Source"
>         );
>
>         stream.print();
>         env.execute("Flink KafkaSource test job");
>     }
> }{code}
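> As a client-side workaround sketch (not a fix for the connector itself), the
> requested starting offset could be clamped into the partition's valid range
> before building the {{KafkaSource}}, so the broker never needs to reset the
> position. The partition bounds could be fetched with the standard
> {{KafkaConsumer#beginningOffsets}} / {{KafkaConsumer#endOffsets}} calls; the
> helper below is hypothetical and takes those bounds as plain arguments:

```java
// Hypothetical helper, not part of the Flink or Kafka APIs: clamp a requested
// starting offset into [beginningOffset, endOffset] so that the specified
// offset is never out of range for the partition.
public class OffsetClamp {

    static long clampToValidRange(long requested, long beginningOffset, long endOffset) {
        return Math.max(beginningOffset, Math.min(requested, endOffset));
    }

    public static void main(String[] args) {
        // Scenario from the report: empty topic, start/end offset both 15,
        // requested starting offset 4 -> clamped to 15, no offset reset.
        System.out.println(clampToValidRange(4L, 15L, 15L)); // prints 15
    }
}
```

> With the clamped offset, the bounded job starts at or above the stopping
> offset, which per the report below makes it finish gracefully instead of
> looping.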
>
>
> Here are the initial offset-related logs, printed as soon as the job is
> submitted:
>
> {code:java}
> 2024-05-30 12:15:50,010 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]
> 2024-05-30 12:15:50,069 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
> 2024-05-30 12:15:50,074 TRACE org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - Seeking starting offsets to specified offsets: {topic_test-0=4}
> 2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Seeking to offset 4 for partition topic_test-0
> 2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - SplitsChange handling result: [topic_test-0, start:4, stop: 15]
> 2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
> 2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask{code}
>
> Since the starting offset {color:#ff0000}*4*{color} is *out of range* for the
> Kafka topic, the KafkaConsumer initiates an {*}offset +reset+{*}, as seen in
> the task manager logs:
>
> {code:java}
> 2024-05-30 12:15:50,193 INFO  org.apache.kafka.clients.consumer.internals.Fetcher [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}} is out of range for partition topic_test-0, resetting offset
> 2024-05-30 12:15:50,195 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Resetting offset for partition topic_test-0 to position FetchPosition{offset=15, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}}.{code}
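> The reset moves the consumer position to *15*, which is exactly the split's
> {{StoppingOffset}}. A sketch of the completion check one would expect here
> (illustrative logic only, with hypothetical names, not the connector's actual
> code): once the consumer position reaches or passes the stopping offset, the
> bounded split has no more fetchable records and should be marked finished,
> even when the position was advanced by a broker-side offset reset rather than
> by consumed records:

```java
// Illustrative sketch only -- class and method names are hypothetical,
// not taken from the Flink Kafka connector source.
public class BoundedSplitCheck {

    // A bounded split is exhausted once the consumer position has reached
    // the stopping offset: no record with offset < stoppingOffset can still
    // arrive, so the reader should signal split completion.
    static boolean splitFinished(long consumerPosition, long stoppingOffset) {
        return consumerPosition >= stoppingOffset;
    }

    public static void main(String[] args) {
        // Scenario from the logs: split [start=4, stop=15].
        System.out.println(splitFinished(4L, 15L));  // before the reset: false
        System.out.println(splitFinished(15L, 15L)); // after the reset to 15: true
    }
}
```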
>
>
> Then, an {color:#ff0000}*infinite {{FetchTask}} loop*{color} starts:
>
> {code:java}
> 2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:00,079 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:00,080 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:06,288 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:08,755 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
> 2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:16,290 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:17,393 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received file upload request for file LOG
> 2024-05-30 12:16:17,394 DEBUG org.apache.flink.runtime.blob.BlobClient [] - PUT BLOB stream to /127.0.0.1:55663.
> 2024-05-30 12:16:18,757 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
> 2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:26,293 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:28,761 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
> 2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:36,296 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:38,762 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
> 2024-05-30 12:16:40,087 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
> 2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
> 2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
> 2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
> 2024-05-30 12:16:46,297 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
> 2024-05-30 12:16:48,765 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
> 2024-05-30 12:16:50,089 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask{code}
>
> The loop *ends* as soon as I *add* a *new event* to this Kafka topic, which
> is placed at offset 15.
> {+}*Expected Result*{+}: Since this is a batch job and there are no events on
> the Kafka topic, the Flink connector should recognize, right after the offset
> reset, that there are no events to process and gracefully finish the
> application.
> {+}*Actual Result*{+}: The Flink connector infinitely tries to fetch an event
> from offset 15, which exists but contains no events; the application keeps
> fetching the same offset!
>
> This issue does +*NOT*+ occur if the above Flink application sets a
> *starting offset* of +*15*+ or {+}*higher*{+}! In that case, no offset reset
> is performed, and the Flink application finishes gracefully!
> This is reproduced on a *Flink 1.18.1* with the latest Kafka connector
> 3.1.0-1.18 on a session cluster.
> Logs are attached.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)