I'm working on a mini POC to enable Kafka as a custom streaming source for a Python pipeline executing on the (in-progress) portable Flink runner.
We eventually want to use the same native Flink connectors for sources and sinks that we also use in other Flink jobs. I got a simple example working with FlinkKafkaConsumer010 reading from Kafka and a Python lambda logging the value. The code is here: https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9

I'm looking for feedback/opinions on the following items in particular:

* Enabling custom translation on the Flink portable runner (a custom translator could be loaded with ServiceLoader; additional translations could also be specified as job server configuration, pipeline option, ...).

* For the Python side, is what's shown in the commit the recommended way to define a custom transform (it would eventually live in a reusable custom module that pipeline authors can import)? Also, the example does not cover the configuration part yet (a rough sketch follows at the end of this mail).

* Cross-language coders: In this example the Kafka source only considers the message value and uses the byte coder that both sides understand. If I wanted to pass on the key and possibly other metadata to the Python transform (similar to KafkaRecord from the Java KafkaIO), then a specific coder is needed. Such a coder could be written using protobuf, Avro, etc., but it would also need to be registered (also sketched at the end of this mail).

Thanks,
Thomas
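
For illustration, here is a rough sketch of what the Python side of such a transform could look like. The class name, URN and constructor arguments are made up for this mail (they are not what the commit uses), and the transform simply declares an output PCollection of raw values, leaving the configuration question open:

import apache_beam as beam
from apache_beam import pvalue


class KafkaConsume(beam.PTransform):
    """Placeholder source; the Flink job server would translate it to
    FlinkKafkaConsumer010 instead of executing it in the Python SDK."""

    # URN a custom Flink translator could be registered for (illustrative).
    URN = 'custom:flink:kafka_read:v1'

    def __init__(self, topic, consumer_properties):
        super(KafkaConsume, self).__init__()
        self._topic = topic
        self._consumer_properties = consumer_properties

    def expand(self, pbegin):
        assert isinstance(pbegin, pvalue.PBegin), (
            'The Kafka source must be applied at the start of the pipeline')
        # The Python SDK only needs to know that this produces a PCollection
        # of encoded message values; the elements come from the native source.
        return pvalue.PCollection(pbegin.pipeline)


# Usage; how topic/consumer_properties reach the Flink-side translator
# (transform payload, pipeline option, ...) is exactly the open question:
# records = p | KafkaConsume('test-topic', {'bootstrap.servers': 'localhost:9092'})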
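
And a similarly rough sketch for the coder question on the Python side. KafkaRecord and KafkaRecordCoder are names I made up here, and the hand-rolled length-prefixed encoding is only there to keep the example self-contained; a protobuf/Avro-based encoding shared with the Java side would be the real thing, and a matching coder would still need to exist and be registered on the Java/Flink side:

import struct

from apache_beam import coders


class KafkaRecord(object):
    """Stripped-down stand-in for Java's KafkaRecord (key/value only)."""

    def __init__(self, key, value):
        self.key = key
        self.value = value


class KafkaRecordCoder(coders.Coder):
    """Encodes key and value as big-endian length-prefixed byte strings."""

    def encode(self, record):
        key = record.key or b''
        value = record.value or b''
        return (struct.pack('>i', len(key)) + key +
                struct.pack('>i', len(value)) + value)

    def decode(self, encoded):
        key_len, = struct.unpack('>i', encoded[:4])
        key = encoded[4:4 + key_len]
        rest = encoded[4 + key_len:]
        value_len, = struct.unpack('>i', rest[:4])
        return KafkaRecord(key, rest[4:4 + value_len])

    def is_deterministic(self):
        return True


# Registers the coder for KafkaRecord elements in the Python SDK; the
# Java/Flink side would need an equivalent coder producing the same bytes.
coders.registry.register_coder(KafkaRecord, KafkaRecordCoder)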