Igal Shilman created FLINK-16123:
------------------------------------
Summary: Add routable Kafka connector
Key: FLINK-16123
URL: https://issues.apache.org/jira/browse/FLINK-16123
Project: Flink
Issue Type: Task
Components: Stateful Functions
Reporter: Igal Shilman
In some cases it is beneficial to associate a stateful function instance with a
key in a Kafka topic.
In that case, a simplified Kafka ingress definition can be introduced.
Consider the following example:
Imagine a Kafka topic named "signups" (1) where the keys are ut8 strings
representing user ids,
and the values are Protobuf messages of type (2)
com.user.foo.bar.greeter.SingupMessage.
We would like to have a stateful function of type(3)
{code:java}
FunctionType( com.user.foo.bar, SingupProcessor{code}
to be invoked for each incoming signup message.
The following spec definition:
{code:java}
- ingress:
meta:
type: org.apache.flink.statefun.sdk.kafka/routable-kafka-connector
id: com.user.foo.bar/greeter
spec:
properties:
- consumer.group: greeter
topics:
- singups: (1)
typeUrl: (2) "com.user.foo.bar.greeter.SingupMessage"
target: (3) "com.user.foo.bar/SingupProcessor"
{code}
Defines a Kafka ingress that consumes <utf8 strings, bytes > from a singups
topic,
and produces an Routable Protobuf message with the following type and
properties:
{code}
message Routable {
Address target; (1)
Any payload;
}
{code}
Where:
(1) is Address(FunctionType(com.user.foo.bar, SingupProcessor), <a consumer
record's key>)
(2) the Any's typeUrl would be com.user.foo.bar.greeter.SingupMessage and the
value bytes
would come directly from the consumer record value bytes
This would require an additional AutoRoutable router,
that basically forwards the payload to the target address.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)