Hi Carolyn, welcome to the dev list :)

I'm assuming you're interested in making Java's MqttIO [1] available in Python as a cross-language transform? Unfortunately I don't think there's a concise guide for this yet. It definitely makes sense to follow KafkaIO as an example, but I know there's a lot of code to dig through, so I can give you a few pointers to the relevant parts.

On the Java side, you'll need to provide implementations of ExternalTransformRegistrar [2] and ExternalTransformBuilder [3] that can create MqttIO Read and/or Write transforms. The Registrar is what determines the URN, while the Builder determines the configuration parameters via its Configuration class [4] (we inspect the getters/setters on the class for this).

You'll also want to make sure that there's a gradle target for building an expansion service jar that includes your new MqttIO ExternalTransformRegistrar. The easiest way to do this would be to add it to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a dependency there (the same thing we do for Kafka [5]). This is fine to do for testing of course, but ultimately we should probably have a separate expansion service jar for it, like the one for KinesisIO [6].

On the Python side, you'll need to provide a stub that extends ExternalTransform. The critical pieces are that it references the correct expansion service jar [7], uses the same URN as in Java [8], and passes a compatible configuration object [9].

I hope this helps!
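
To make the Python side a bit more concrete, here is a rough sketch of the three pieces (URN, configuration object, expansion service jar). Please treat it as an illustration only: the URN string, the names ReadFromMqttSchema and read_from_mqtt, and the fields server_uri and topic are placeholders that don't exist in Beam today, and the field-to-setter mapping in the comments is an assumption based on how the Kafka wrapper is set up. It is written as a small factory function to keep it short.

```python
import typing

# Hypothetical URN: it must match whatever the Java ExternalTransformRegistrar
# registers for MqttIO. Nothing with this name exists in Beam yet.
MQTT_READ_URN = "beam:external:java:mqtt:read:v1"

# Gradle target of the shared IO expansion service jar
# (assumes mqtt has been added to it as a dependency).
EXPANSION_SERVICE_TARGET = "sdks:java:io:expansion-service:shadowJar"


class ReadFromMqttSchema(typing.NamedTuple):
    """Hypothetical configuration object.

    Each field is assumed to line up with a setter on the Java Configuration
    class, e.g. server_uri <-> setServerUri, topic <-> setTopic.
    """
    server_uri: str
    topic: str


def read_from_mqtt(server_uri, topic, expansion_service=None):
    """Builds the external transform from URN, configuration and jar."""
    # Imported here so the names above can be used without loading Beam.
    from apache_beam.transforms.external import (
        BeamJarExpansionService,
        ExternalTransform,
        NamedTupleBasedPayloadBuilder,
    )

    config = ReadFromMqttSchema(server_uri=server_uri, topic=topic)
    return ExternalTransform(
        MQTT_READ_URN,
        NamedTupleBasedPayloadBuilder(config),
        expansion_service or BeamJarExpansionService(EXPANSION_SERVICE_TARGET),
    )
```

In a pipeline this would be applied like any other transform, e.g. `p | read_from_mqtt("tcp://localhost:1883", "my/topic")`. The Kafka wrapper does the same thing as a subclass of ExternalTransform, passing these three arguments to the parent constructor.
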
Please let us know if you need any more pointers.

Brian

[1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
[2] https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
[3] https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
[4] https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
[5] https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
[6] https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
[7] https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
[8] https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
[9] https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93

On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen <[email protected]> wrote:

> Greetings,
>
> I need Python MQTT IO support for a project I'm working on. I've looked at
> the Kafka IO module, and planned to follow it, but I'm having trouble
> finding the URN to use, and how to configure the input parameters. Is there
> any documentation about this? If not, I'd appreciate an explanation of the
> steps I need to take to implement this.
>
> Best regards,
> Carolyn
>
> PS- apologies if there are duplicate posts. I haven't posted to this user
> list before and wasn't sure if my message wasn't appearing on the list
> because I wasn't subscribed yet.
