[
https://issues.apache.org/jira/browse/FLINK-16123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-16123:
-----------------------------------
Labels: pull-request-available (was: )
> Add routable Kafka connector
> ----------------------------
>
> Key: FLINK-16123
> URL: https://issues.apache.org/jira/browse/FLINK-16123
> Project: Flink
> Issue Type: Task
> Components: Stateful Functions
> Reporter: Igal Shilman
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Labels: pull-request-available
>
> In some cases it is beneficial to associate a stateful function instance with
> a key in a Kafka topic.
> In that case, a simplified Kafka ingress definition can be introduced.
> Consider the following example:
> Imagine a Kafka topic named "signups" (1) where the keys are UTF-8 strings
> representing user IDs,
> and the values are Protobuf messages of type (2)
> com.user.foo.bar.greeter.SingupMessage.
> We would like to have a stateful function of type (3)
> {code:java}
> FunctionType(com.user.foo.bar, SingupProcessor){code}
> to be invoked for each incoming signup message.
> The following spec definition:
> {code:java}
>
> - ingress:
>     meta:
>       type: org.apache.flink.statefun.sdk.kafka/routable-kafka-connector
>       id: com.user.foo.bar/greeter
>     spec:
>       properties:
>         - consumer.group: greeter
>       topics:
>         - signups: # (1)
>             typeUrl: "com.user.foo.bar.greeter.SingupMessage" # (2)
>             target: "com.user.foo.bar/SingupProcessor" # (3)
> {code}
> defines a Kafka ingress that consumes <UTF-8 string, bytes> records from the
> signups topic
> and produces a Routable Protobuf message with the following type and
> properties:
> {code}
> message Routable {
>   Address target; // (1)
>   Any payload; // (2)
> }
> {code}
> Where:
> (1) is Address(FunctionType(com.user.foo.bar, SingupProcessor), <a consumer
> record's key>)
> (2) is an Any whose typeUrl is com.user.foo.bar.greeter.SingupMessage and
> whose value bytes are taken directly from the consumer record's value bytes.
> This would require an additional AutoRoutable router
> that simply forwards the payload to the target address.
>
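For illustration only, the mapping described above (record key to function id, record value to Any payload, then a router that forwards to the target) could be sketched roughly as follows. This is a minimal sketch in plain Java with no Flink or Protobuf dependency: the classes Address, AnyPayload, Routable and Downstream, and the methods toRoutable and route, are hypothetical stand-ins invented for this example and are not the Stateful Functions SDK types or the connector's actual implementation.

```java
import java.nio.charset.StandardCharsets;

public class AutoRoutableSketch {

    /** Hypothetical stand-in for a function address: function type (namespace, name) plus id. */
    public static final class Address {
        public final String namespace;
        public final String name;
        public final String id;

        public Address(String namespace, String name, String id) {
            this.namespace = namespace;
            this.name = name;
            this.id = id;
        }
    }

    /** Hypothetical stand-in for a Protobuf Any: a type URL plus opaque value bytes. */
    public static final class AnyPayload {
        public final String typeUrl;
        public final byte[] value;

        public AnyPayload(String typeUrl, byte[] value) {
            this.typeUrl = typeUrl;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for the Routable message from the issue description. */
    public static final class Routable {
        public final Address target;
        public final AnyPayload payload;

        public Routable(Address target, AnyPayload payload) {
            this.target = target;
            this.payload = payload;
        }
    }

    /** Hypothetical downstream sink that a router forwards messages to. */
    public interface Downstream {
        void forward(Address to, AnyPayload message);
    }

    /**
     * Builds a Routable from one consumer record. The typeUrl and target
     * ("namespace/name") are assumed to come from the topic's entry in the spec.
     */
    public static Routable toRoutable(byte[] key, byte[] value, String typeUrl, String target) {
        if (key == null) {
            throw new IllegalArgumentException("a record key is required to address a function");
        }
        int slash = target.lastIndexOf('/');
        if (slash <= 0 || slash == target.length() - 1) {
            throw new IllegalArgumentException("target must look like namespace/name: " + target);
        }
        String namespace = target.substring(0, slash);
        String name = target.substring(slash + 1);
        // The record key, decoded as UTF-8, becomes the function instance id.
        String id = new String(key, StandardCharsets.UTF_8);
        // The record value bytes are passed through untouched as the payload value.
        return new Routable(new Address(namespace, name, id), new AnyPayload(typeUrl, value));
    }

    /** The "AutoRoutable" idea: forward the payload to the address the message already carries. */
    public static void route(Routable message, Downstream downstream) {
        downstream.forward(message.target, message.payload);
    }

    public static void main(String[] args) {
        Routable routable = toRoutable(
                "user-42".getBytes(StandardCharsets.UTF_8),
                new byte[] {1, 2, 3},
                "com.user.foo.bar.greeter.SingupMessage",
                "com.user.foo.bar/SingupProcessor");
        route(routable, (to, msg) ->
                System.out.println(to.namespace + "/" + to.name + " id=" + to.id
                        + " typeUrl=" + msg.typeUrl + " bytes=" + msg.value.length));
    }
}
```

Because the target address is fully determined by the spec and the record key, the router in this sketch needs no per-application logic, which is the point of the proposed simplified ingress.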
--
This message was sent by Atlassian Jira
(v8.3.4#803005)