jigar-bhati opened a new pull request, #267: URL: https://github.com/apache/flink-connector-kafka/pull/267
## Summary ### What This PR lets Kafka sources configure the offset reset strategy independently from the startup offset initializer. It adds an `OffsetsInitializer.withOffsetResetStrategy(...)` wrapper for the DataStream Kafka source and applies explicit `properties.auto.offset.reset` values to the chosen SQL Kafka startup initializer for all startup modes. ### Why `OffsetsInitializer` currently represents two separate concerns: 1. How the initial startup offsets are resolved. 2. Which Kafka `auto.offset.reset` strategy is used if an initialized concrete offset is unavailable. These are independent choices. For example, users may want a fresh source to start from earliest or latest, but still fail with `OffsetResetStrategy.NONE` after a concrete offset has been chosen and later becomes unavailable. One concrete failure mode is broker recovery after log corruption or truncation. A source may have checkpointed or otherwise resolved a concrete numeric offset while Kafka was healthy. After broker recovery, Kafka can report that offset as out of range. With the current coupling, a source that originally started from `earliest-offset` or `latest-offset` also inherits `earliest` or `latest` as the fallback for that later out-of-range concrete offset, which can silently reset consumption instead of surfacing the failure for operator review. The SQL Kafka connector has the same coupling today: `properties.auto.offset.reset` is applied for `group-offsets` startup, but explicit values are overridden by the startup initializer for `earliest-offset`, `latest-offset`, `specific-offsets`, and timestamp startup modes. ## Jira https://issues.apache.org/jira/browse/FLINK-39888 ## Testing ```bash ./mvnw -pl flink-connector-kafka -DskipITs -Dfast test \ -Dtest=KafkaSourceBuilderTest,OffsetsInitializerWithOffsetResetStrategyTest,DynamicKafkaSourceBuilderTest,KafkaDynamicTableFactoryTest,DynamicKafkaTableFactoryTest ``` Result: `Tests run: 66, Failures: 0, Errors: 0, Skipped: 0` ```bash ./mvnw -pl flink-connector-kafka spotless:check ``` Result: `BUILD SUCCESS` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
