jigar-bhati opened a new pull request, #267:
URL: https://github.com/apache/flink-connector-kafka/pull/267

   ## Summary
   
   ### What
   
   This PR lets Kafka sources configure the offset reset strategy independently 
from the startup offset initializer.
   
   It adds an `OffsetsInitializer.withOffsetResetStrategy(...)` wrapper for the 
DataStream Kafka source and applies explicit `properties.auto.offset.reset` 
values to the chosen SQL Kafka startup initializer for all startup modes.
   
   ### Why
   
   `OffsetsInitializer` currently couples two separate concerns:
   
   1. How the initial startup offsets are resolved.
   2. Which Kafka `auto.offset.reset` strategy is used if an initialized 
concrete offset is unavailable.
   
   These are independent choices. For example, users may want a fresh source to 
start from earliest or latest, but still fail with `OffsetResetStrategy.NONE` 
after a concrete offset has been chosen and later becomes unavailable.
   
   One concrete failure mode is broker recovery after log corruption or 
truncation. A source may have checkpointed or otherwise resolved a concrete 
numeric offset while Kafka was healthy. After broker recovery, Kafka can report 
that offset as out of range. With the current coupling, a source that 
originally started from `earliest-offset` or `latest-offset` also inherits 
`earliest` or `latest` as the fallback for that later out-of-range concrete 
offset, which can silently reset consumption instead of surfacing the failure 
for operator review.
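   
   The coupling described above can be sketched in plain Java. This is a 
   simplified model, not the connector's code: `ResetStrategy`, 
   `StartInitializer`, `withResetStrategy`, and `OutOfRangeDemo` are hypothetical 
   stand-ins for Kafka's `OffsetResetStrategy`, Flink's `OffsetsInitializer`, and 
   the proposed wrapper, and the real signatures may differ.
   
   ```java
   /** Hypothetical stand-in for Kafka's OffsetResetStrategy. */
   enum ResetStrategy { EARLIEST, LATEST, NONE }

   /** Hypothetical, simplified model of an offsets initializer. */
   interface StartInitializer {
       /** Concern 1: where a fresh source starts. */
       long initialOffset(long earliest, long latest);

       /** Concern 2: what to do when a concrete offset is out of range. */
       ResetStrategy resetStrategy();

       /** Models the coupled behavior: starting from earliest implies an EARLIEST fallback. */
       static StartInitializer earliest() {
           return new StartInitializer() {
               @Override public long initialOffset(long earliest, long latest) { return earliest; }
               @Override public ResetStrategy resetStrategy() { return ResetStrategy.EARLIEST; }
           };
       }

       /** Models the proposed wrapper: same startup offsets, independent fallback. */
       default StartInitializer withResetStrategy(ResetStrategy override) {
           StartInitializer delegate = this;
           return new StartInitializer() {
               @Override public long initialOffset(long earliest, long latest) {
                   return delegate.initialOffset(earliest, latest);
               }
               @Override public ResetStrategy resetStrategy() { return override; }
           };
       }
   }

   final class OutOfRangeDemo {
       /** Returns the offset to resume from; throws when the strategy is NONE and the offset is gone. */
       static long resolve(long requested, long earliest, long latest, ResetStrategy strategy) {
           if (requested >= earliest && requested <= latest) {
               return requested;
           }
           switch (strategy) {
               case EARLIEST: return earliest;
               case LATEST: return latest;
               default:
                   throw new IllegalStateException(
                       "Offset " + requested + " is out of range [" + earliest + ", " + latest + "]");
           }
       }

       public static void main(String[] args) {
           StartInitializer coupled = StartInitializer.earliest();
           StartInitializer decoupled = coupled.withResetStrategy(ResetStrategy.NONE);

           // Checkpointed offset 42 is gone after truncation; the log now starts at 50.
           System.out.println(resolve(42, 50, 100, coupled.resetStrategy())); // silently jumps to 50
           try {
               resolve(42, 50, 100, decoupled.resetStrategy());
           } catch (IllegalStateException e) {
               System.out.println("surfaced failure: " + e.getMessage());
           }
       }
   }
   ```
   
   In this model both initializers start a fresh source at the earliest offset; 
   only the decoupled one fails when a previously resolved offset disappears.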
   
   The SQL Kafka connector has the same coupling today: 
`properties.auto.offset.reset` is applied for `group-offsets` startup, but 
explicit values are overridden by the startup initializer for 
`earliest-offset`, `latest-offset`, `specific-offsets`, and timestamp startup 
modes.
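   
   The precedence change for the SQL connector can be summarized as a small 
   decision function. This is only an illustrative sketch: `SqlResetResolution`, 
   `before`, `after`, and the `initializerDefault` parameter are hypothetical 
   names, and the strings stand in for whatever strategy each startup 
   initializer supplies by default.
   
   ```java
   import java.util.Optional;

   final class SqlResetResolution {
       /** Behavior described as current: an explicit value is honored only for group-offsets. */
       static String before(String startupMode, String initializerDefault, Optional<String> explicit) {
           if (startupMode.equals("group-offsets") && explicit.isPresent()) {
               return explicit.get();
           }
           return initializerDefault;
       }

       /** Behavior described for this PR: an explicit value wins for every startup mode. */
       static String after(String startupMode, String initializerDefault, Optional<String> explicit) {
           return explicit.orElse(initializerDefault);
       }

       public static void main(String[] args) {
           Optional<String> explicit = Optional.of("none");
           System.out.println(before("earliest-offset", "earliest", explicit)); // explicit value ignored
           System.out.println(after("earliest-offset", "earliest", explicit));  // explicit value applied
       }
   }
   ```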
   
   ## Jira
   
   https://issues.apache.org/jira/browse/FLINK-39888
   
   ## Testing
   
   ```bash
   ./mvnw -pl flink-connector-kafka -DskipITs -Dfast test \
     -Dtest=KafkaSourceBuilderTest,OffsetsInitializerWithOffsetResetStrategyTest,DynamicKafkaSourceBuilderTest,KafkaDynamicTableFactoryTest,DynamicKafkaTableFactoryTest
   ```
   
   Result: `Tests run: 66, Failures: 0, Errors: 0, Skipped: 0`
   
   ```bash
   ./mvnw -pl flink-connector-kafka spotless:check
   ```
   
   Result: `BUILD SUCCESS`
   

