Hi Brian,

Thanks for your quick reply! It sounds like a lot of work, but your
instructions are nice and concise, which I appreciate. Before I try to
tackle this, I want to make sure that the known issues with cross-language
(xlang) KafkaIO on Flink won't get in the way of implementing and using this.
For example,
check out this thread
<http://mail-archives.apache.org/mod_mbox/beam-user/202007.mbox/%3CCAOzzzuMtxH+NEbjvaEeALOOdvk2PB=gorrbhx0dqgjurvou...@mail.gmail.com%3E>
and this issue <https://issues.apache.org/jira/browse/BEAM-5440>. Ideally, I
would like to get a Kafka example working on Flink before starting
development.

I'd appreciate your opinion on this.

Best regards,
Carolyn


On Mon, Sep 28, 2020 at 7:12 PM Brian Hulette wrote:

> Hi Carolyn, welcome to the dev list :)
>
> I'm assuming you're interested in making Java's MqttIO [1] available in
> Python as a cross-language transform? Unfortunately I don't think there's a
> concise guide for this yet. It definitely makes sense to follow KafkaIO as
> an example, but I know there's a lot of code to dig through. I can give
> you a few pointers to the relevant parts.
>
> On the Java side, you'll need to provide implementations of
> ExternalTransformRegistrar [2] and ExternalTransformBuilder [3] that can
> create MqttIO Read and/or Write transforms. The Registrar is what
> determines the URN, while the Builder determines the configuration
> parameters via its Configuration class [4] (we inspect the getters/setters
> on the class for this).
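To make the getter/setter point concrete, here is a minimal sketch of the naming convention it implies. It assumes (this is not confirmed in the thread) that the expansion service matches each snake_case field of the Python configuration object to a Java setter named "set" + UpperCamelCase(field). The MQTT field names `server_uri`, `topic` and `client_id` are hypothetical, as are the helper names.

```python
def java_setter_name(field_name: str) -> str:
    """Map a snake_case Python config field to a Java setter name.

    Assumption: the expansion service looks up setters as
    "set" + UpperCamelCase(field_name). Verify this against the
    Java expansion service source before relying on it.
    """
    parts = field_name.split("_")
    # Upper-case only the first letter of each part, keep the rest as-is.
    return "set" + "".join(part[:1].upper() + part[1:] for part in parts if part)


def missing_setters(python_fields, java_setters):
    """Return the Python fields that have no matching Java setter."""
    available = set(java_setters)
    return [f for f in python_fields if java_setter_name(f) not in available]


if __name__ == "__main__":
    # Hypothetical fields on the Python-side configuration object.
    fields = ["server_uri", "topic", "client_id"]
    # Hypothetical setters declared on the Java Configuration class.
    setters = ["setServerUri", "setTopic"]
    for f in fields:
        print(f, "->", java_setter_name(f))
    print("missing:", missing_setters(fields, setters))
```

A check like this is only a local sanity test for mismatched names; the expansion service itself is the authority on how fields are resolved.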
>
> You'll also want to make sure that there's a Gradle target for building an
> expansion service jar that includes your new MqttIO
> ExternalTransformRegistrar. The easiest way to do this would be to add it
> to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a dependency
> there (the same thing we do for Kafka [5]). This is fine for testing, of
> course, but ultimately we should probably have a separate expansion
> service jar for it, like the one for KinesisIO [6].
>
> On the Python side, you'll need to provide a stub that extends
> ExternalTransform. The critical pieces are that you reference the
> correct expansion service jar [7], use the same URN as in Java [8], and
> use a compatible configuration object [9].
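As a rough illustration of the configuration object and URN pieces, here is a stdlib-only sketch in the spirit of the KafkaIO schema referenced in [9]: a typing.NamedTuple whose fields the Java Configuration class has to accept, plus a single URN constant. The URN string, class name and fields are all hypothetical. In a real stub, this tuple would presumably be wrapped in a payload builder (the Python SDK's apache_beam.transforms.external module provides NamedTupleBasedPayloadBuilder) and passed to ExternalTransform together with the URN and an expansion service. That wiring is left out here so the sketch runs without Beam installed.

```python
from typing import NamedTuple, Optional

# Hypothetical URN. It must be identical to the string the Java
# ExternalTransformRegistrar registers for the MqttIO read builder.
READ_FROM_MQTT_URN = "beam:external:java:mqtt:read:v1"


class ReadFromMqttConfig(NamedTuple):
    """Hypothetical configuration object for an MqttIO read.

    Each field is expected to correspond to a setter on the Java
    Configuration class, and each type must be one the cross-language
    payload can encode.
    """

    server_uri: str
    topic: str
    client_id: Optional[str] = None
    max_num_records: Optional[int] = None


if __name__ == "__main__":
    config = ReadFromMqttConfig(server_uri="tcp://localhost:1883", topic="sensors")
    # In a real stub, this object would become the ExternalTransform payload.
    print(READ_FROM_MQTT_URN)
    print(config._asdict())
```

Keeping the URN in one constant makes it easy to compare against the Java registrar when an expansion request fails to find the transform.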
>
> I hope this helps! Please let us know if you need any more pointers.
> Brian
>
> [1]
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
> [2]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
> [3]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
> [4]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
> [5]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
> [6]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
> [7]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
> [8]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
> [9]
> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93
>
> On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen wrote:
>
>> Greetings,
>>
>> I need Python MQTT IO support for a project I'm working on. I've looked
>> at the Kafka IO module and planned to follow it, but I'm having trouble
>> finding the URN to use and figuring out how to configure the input
>> parameters. Is there any documentation about this? If not, I'd appreciate
>> an explanation of the steps I need to take to implement this.
>>
>> Best regards,
>> Carolyn
>>
>> PS: Apologies if there are duplicate posts. I haven't posted to this user
>> list before, and I wasn't sure whether my message was missing from the
>> list because I hadn't subscribed yet.
>>
>
