[ https://issues.apache.org/jira/browse/FLINK-39888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18087040#comment-18087040 ]
Jigar Bhati commented on FLINK-39888:
-------------------------------------
Could a Flink committer assign this issue to me? I have a patch ready.
> [Kafka] Allow configuring offset reset strategy independently of startup
> offsets
> --------------------------------------------------------------------------------
>
> Key: FLINK-39888
> URL: https://issues.apache.org/jira/browse/FLINK-39888
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / Kafka
> Reporter: Jigar Bhati
> Priority: Major
>
> KafkaSource currently uses OffsetsInitializer for two separate concerns:
> 1. resolving the initial starting offsets
> 2. selecting Kafka's auto.offset.reset strategy when an initialized starting
> offset is unavailable
> These choices are independent, but the public API does not let users compose
> them for all startup modes.
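The coupling can be illustrated with a simplified, self-contained model. The real interface exposes `getPartitionOffsets(...)` and `getAutoOffsetResetStrategy()`. The types `StartingOffsets` and `ResetStrategy`, the factory `earliest()`, and the constant `EARLIEST_SENTINEL` below are hypothetical stand-ins. Partitions are plain strings instead of `TopicPartition`, and no `PartitionOffsetsRetriever` is involved. This is a sketch under those assumptions, not connector code.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model: one initializer object answers two independent questions. */
class CoupledInitializerSketch {

    /** Mirrors the three auto.offset.reset values: earliest, latest, none. */
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /** Hypothetical sentinel meaning "seek to the beginning of the partition". */
    static final long EARLIEST_SENTINEL = -2L;

    /** Stand-in for OffsetsInitializer: both concerns live on the same object. */
    interface StartingOffsets {
        /** Concern 1: the initial position for each partition. */
        Map<String, Long> partitionOffsets(Collection<String> partitions);

        /** Concern 2: the fallback when an initialized offset is unavailable. */
        ResetStrategy resetStrategy();
    }

    /** Choosing "start from earliest" also hard-wires the fallback to EARLIEST. */
    static StartingOffsets earliest() {
        return new StartingOffsets() {
            @Override
            public Map<String, Long> partitionOffsets(Collection<String> partitions) {
                Map<String, Long> offsets = new LinkedHashMap<>();
                for (String partition : partitions) {
                    offsets.put(partition, EARLIEST_SENTINEL);
                }
                return offsets;
            }

            @Override
            public ResetStrategy resetStrategy() {
                return ResetStrategy.EARLIEST; // not selectable by the caller
            }
        };
    }
}
```

Because `earliest()` fixes both answers at once, this model offers no way to request earliest starting offsets together with a `NONE` fallback. That is the gap the issue describes.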
> One concrete failure mode is broker recovery after a log corruption or
> truncation event. A source may have previously checkpointed or otherwise
> resolved a concrete numeric offset while Kafka was healthy. After recovery,
> Kafka can report that this offset is no longer available on the partition
> (for example, due to corruption, retention, or a similar event). At that
> point, the Kafka consumer applies auto.offset.reset. With the current
> coupling, a source that originally started with earliest-offset or
> latest-offset also inherits earliest/latest as the fallback for that later
> out-of-range concrete offset. The job can therefore silently reset to
> earliest or latest instead of failing with an out-of-range error that lets an
> operator decide what to do.
> The desired behavior is to configure these independently. For example, users
> may want a fresh source to start from earliest or latest, but use
> OffsetResetStrategy.NONE once a concrete initialized offset has been chosen,
> so a later unavailable offset fails explicitly instead of silently resetting.
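The failure mode and the desired behavior can be contrasted with a small simulation of how a consumer's reset strategy treats a restored position that falls outside the retained log. This is a simplified model. In real Kafka, the `none` case surfaces as `OffsetOutOfRangeException`. Here the hypothetical `OffsetUnavailableException` stands in for it. The sketch also assumes the log end offset itself is a valid position to resume from.

```java
/** Hypothetical model of how a reset strategy resolves an out-of-range position. */
class OutOfRangeFallbackSketch {

    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /** Stand-in for Kafka's OffsetOutOfRangeException. */
    static class OffsetUnavailableException extends RuntimeException {
        OffsetUnavailableException(String message) {
            super(message);
        }
    }

    /**
     * Returns the position the consumer actually reads from.
     * logStart is the first retained offset; logEnd is the next offset to be written.
     */
    static long resolvePosition(long restored, long logStart, long logEnd, ResetStrategy fallback) {
        if (restored >= logStart && restored <= logEnd) {
            return restored; // still available: resume exactly where the source left off
        }
        switch (fallback) {
            case EARLIEST:
                return logStart; // silently jumps to the oldest retained record
            case LATEST:
                return logEnd; // silently skips everything up to the log end
            default:
                // NONE: surface the problem so an operator can decide what to do
                throw new OffsetUnavailableException(
                        "offset " + restored + " outside [" + logStart + ", " + logEnd + "]");
        }
    }
}
```

Suppose, for example, that a source started with earliest-offset and checkpointed offset 500, and that after broker recovery the partition retains only offsets 1000 to 1200. With the inherited `EARLIEST` fallback, `resolvePosition(500, 1000, 1200, EARLIEST)` resumes at 1000 and skips 500 to 999 without any error. With `NONE`, the same call throws instead.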
> The SQL Kafka connector has the same coupling today:
> properties.auto.offset.reset is only applied for group-offsets startup, while
> explicitly configured values are overridden by the startup initializer for
> earliest-offset, latest-offset, specific-offsets, and timestamp startup modes.
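The SQL-side rule can be modeled as a small resolution function that compares today's behavior with the rule proposed in this issue. This is a hypothetical sketch. `initializerDefault` stands for whatever reset strategy the chosen startup initializer would use on its own. The sketch assumes no specific per-mode defaults.

```java
import java.util.Optional;

/** Hypothetical model of how the SQL source picks the effective reset strategy. */
class SqlResetResolutionSketch {

    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /** The scan.startup.mode values named in the issue. */
    enum StartupMode { EARLIEST_OFFSET, LATEST_OFFSET, GROUP_OFFSETS, SPECIFIC_OFFSETS, TIMESTAMP }

    /**
     * Current rule: an explicit properties.auto.offset.reset is honored only for
     * group-offsets; every other mode keeps its startup initializer's strategy.
     */
    static ResetStrategy current(StartupMode mode, ResetStrategy initializerDefault,
                                 Optional<ResetStrategy> explicit) {
        if (mode == StartupMode.GROUP_OFFSETS && explicit.isPresent()) {
            return explicit.get();
        }
        return initializerDefault;
    }

    /**
     * Proposed rule: an explicit value wins regardless of startup mode; without one,
     * the initializer's existing default is preserved.
     */
    static ResetStrategy proposed(ResetStrategy initializerDefault, Optional<ResetStrategy> explicit) {
        return explicit.orElse(initializerDefault);
    }
}
```

Consider `scan.startup.mode = earliest-offset` with `properties.auto.offset.reset = none`. Assuming that initializer defaults to `EARLIEST`, the current rule yields `EARLIEST` and the proposed rule yields `NONE`.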
> Proposed change:
> - Add OffsetsInitializer.withOffsetResetStrategy(OffsetsInitializer,
> OffsetResetStrategy).
> - The wrapper delegates partition offset initialization and validation, but
> overrides getAutoOffsetResetStrategy().
> - Apply an explicitly configured properties.auto.offset.reset to the chosen
> Kafka SQL startup initializer for all startup modes.
> - Preserve existing defaults when properties.auto.offset.reset is not
> explicitly configured.
> - Update docs to clarify that scan.startup.mode chooses the initial position,
> while auto.offset.reset controls fallback when an initialized starting offset
> is unavailable.
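The proposed wrapper is essentially a decorator. The issue names `OffsetsInitializer.withOffsetResetStrategy(OffsetsInitializer, OffsetResetStrategy)`. The sketch below imitates only the described behavior on simplified stand-in types (`Initializer`, `Validator`, and `ResetStrategy` are hypothetical) and is not the actual patch. It assumes validation is an optional capability that the wrapper forwards only when the delegate provides it.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

/** Hypothetical sketch of a wrapper that overrides only the reset strategy. */
class ResetOverrideSketch {

    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /** Stand-in for OffsetsInitializer: starting offsets plus fallback strategy. */
    interface Initializer {
        Map<String, Long> partitionOffsets(Collection<String> partitions);

        ResetStrategy resetStrategy();
    }

    /** Stand-in for an optional validation hook that some initializers implement. */
    interface Validator {
        void validate(Properties consumerProps);
    }

    /** Keeps the delegate's starting offsets but replaces its fallback strategy. */
    static Initializer withOffsetResetStrategy(Initializer delegate, ResetStrategy override) {
        return new ResetOverride(Objects.requireNonNull(delegate), Objects.requireNonNull(override));
    }

    private static final class ResetOverride implements Initializer, Validator {
        private final Initializer delegate;
        private final ResetStrategy override;

        ResetOverride(Initializer delegate, ResetStrategy override) {
            this.delegate = delegate;
            this.override = override;
        }

        @Override
        public Map<String, Long> partitionOffsets(Collection<String> partitions) {
            return delegate.partitionOffsets(partitions); // initial positions unchanged
        }

        @Override
        public ResetStrategy resetStrategy() {
            return override; // only the fallback differs from the delegate
        }

        @Override
        public void validate(Properties consumerProps) {
            if (delegate instanceof Validator) {
                ((Validator) delegate).validate(consumerProps); // keep the delegate's checks
            }
        }
    }
}
```

A fresh source could then start from earliest while failing explicitly on a later unavailable offset, for example `withOffsetResetStrategy(earliestInit, ResetStrategy.NONE)`. Decorating an existing initializer avoids adding a reset parameter to every factory method, and it leaves current behavior untouched when no wrapper is applied.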
> Related:
> FLINK-39382 documents current auto.offset.reset support for group-offsets.
> Testing:
> - KafkaSourceBuilder reset strategy tests
> - DynamicKafkaSourceBuilder reset strategy tests
> - initializer wrapper unit test
> - static Kafka SQL source tests
> - dynamic Kafka SQL source tests