I believe +Boyuan Zhang <[email protected]> is working on
https://issues.apache.org/jira/browse/BEAM-6868, but I'd like to confirm.
Boyuan, is that accurate?

Best
-P.

On Tue, Sep 29, 2020 at 8:34 AM Carolyn Langen <[email protected]> wrote:

> Thanks for the explanation, Chamikara. If I'm going to run into BEAM-6868
> <https://issues.apache.org/jira/browse/BEAM-6868> then perhaps I should
> wait on implementing this... I don't want to put a lot of effort in only to
> have to stop and wait for 6868 to be completed. Is it currently being
> worked on?
>
> On Tue, Sep 29, 2020 at 5:17 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>>
>> On Tue, Sep 29, 2020 at 8:13 AM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> We are still working on adding documentation on authoring cross-language
>>> transforms. Hopefully it will happen in the next quarter. For now, you'll
>>> have to follow the existing examples and the guidelines Brian provided
>>> above. You might run into
>>> https://issues.apache.org/jira/browse/BEAM-5440 for Flink though given
>>> that MQTT seems to rely on checkpoint finalization to ack messages [1].
>>> Probably Luke or +Boyuan Zhang <[email protected]> can confirm.
>>>
>>
>> Sorry, I wanted to say https://issues.apache.org/jira/browse/BEAM-6868.
>> I don't think https://issues.apache.org/jira/browse/BEAM-5440 will be an
>> issue unless you hope to use Flink in LOOPBACK mode.
>>
>>
>>> Regarding URNs and Payloads, currently all cross-language transforms are
>>> composite transforms that get fully expanded during pipeline construction.
>>> There was no need to define a well known payload for such transforms. URNs
>>> are usually defined in the Java implementation (for example, [2] for Kafka)
>>> and currently there's an implicit agreement between the Java implementation
>>> and the Python wrapper (both regarding the URN and the constructor
>>> parameters encoded in the expansion request). We might want to better
>>> define this when we have x-lang wrappers for multiple SDKs and/or multiple
>>> versions of the same x-lang transform.
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java#L343
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L618
>>>
>>>
>>>
>>> On Tue, Sep 29, 2020 at 2:24 AM Carolyn Langen <[email protected]>
>>> wrote:
>>>
>>>> Hi Brian,
>>>>
>>>> Thanks for your quick reply! It sounds like a lot of work, but your
>>>> instructions are nice and concise, which I appreciate. Before I try to
>>>> tackle this, I want to make sure that the issues related to xlang Kafka via
>>>> Flink won't get in the way of implementing and using this. For example,
>>>> check out this thread
>>>> <http://mail-archives.apache.org/mod_mbox/beam-user/202007.mbox/%3CCAOzzzuMtxH+NEbjvaEeALOOdvk2PB=gorrbhx0dqgjurvou...@mail.gmail.com%3E>
>>>> and this issue <https://issues.apache.org/jira/browse/BEAM-5440>.
>>>> Ideally I would be able to get a Kafka Flink example to work before
>>>> starting development.
>>>>
>>>> Please give me your opinion on this issue.
>>>>
>>>> Best regards,
>>>> Carolyn
>>>>
>>>>
>>>> On Mon, Sep 28, 2020 at 7:12 PM Brian Hulette <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Carolyn, welcome to the dev list :)
>>>>>
>>>>> I'm assuming you're interested in making Java's MqttIO [1] available
>>>>> in Python as a cross-language transform? Unfortunately I don't think
>>>>> there's a concise guide for this yet. It definitely makes sense to follow
>>>>> KafkaIO as an example, but I know there's a lot of code to dig through. I
>>>>> can give you a few pointers to the relevant parts.
>>>>>
>>>>> On the Java side, you'll need to provide implementations of
>>>>> ExternalTransformRegistrar [2], and ExternalTransformBuilder [3] that can
>>>>> create MqttIO Read and/or Write transforms.
>>>>> The Registrar is what determines the URN, while the Builder determines
>>>>> the configuration parameters via its Configuration class [4] (we inspect
>>>>> the getters/setters on the class for this).
>>>>> You'll also want to make sure that there's a gradle target for
>>>>> building an expansion service jar that includes your new MqttIO
>>>>> ExternalTransformRegistrar. The easiest way to do this would be to add it
>>>>> to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a
>>>>> dependency there (the same thing we do for Kafka [5]). This is fine to do
>>>>> for testing of course, but ultimately we should probably have a separate
>>>>> expansion service jar for it, like the one for KinesisIO [6].
>>>>>
>>>>> On the Python side, you'll need to provide a stub that extends
>>>>> ExternalTransform. The critical pieces are that you reference the
>>>>> correct expansion service jar [7], use the same URN as in Java [8], and
>>>>> use a compatible configuration object [9].
>>>>>
>>>>> I hope this helps!
>>>>> Please let us know if you need any more pointers.
>>>>> Brian
>>>>>
>>>>> [1]
>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
>>>>> [3]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
>>>>> [4]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
>>>>> [5]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
>>>>> [6]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
>>>>> [7]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
>>>>> [8]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
>>>>> [9]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93
>>>>>
>>>>> On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Greetings,
>>>>>>
>>>>>> I need Python MQTT IO support for a project I'm working on. I've
>>>>>> looked at the Kafka IO module, and planned to follow it, but I'm having
>>>>>> trouble finding the URN to use, and how to configure the input
>>>>>> parameters.
>>>>>> Is there any documentation about this? If not, I'd appreciate an
>>>>>> explanation of the steps I need to take to implement this.
>>>>>>
>>>>>> Best regards,
>>>>>> Carolyn
>>>>>>
>>>>>> PS- apologies if there are duplicate posts.
>>>>>> I haven't posted to this user list before and wasn't sure if my
>>>>>> message wasn't appearing on the list because I wasn't subscribed yet.
>>>>>>
>>>>>
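
The quoted thread describes an implicit agreement between the Java implementation and the Python wrapper: the same URN on both sides, and a Python configuration object compatible with the setters on the Java Configuration class. A rough, stdlib-only sketch of checking that agreement might look like the following. It does not use Beam's API. The URN string, the ReadFromMqttConfig fields, and the snake_case to setCamelCase naming rule are all assumptions made for illustration and are not taken from the Beam source; a real wrapper would extend ExternalTransform as described in the thread.

```python
import typing

# Hypothetical URN: the real value is whatever the Java-side
# ExternalTransformRegistrar registers for the MqttIO read transform.
MQTT_READ_URN = "beam:external:java:mqtt:read:v1"


class ReadFromMqttConfig(typing.NamedTuple):
    """Hypothetical Python-side configuration; field names are illustrative."""
    server_uri: str
    topic: str
    client_id: typing.Optional[str] = None


def expected_java_setter(field_name: str) -> str:
    """Map a snake_case field name to the setter name assumed to exist on the
    Java Configuration class, e.g. server_uri -> setServerUri (an assumption)."""
    return "set" + "".join(part.capitalize() for part in field_name.split("_"))


def missing_setters(config_cls, java_setters) -> typing.List[str]:
    """Return setter names the Python config implies but the Java class lacks."""
    expected = [expected_java_setter(name) for name in config_cls._fields]
    return [setter for setter in expected if setter not in java_setters]
```

Passing the set of setter names found on the Java Configuration class to missing_setters gives a quick list of fields the two sides disagree on before trying an expansion request.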
