[ 
https://issues.apache.org/jira/browse/FLINK-39888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui reassigned FLINK-39888:
-----------------------------------

    Assignee: Jigar Bhati

> [Kafka] Allow configuring offset reset strategy independently of startup 
> offsets
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-39888
>                 URL: https://issues.apache.org/jira/browse/FLINK-39888
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kafka
>            Reporter: Jigar Bhati
>            Assignee: Jigar Bhati
>            Priority: Major
>
> KafkaSource currently uses OffsetsInitializer for two separate concerns:
> 1. resolving the initial starting offsets
> 2. selecting Kafka's auto.offset.reset strategy when an initialized starting 
> offset is unavailable
> These choices are independent, but the public API does not let users compose 
> them for all startup modes.
> One concrete failure mode is broker recovery after a log corruption or 
> truncation event. A source may have checkpointed or otherwise resolved a 
> concrete numeric offset while Kafka was healthy. After recovery, Kafka can 
> report that this offset is no longer available on the partition, for 
> example because of corruption or retention. At that point, the Kafka 
> consumer applies auto.offset.reset. With the current coupling, a source 
> that originally started with earliest-offset or latest-offset also inherits 
> earliest or latest as the fallback for that later out-of-range concrete 
> offset. The job can therefore silently reset to earliest or latest instead 
> of failing with an out-of-range error that lets an operator decide what to 
> do.
> The desired behavior is to configure these independently. For example, users 
> may want a fresh source to start from earliest or latest, but use 
> OffsetResetStrategy.NONE once a concrete initialized offset has been chosen, 
> so a later unavailable offset fails explicitly instead of silently resetting.
> The SQL Kafka connector has the same coupling today: 
> properties.auto.offset.reset is only applied for group-offsets startup, while 
> explicit values are overridden by the startup initializer for 
> earliest-offset, latest-offset, specific-offsets, and timestamp startup modes.
>
> Proposed change:
> - Add OffsetsInitializer.withOffsetResetStrategy(OffsetsInitializer, 
> OffsetResetStrategy).
> - The wrapper delegates partition offset initialization and validation, but 
> overrides getAutoOffsetResetStrategy().
> - Apply explicit properties.auto.offset.reset to the chosen Kafka SQL startup 
> initializer for all startup modes.
> - Preserve existing defaults when properties.auto.offset.reset is not 
> explicitly configured.
> - Update docs to clarify that scan.startup.mode chooses the initial position, 
> while auto.offset.reset controls fallback when an initialized starting offset 
> is unavailable.
>
> Related:
> FLINK-39382 documents current auto.offset.reset support for group-offsets.
>
> Testing:
> - KafkaSourceBuilder reset strategy tests
> - DynamicKafkaSourceBuilder reset strategy tests
> - initializer wrapper unit test
> - static Kafka SQL source tests
> - dynamic Kafka SQL source tests
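
The wrapper described under "Proposed change" could look roughly like the Java sketch below. It is not the patch for this issue. To stay self-contained it uses simplified stand-ins: the OffsetResetStrategy enum and the OffsetsInitializer interface are redeclared locally, partitions are plain strings, and validation is left out. ResetStrategyOverride, EarliestInitializer and EARLIEST_MARKER are hypothetical names chosen for illustration, and the signature of withOffsetResetStrategy is assumed from the issue text.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Stand-in for Kafka's OffsetResetStrategy enum, so the sketch compiles alone.
enum OffsetResetStrategy { EARLIEST, LATEST, NONE }

// Simplified stand-in for Flink's OffsetsInitializer. Partitions are plain
// strings here, and validation and the offsets retriever are left out.
interface OffsetsInitializer {
    Map<String, Long> getPartitionOffsets(Collection<String> partitions);

    OffsetResetStrategy getAutoOffsetResetStrategy();

    // Assumed shape of the factory method proposed in the issue.
    static OffsetsInitializer withOffsetResetStrategy(
            OffsetsInitializer delegate, OffsetResetStrategy strategy) {
        return new ResetStrategyOverride(delegate, strategy);
    }
}

// Hypothetical wrapper: delegates offset initialization, overrides the fallback.
final class ResetStrategyOverride implements OffsetsInitializer {
    private final OffsetsInitializer delegate;
    private final OffsetResetStrategy strategy;

    ResetStrategyOverride(OffsetsInitializer delegate, OffsetResetStrategy strategy) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
        this.strategy = Objects.requireNonNull(strategy, "strategy");
    }

    @Override
    public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
        // The starting position is still decided by the wrapped initializer.
        return delegate.getPartitionOffsets(partitions);
    }

    @Override
    public OffsetResetStrategy getAutoOffsetResetStrategy() {
        // Only the fallback for an unavailable offset is replaced.
        return strategy;
    }
}

// Toy "start from earliest" initializer used only to exercise the wrapper.
final class EarliestInitializer implements OffsetsInitializer {
    static final long EARLIEST_MARKER = -2L; // illustrative marker value

    @Override
    public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
        Map<String, Long> offsets = new HashMap<>();
        for (String partition : partitions) {
            offsets.put(partition, EARLIEST_MARKER);
        }
        return offsets;
    }

    @Override
    public OffsetResetStrategy getAutoOffsetResetStrategy() {
        return OffsetResetStrategy.EARLIEST;
    }
}
```

In this sketch, OffsetsInitializer.withOffsetResetStrategy(new EarliestInitializer(), OffsetResetStrategy.NONE) still yields the earliest marker for every partition, but reports NONE as the reset strategy. That is the combination the description asks for: start from earliest, then fail explicitly if a concrete offset later becomes unavailable.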

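For the SQL connector, the rule "apply an explicit auto.offset.reset for every startup mode, otherwise keep the existing default" can be sketched as a small resolver. This is an illustration under assumptions, not connector code: ResetStrategyResolver and its nested Strategy enum are hypothetical, the default of each startup mode is passed in by the caller rather than hard-coded, and the properties are assumed to already have the "properties." prefix stripped.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper; not part of the Flink Kafka connector.
final class ResetStrategyResolver {
    // Local stand-in for Kafka's OffsetResetStrategy enum.
    enum Strategy { EARLIEST, LATEST, NONE }

    // Kafka consumer property name (without the SQL "properties." prefix).
    static final String KEY = "auto.offset.reset";

    private ResetStrategyResolver() {}

    // Returns the explicitly configured strategy if present, else modeDefault.
    static Strategy resolve(Properties kafkaProps, Strategy modeDefault) {
        String configured = kafkaProps.getProperty(KEY);
        if (configured == null) {
            // Not explicitly configured: preserve the startup mode's default.
            return modeDefault;
        }
        // Explicit value wins for every startup mode. Values other than
        // earliest/latest/none raise IllegalArgumentException in this sketch.
        return Strategy.valueOf(configured.trim().toUpperCase(Locale.ROOT));
    }
}
```

With empty properties, resolve(props, Strategy.EARLIEST) returns EARLIEST. After props.setProperty("auto.offset.reset", "none"), the same call returns NONE regardless of the default that was passed in.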


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
