Thanks for the explanation, Chamikara. If I'm likely to run into BEAM-6868 <https://issues.apache.org/jira/browse/BEAM-6868>, then perhaps I should wait before implementing this. I don't want to put in a lot of effort only to have to stop and wait for BEAM-6868 to be completed. Is it currently being worked on?
On Tue, Sep 29, 2020 at 5:17 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
>
> On Tue, Sep 29, 2020 at 8:13 AM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> We are still working on adding documentation on authoring cross-language
>> transforms. Hopefully it will happen in the next quarter. For now, you'll
>> have to follow the existing examples and the guidelines Brian provided.
>> You might run into
>> https://issues.apache.org/jira/browse/BEAM-5440 for Flink though, given
>> that MQTT seems to rely on checkpoint finalization to ack messages [1].
>> Probably Luke or +Boyuan Zhang <boyu...@google.com> can confirm.
>>
>
> Sorry, I wanted to say https://issues.apache.org/jira/browse/BEAM-6868. I
> don't think https://issues.apache.org/jira/browse/BEAM-5440 will be an
> issue unless you hope to use Flink in LOOPBACK mode.
>
>
>> Regarding URNs and Payloads, currently all cross-language transforms are
>> composite transforms that get fully expanded during pipeline construction.
>> There was no need to define a well-known payload for such transforms. URNs
>> are usually defined in the Java implementation (for example, [2] for Kafka)
>> and currently there's an implicit agreement between the Java implementation
>> and the Python wrapper (both regarding the URN and the constructor
>> parameters encoded in the expansion request). We might want to better
>> define this when we have x-lang wrappers for multiple SDKs and/or multiple
>> versions of the same x-lang transform.
>>
>> Thanks,
>> Cham
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java#L343
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L618
>>
>>
>>
>> On Tue, Sep 29, 2020 at 2:24 AM Carolyn Langen <caro...@almende.org>
>> wrote:
>>
>>> Hi Brian,
>>>
>>> Thanks for your quick reply! It sounds like a lot of work, but your
>>> instructions are nice and concise, which I appreciate. Before I try to
>>> tackle this, I want to make sure that the issues related to xlang Kafka via
>>> Flink won't get in the way of implementing and using this. For example,
>>> check out this thread
>>> <http://mail-archives.apache.org/mod_mbox/beam-user/202007.mbox/%3CCAOzzzuMtxH+NEbjvaEeALOOdvk2PB=gorrbhx0dqgjurvou...@mail.gmail.com%3E>
>>> and this issue <https://issues.apache.org/jira/browse/BEAM-5440>.
>>> Ideally I would be able to get a Kafka Flink example to work before
>>> starting development.
>>>
>>> Please give me your opinion on this issue.
>>>
>>> Best regards,
>>> Carolyn
>>>
>>>
>>> On Mon, Sep 28, 2020 at 7:12 PM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Hi Carolyn, welcome to the dev list :)
>>>>
>>>> I'm assuming you're interested in making Java's MqttIO [1] available in
>>>> Python as a cross-language transform? Unfortunately I don't think there's a
>>>> concise guide for this yet. It definitely makes sense to follow KafkaIO as
>>>> an example, but I know there's a lot of code to dig through. I can give
>>>> you a few pointers to the relevant parts.
>>>>
>>>> On the Java side, you'll need to provide implementations of
>>>> ExternalTransformRegistrar [2] and ExternalTransformBuilder [3] that can
>>>> create MqttIO Read and/or Write transforms. The Registrar is what
>>>> determines the URN, while the Builder determines the configuration
>>>> parameters via its Configuration class [4] (we inspect the getters/setters
>>>> on the class for this).
>>>> You'll also want to make sure that there's a gradle target for building
>>>> an expansion service jar that includes your new MqttIO
>>>> ExternalTransformRegistrar. The easiest way to do this would be to add it
>>>> to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a dependency
>>>> there (the same thing we do for Kafka [5]). This is fine to do for testing
>>>> of course, but ultimately we should probably have a separate expansion
>>>> service jar for it, like the one for KinesisIO [6].
>>>>
>>>> On the Python side, you'll need to provide a stub that extends
>>>> ExternalTransform. The critical pieces are that you reference the
>>>> correct expansion service jar [7], use the same URN as in Java [8], and
>>>> use a compatible configuration object [9].
>>>>
>>>> I hope this helps! Please let us know if you need any more pointers.
>>>> Brian
>>>>
>>>> [1]
>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
>>>> [2]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
>>>> [3]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
>>>> [4]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
>>>> [5]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
>>>> [6]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
>>>> [7]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
>>>> [8]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
>>>> [9]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93
>>>>
>>>> On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen <caro...@almende.org>
>>>> wrote:
>>>>
>>>>> Greetings,
>>>>>
>>>>> I need Python MQTT IO support for a project I'm working on. I've
>>>>> looked at the Kafka IO module, and planned to follow it, but I'm having
>>>>> trouble finding the URN to use, and how to configure the input parameters.
>>>>> Is there any documentation about this? If not, I'd appreciate an
>>>>> explanation of the steps I need to take to implement this.
>>>>>
>>>>> Best regards,
>>>>> Carolyn
>>>>>
>>>>> PS- apologies if there are duplicate posts. I haven't posted to this
>>>>> user list before and wasn't sure if my message wasn't appearing on the
>>>>> list because I wasn't subscribed yet.
>>>>>
>>>>
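
For reference, the "implicit agreement" between the Java Configuration class and the Python configuration object discussed in this thread can be illustrated with a small standard-library sketch. It does not use Beam's API. The URN string, the MqttReadConfig class and its fields, and the helper names are all hypothetical, and the snake_case-to-setter mapping is an assumption based on how the field names in kafka.py line up with the setters on KafkaIO's Configuration class.

```python
from typing import List, NamedTuple, Optional

# Hypothetical URN. The real value must match whatever the Java
# ExternalTransformRegistrar for MqttIO registers.
MQTT_READ_URN = "beam:external:java:mqtt:read:v1"


class MqttReadConfig(NamedTuple):
    """Hypothetical Python-side configuration for an MqttIO read."""
    server_uri: str
    topic: str
    client_id: Optional[str] = None


def java_setter_name(field: str) -> str:
    """Map a snake_case Python field to a camelCase Java setter name.

    Assumed convention: server_uri -> setServerUri.
    """
    return "set" + "".join(part.capitalize() for part in field.split("_"))


def expected_setters(config_cls) -> List[str]:
    """List the setters the Java Configuration class would need to expose."""
    return [java_setter_name(name) for name in config_cls._fields]


if __name__ == "__main__":
    config = MqttReadConfig(server_uri="tcp://localhost:1883", topic="sensors")
    print(MQTT_READ_URN, config)
    print(expected_setters(MqttReadConfig))
```

In a real wrapper, a configuration object like this would be handed to the ExternalTransform stub together with the URN and the expansion service, as kafka.py does in the lines Brian links as [7], [8] and [9].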