We are still working on adding documentation on authoring cross-language
transforms. Hopefully it will happen in the next quarter. For now, you'll
have to follow the existing examples and the guidelines Brian provided.
You might run into
https://issues.apache.org/jira/browse/BEAM-5440 on Flink, though, given that
MQTT seems to rely on checkpoint finalization to ack messages [1]. Probably
Luke or +Boyuan Zhang <[email protected]> can confirm.

Regarding URNs and Payloads, currently all cross-language transforms are
composite transforms that get fully expanded during pipeline construction.
So far there has been no need to define a well-known payload for such
transforms. URNs
are usually defined in the Java implementation (for example, [2] for Kafka)
and currently there's an implicit agreement between the Java implementation
and the Python wrapper (both regarding the URN and the constructor
parameters encoded in the expansion request). We might want to better
define this when we have x-lang wrappers for multiple SDKs and/or multiple
versions of the same x-lang transform.
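
As a rough illustration of that implicit agreement, here is a minimal
sketch in plain Python (no Beam imports). The URN string, the
ReadFromMqttConfig fields, and the helper names are all hypothetical, and
the snake_case to camelCase setter mapping is an assumption about how the
Python field names line up with the setters on the Java Configuration
class, not a statement of the actual Beam implementation.

```python
import typing

# Hypothetical URN. The real value is whatever the Java registrar declares;
# the Python wrapper has to repeat exactly the same string.
MQTT_READ_URN = "beam:external:java:mqtt:read:v1"


class ReadFromMqttConfig(typing.NamedTuple):
    """Hypothetical constructor parameters sent in the expansion request."""
    server_uri: str
    topic: str
    client_id: typing.Optional[str] = None


def expected_java_setter(field_name: str) -> str:
    """Return the Java setter name assumed to match a snake_case field."""
    head, *rest = field_name.split("_")
    camel = head + "".join(part.capitalize() for part in rest)
    return "set" + camel[0].upper() + camel[1:]


def expected_setters(config_cls) -> typing.List[str]:
    """List the setters the Java Configuration class would need to expose."""
    return [expected_java_setter(name) for name in config_cls._fields]
```

Calling expected_setters(ReadFromMqttConfig) gives a quick checklist of the
setters to compare against the Java side when the two drift apart.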

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java#L343
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L618



On Tue, Sep 29, 2020 at 2:24 AM Carolyn Langen <[email protected]> wrote:

> Hi Brian,
>
> Thanks for your quick reply! It sounds like a lot of work, but your
> instructions are nice and concise, which I appreciate. Before I try to
> tackle this, I want to make sure that the issues related to xlang Kafka via
> Flink won't get in the way of implementing and using this. For example,
> check out this thread
> <http://mail-archives.apache.org/mod_mbox/beam-user/202007.mbox/%3CCAOzzzuMtxH+NEbjvaEeALOOdvk2PB=gorrbhx0dqgjurvou...@mail.gmail.com%3E>
> and this issue <https://issues.apache.org/jira/browse/BEAM-5440>. Ideally
> I would be able to get a Kafka Flink example to work before starting
> development.
>
> Please give me your opinion on this issue.
>
> Best regards,
> Carolyn
>
>
> On Mon, Sep 28, 2020 at 7:12 PM Brian Hulette <[email protected]> wrote:
>
>> Hi Carolyn, welcome to the dev list :)
>>
>> I'm assuming you're interested in making Java's MqttIO [1] available in
>> Python as a cross-language transform? Unfortunately I don't think there's a
>> concise guide for this yet. It definitely makes sense to follow KafkaIO as
>> an example, but I know there's a lot of code to dig through. I can give
>> you a few pointers to the relevant parts.
>>
>> On the Java side, you'll need to provide implementations of
>> ExternalTransformRegistrar [2] and ExternalTransformBuilder [3] that can
>> create MqttIO Read and/or Write transforms. The Registrar is what
>> determines the URN, while the Builder determines the configuration
>> parameters via its Configuration class [4] (we inspect the getters/setters
>> on the class for this).
>> You'll also want to make sure that there's a gradle target for building
>> an expansion service jar that includes your new MqttIO
>> ExternalTransformRegistrar. The easiest way to do this would be to add it
>> to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a dependency
>> there (the same thing we do for Kafka [5]). This is fine to do for testing
>> of course, but ultimately we should probably have a separate expansion
>> service jar for it, like the one for KinesisIO [6].
>>
>> On the Python side, you'll need to provide a stub that extends
>> ExternalTransform. The critical pieces are that you've referenced the
>> correct expansion service jar [7], the same URN as in Java [8], and use a
>> compatible configuration object [9].
>>
>> I hope this helps! Please let us know if you need any more pointers.
>> Brian
>>
>> [1]
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
>> [2]
>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
>> [3]
>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
>> [4]
>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
>> [5]
>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
>> [6]
>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
>> [7]
>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
>> [8]
>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
>> [9]
>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93
>>
>> On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen <[email protected]>
>> wrote:
>>
>>> Greetings,
>>>
>>> I need Python MQTT IO support for a project I'm working on. I've looked
>>> at the Kafka IO module, and planned to follow it, but I'm having trouble
>>> finding the URN to use, and how to configure the input parameters. Is there
>>> any documentation about this? If not, I'd appreciate an explanation of the
>>> steps I need to take to implement this.
>>>
>>> Best regards,
>>> Carolyn
>>>
>>> PS- apologies if there are duplicate posts. I haven't posted to this
>>> user list before and wasn't sure if my message wasn't appearing on the list
>>> because I wasn't subscribed yet.
>>>
>>
