[
https://issues.apache.org/jira/browse/FLINK-15769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tzu-Li (Gordon) Tai closed FLINK-15769.
---------------------------------------
Fix Version/s: statefun-1.1
Resolution: Fixed
Merged to master via 9e0e3803d34daf8ac933310da8b62766adffe09d.
> Allow configuring offset startup positions for Stateful Functions Kafka
> Ingress
> -------------------------------------------------------------------------------
>
> Key: FLINK-15769
> URL: https://issues.apache.org/jira/browse/FLINK-15769
> Project: Flink
> Issue Type: New Feature
> Components: Stateful Functions
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Labels: pull-request-available
> Fix For: statefun-1.1
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Users typically expect to be able to configure where to start consuming a
> Kafka topic.
> Since the Stateful Functions Kafka ingress sits on top of Flink's Kafka
> consumer, there are already various startup options to build on:
> * {{GROUP_OFFSETS}} (default): start from the offsets committed to Kafka for
> the given {{group.id}}
> * {{LATEST}}: start from the latest record in the topic
> * {{EARLIEST}}: start from the earliest record in the topic
> * {{SPECIFIC_OFFSETS}}: provide a map of topic partition -> offset. This is
> particularly important for bootstrapped state scenarios, where the user
> wants to start from a specific position consistent with the state bootstrapped
> into their functions.
> * {{TIMESTAMP}}: start from the first record in each partition whose
> timestamp is at or after the given timestamp.
> The proposed API looks like so:
> {code}
> KafkaIngressBuilder<T> builder = KafkaIngressBuilder.forIdentifier(...)
>     .withTopic(...)
>     .withDeserializer(...)
>     .withDefaultStartPosition(KafkaIngressStartPosition.fromEarliest()/fromLatest())
>     .withSpecificStartOffsets(KafkaIngressStartOffsets.fromMap(Map)/fromTimestamp(Long))
> {code}
> The {{withDefaultStartPosition}} method is straightforward.
> It is kept separate from {{withSpecificStartOffsets}} because some partitions
> may not contain the offsets specified by {{withSpecificStartOffsets}}.
> In that case, the ingress needs to fall back to a default configuration,
> which is the one set by {{withDefaultStartPosition}}.
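
The fallback rule described above can be sketched in plain Java. This is only an illustrative model, not the Stateful Functions implementation: the class `StartPositionSketch`, the `resolve` method, the `DefaultPosition` enum, and the `"orders-0"` partition names are all hypothetical. It also assumes that "does not contain the offsets" covers both a partition with no entry in the map and an entry that lies outside the partition's available offset range.

```java
import java.util.Map;

// Hypothetical, self-contained model of the fallback rule.
// None of these names come from the Stateful Functions API.
final class StartPositionSketch {

    /** Default position used when no specific offset applies to a partition. */
    enum DefaultPosition { EARLIEST, LATEST }

    /**
     * Picks the offset a partition starts from.
     *
     * @param partition       partition name, e.g. "orders-0" (illustrative)
     * @param specificOffsets user-supplied partition -> offset entries
     * @param earliest        smallest offset currently available in the partition
     * @param latest          offset of the next record to be written
     * @param fallback        position to use when the specific offset is unusable
     */
    static long resolve(
            String partition,
            Map<String, Long> specificOffsets,
            long earliest,
            long latest,
            DefaultPosition fallback) {
        Long requested = specificOffsets.get(partition);
        // Assumption: a specific offset is usable only if the partition contains it.
        if (requested != null && requested >= earliest && requested <= latest) {
            return requested;
        }
        // Otherwise fall back to the configured default position.
        return fallback == DefaultPosition.EARLIEST ? earliest : latest;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = Map.of("orders-0", 42L);
        // orders-0 has a usable specific offset; orders-1 has none and falls back.
        System.out.println(resolve("orders-0", offsets, 0L, 100L, DefaultPosition.LATEST)); // prints 42
        System.out.println(resolve("orders-1", offsets, 0L, 100L, DefaultPosition.LATEST)); // prints 100
    }
}
```

Keeping the default position as a separate argument mirrors the reasoning in the description: the specific offsets can be incomplete, so every partition still needs a well-defined starting point.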