tzulitai opened a new pull request #5: [FLINK-15769] [kafka-io] Add configuring startup position for Kafka ingress
URL: https://github.com/apache/flink-statefun/pull/5

With this PR, the following startup positions are now supported:
* from committed consumer group offsets in Kafka
* from earliest offset
* from latest offset
* from specific offsets
* from offsets with ingestion timestamp >= specified date

The changes simply expose configuration. The actual functionality is already implemented by the `FlinkKafkaConsumer`.

The exposed configuration API is the following:
```
// Each withStartupPosition(...) call below shows one of the alternatives.
KafkaIngressBuilder<T> builder = KafkaIngressBuilder.forIdentifier(...)
    .withStartupPosition(KafkaIngressStartupPosition.fromGroupOffsets())
    .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
    .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
    .withStartupPosition(KafkaIngressStartupPosition.fromSpecificOffsets(Map<KafkaTopicPartition, Long>))
    .withStartupPosition(KafkaIngressStartupPosition.fromDate(java.util.Date))
```

---

Apart from the main changes, there are 3 additional preliminary changes:
* 7e8e522 Add named method for setting consumer group id `withConsumerGroupId(String)` to builder
* 8d09c39 Add named method for setting auto offset reset position
* 0982eec Move Kafka properties resolution from `KafkaSourceProvider` to builder

The additional named methods make sense, since they interact with the startup position configs. For example, for `KafkaIngressStartupPosition.fromGroupOffsets()`, a consumer group id must be set. The group id is not passed as part of the `fromGroupOffsets` signature because consumer group ids have functionality beyond the startup position (e.g., we may later introduce offset committing back to Kafka, which also relies on a consumer group id being set).
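To make the interplay between the consumer group id and the startup position concrete, here is a minimal, self-contained sketch. It does not use the classes from this PR: `StartupPositionSketch`, its nested `StartupPosition` and `IngressBuilder`, and the build-time check are hypothetical stand-ins that only mirror the method names above. Whether the real builder validates this at build time is an assumption.

```java
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for illustration; NOT the statefun classes from this PR. */
final class StartupPositionSketch {

    enum Kind { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS, DATE }

    /** Models the five startup positions listed in the PR description. */
    static final class StartupPosition {
        final Kind kind;
        final Map<String, Long> offsets; // key "topic/partition" is an assumed encoding
        final Date date;

        private StartupPosition(Kind kind, Map<String, Long> offsets, Date date) {
            this.kind = kind;
            this.offsets = offsets;
            this.date = date;
        }

        static StartupPosition fromGroupOffsets() {
            return new StartupPosition(Kind.GROUP_OFFSETS, Collections.emptyMap(), null);
        }

        static StartupPosition fromEarliest() {
            return new StartupPosition(Kind.EARLIEST, Collections.emptyMap(), null);
        }

        static StartupPosition fromLatest() {
            return new StartupPosition(Kind.LATEST, Collections.emptyMap(), null);
        }

        static StartupPosition fromSpecificOffsets(Map<String, Long> offsets) {
            // Defensive copy so later changes to the caller's map have no effect.
            return new StartupPosition(Kind.SPECIFIC_OFFSETS, new HashMap<>(offsets), null);
        }

        static StartupPosition fromDate(Date date) {
            return new StartupPosition(Kind.DATE, Collections.emptyMap(), Objects.requireNonNull(date));
        }
    }

    /** Keeps the group id separate from the startup position, as the PR argues. */
    static final class IngressBuilder {
        private String consumerGroupId;
        private StartupPosition position;

        IngressBuilder withConsumerGroupId(String consumerGroupId) {
            this.consumerGroupId = consumerGroupId;
            return this;
        }

        IngressBuilder withStartupPosition(StartupPosition position) {
            this.position = Objects.requireNonNull(position);
            return this;
        }

        /** Returns a short description; rejects group offsets without a group id (assumed check). */
        String build() {
            Objects.requireNonNull(position, "startup position must be set");
            if (position.kind == Kind.GROUP_OFFSETS && consumerGroupId == null) {
                throw new IllegalStateException("fromGroupOffsets() requires a consumer group id");
            }
            return position.kind + " (group=" + consumerGroupId + ")";
        }
    }

    public static void main(String[] args) {
        String description = new IngressBuilder()
            .withConsumerGroupId("my-group")
            .withStartupPosition(StartupPosition.fromGroupOffsets())
            .build();
        System.out.println(description);
    }
}
```

Because the group id lives on the builder rather than inside `fromGroupOffsets()`, the same setting could later be reused for other features such as offset committing, which is the design rationale given above.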
