tzulitai opened a new pull request #46: [FLINK-16396] [kafka] Support generic YAML-ized Kafka egress URL: https://github.com/apache/flink-statefun/pull/46 This PR adds a YAML-ized Kafka egress, with type: `org.apache.flink.statefun.sdk.kafka/generic-kafka-egress`. This egress is called a "generic" Kafka egress, because serialization of the value bytes is done in user space and not the framework's responsibility. The egress simply writes to Kafka whatever value bytes were provided. --- ### Changelog description Descriptions for the major changes: - 752a84f Introduces a `JsonEgressSpec` class, and extends `JsonModule` to bind json egress specs. Note how the `consumedType` of the binded egresses are `Any`, and not `Message`. In general, egresses bind through YAML should always expect an `Any` as the messages are sent from remote polyglot functions, so I chose to use the more specific subclass here. - 13018a6 Adds the `GenericKafkaSinkProvider` which provides a `FlinkKafkaProducer` for a given `JsonEgressSpec`. This sink provider *always* expects the input `Any`s to be able to be unpacked as `KafkaProducerRecord`s, and simply transforms those into Kafka's own `ProducerRecord` before writing them to Kafka (see `GenericKafkaEgressSerializer`). - 136d80a Binds the new generic Kafka egress. --- ### Verifying I verified this manually. I also have a local branch that adapts the `RoutableKafkaE2E` to use this generic Kafka egress (and therefore have YAML-ized Kafka ingress / egress on both ends), and it also passes. I chose not to include the changes to that E2E in this PR, unless we think it makes sense to do so (can do as a follow-up PR).
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
