Hi Brian,

Thanks for your quick reply! It sounds like a lot of work, but your instructions are nice and concise, which I appreciate.

Before I tackle this, I want to make sure that the known issues with cross-language (xlang) Kafka on the Flink runner won't get in the way of implementing and using an MQTT transform. For example, see this thread <http://mail-archives.apache.org/mod_mbox/beam-user/202007.mbox/%3CCAOzzzuMtxH+NEbjvaEeALOOdvk2PB=gorrbhx0dqgjurvou...@mail.gmail.com%3E> and this issue <https://issues.apache.org/jira/browse/BEAM-5440>. Ideally, I would get a Kafka example working on Flink before starting development.
Please give me your opinion on this issue.

Best regards,
Carolyn

On Mon, Sep 28, 2020 at 7:12 PM Brian Hulette <[email protected]> wrote:

> Hi Carolyn, welcome to the dev list :)
>
> I'm assuming you're interested in making Java's MqttIO [1] available in
> Python as a cross-language transform? Unfortunately I don't think there's a
> concise guide for this yet. It definitely makes sense to follow KafkaIO as
> an example, but I know there's a lot of code to dig through. I can give
> you a few pointers to the relevant parts.
>
> On the Java side, you'll need to provide implementations of
> ExternalTransformRegistrar [2] and ExternalTransformBuilder [3] that can
> create MqttIO Read and/or Write transforms. The Registrar is what
> determines the URN, while the Builder determines the configuration
> parameters via its Configuration class [4] (we inspect the getters/setters
> on the class for this).
> You'll also want to make sure that there's a Gradle target for building an
> expansion service jar that includes your new MqttIO
> ExternalTransformRegistrar. The easiest way to do this would be to add it
> to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a dependency
> there (the same thing we do for Kafka [5]). This is fine to do for testing
> of course, but ultimately we should probably have a separate expansion
> service jar for it, like the one for KinesisIO [6].
>
> On the Python side, you'll need to provide a stub that extends
> ExternalTransform. The critical pieces are that you reference the
> correct expansion service jar [7], use the same URN as in Java [8], and use a
> compatible configuration object [9].
>
> I hope this helps! Please let us know if you need any more pointers.
> Brian
>
> [1]
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
> [2]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
> [3]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
> [4]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
> [5]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
> [6]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
> [7]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
> [8]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
> [9]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93
>
> On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen <[email protected]>
> wrote:
>
>> Greetings,
>>
>> I need Python MQTT IO support for a project I'm working on. I've looked
>> at the Kafka IO module, and planned to follow it, but I'm having trouble
>> finding the URN to use, and how to configure the input parameters. Is there
>> any documentation about this? If not, I'd appreciate an explanation of the
>> steps I need to take to implement this.
>>
>> Best regards,
>> Carolyn
>>
>> PS: apologies if there are duplicate posts. I haven't posted to this
>> list before and wasn't sure whether my message was missing from the list
>> because I wasn't subscribed yet.
>>
>
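
To make the Python-side requirements from the quoted reply a bit more concrete, here is a minimal, standard-library-only sketch of a URN constant and a configuration object. Everything named here is hypothetical: the URN string, the ReadFromMqttConfig class and its field names, and the expected_java_setters helper are illustrations, not Beam or MqttIO APIs. The sketch also assumes that snake_case Python field names correspond to camelCase setters on the Java Configuration class, which is what the KafkaIO example appears to do; that should be checked against the linked code.

```python
import typing

# Hypothetical URN. Whatever value is chosen, it must be identical to the
# string registered by the Java ExternalTransformRegistrar.
MQTT_READ_URN = "beam:external:java:mqtt:read:v1"


class ReadFromMqttConfig(typing.NamedTuple):
    """Hypothetical configuration object for an MQTT read.

    The field names are assumptions for illustration only. Each one would
    need a matching setter on the Java Configuration class.
    """

    server_uri: str
    topic: str
    client_id: typing.Optional[str] = None


def expected_java_setters(config_type) -> typing.List[str]:
    """Return the setter names a Java Configuration class would need,
    assuming a snake_case -> setCamelCase naming convention."""
    setters = []
    for field in config_type._fields:
        camel = "".join(part.capitalize() for part in field.split("_"))
        setters.append("set" + camel)
    return setters


if __name__ == "__main__":
    # Print the setters to compare by hand against the Java side.
    for name in expected_java_setters(ReadFromMqttConfig):
        print(name)
```

In an actual stub, a tuple like this would be turned into a payload and passed to ExternalTransform together with the URN and the expansion service, following the pattern that kafka.py uses at [7], [8], and [9].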
