Hi Carolyn, welcome to the dev list :)

I'm assuming you're interested in making Java's MqttIO [1] available in
Python as a cross-language transform? Unfortunately I don't think there's a
concise guide for this yet. It definitely makes sense to follow KafkaIO as
an example, but I know there's a lot of code to dig through.. I can give
you a few pointers to the relevant parts.

On the Java side, you'll need to provide implementations of
ExternalTransformRegistrar [2], and ExternalTransformBuilder [3] that can
create an MqttIO Read and/or Write transforms. The Registrar is what
determines the URN, while the Builder determines the configuration
parameters via it's Configuration class [4] (we inspect the getters/setters
on the class for this).
You'll also want to make sure that there's a gradle target for building an
expansion service jar that includes your new MqttIO
ExternalTransformRegistrar. The easiest way to do this would be to add it
to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a dependency
there (the same thing we do for Kafka [5]). This is fine to do for testing
of course, but ultimately we should probably have a separate expansion
service jar for it, like the one for KinesisIO [6].

On the Python side, you'll need to provide a stub that extends
ExternalTransform. The critical pieces are that you've referenced the
correct expansion service jar [7], the same URN as in Java [8], and use a
compatible configuration object [9].

I hope this helps! Please let us know if you need any more pointers
Brian

[1]
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
[2]
https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
[3]
https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
[4]
https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
[5]
https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
[6]
https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
[7]
https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
[8]
https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
[9]
https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93

On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen <[email protected]> wrote:

> Greetings,
>
> I need Python MQTT IO support for a project I'm working on. I've looked at
> the Kafka IO module, and planned to follow it, but I'm having trouble
> finding the URN to use, and how to configure the input parameters. Is there
> any documentation about this? If not, I'd appreciate an explanation of the
> steps I need to take to implement this.
>
> Best regards,
> Carolyn
>
> PS- apologies if there are duplicate posts. I haven't posted to this user
> list before and wasn't sure if my message wasn't appearing on the list
> because I wasn't subscribed yet.
>

Reply via email to