Thanks for the explanation, Chamikara. If I'm likely to run into BEAM-6868
<https://issues.apache.org/jira/browse/BEAM-6868> then perhaps I should
wait on implementing this... I don't want to put a lot of effort in only to
have to stop and wait for 6868 to be completed. Is it currently being
worked on?

On Tue, Sep 29, 2020 at 5:17 PM Chamikara Jayalath <chamik...@google.com>
wrote:

>
>
> On Tue, Sep 29, 2020 at 8:13 AM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> We are still working on adding documentation on authoring cross-language
>> transforms. Hopefully it will happen in the next quarter. For now, you'll
>> have to follow the existing examples and the guidelines Brian provided
>> above. You might run into
>> https://issues.apache.org/jira/browse/BEAM-5440 for Flink though, given
>> that MQTT seems to rely on checkpoint finalization to ack messages [1].
>> Probably Luke or +Boyuan Zhang <boyu...@google.com> can confirm.
>>
>
> Sorry, I meant https://issues.apache.org/jira/browse/BEAM-6868. I
> don't think https://issues.apache.org/jira/browse/BEAM-5440 will be an
> issue unless you hope to use Flink in LOOPBACK mode.
>
>
>> Regarding URNs and Payloads, currently all cross-language transforms are
>> composite transforms that get fully expanded during pipeline construction.
>> There was no need to define a well-known payload for such transforms. URNs
>> are usually defined in the Java implementation (for example, [2] for Kafka)
>> and currently there's an implicit agreement between the Java implementation
>> and the Python wrapper (both regarding the URN and the constructor
>> parameters encoded in the expansion request). We might want to better
>> define this when we have x-lang wrappers for multiple SDKs and/or multiple
>> versions of the same x-lang transform.
>>
>> Thanks,
>> Cham
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java#L343
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L618
>>
>>
>>
>> On Tue, Sep 29, 2020 at 2:24 AM Carolyn Langen <caro...@almende.org>
>> wrote:
>>
>>> Hi Brian,
>>>
>>> Thanks for your quick reply! It sounds like a lot of work, but your
>>> instructions are nice and concise, which I appreciate. Before I try to
>>> tackle this, I want to make sure that the issues related to xlang Kafka via
>>> Flink won't get in the way of implementing and using this. For example,
>>> check out this thread
>>> <http://mail-archives.apache.org/mod_mbox/beam-user/202007.mbox/%3CCAOzzzuMtxH+NEbjvaEeALOOdvk2PB=gorrbhx0dqgjurvou...@mail.gmail.com%3E>
>>> and this issue <https://issues.apache.org/jira/browse/BEAM-5440>.
>>> Ideally I would be able to get a Kafka Flink example to work before
>>> starting development.
>>>
>>> Please give me your opinion on this issue.
>>>
>>> Best regards,
>>> Carolyn
>>>
>>>
>>> On Mon, Sep 28, 2020 at 7:12 PM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Hi Carolyn, welcome to the dev list :)
>>>>
>>>> I'm assuming you're interested in making Java's MqttIO [1] available in
>>>> Python as a cross-language transform? Unfortunately I don't think there's a
>>>> concise guide for this yet. It definitely makes sense to follow KafkaIO as
>>>> an example, but I know there's a lot of code to dig through. I can give
>>>> you a few pointers to the relevant parts.
>>>>
>>>> On the Java side, you'll need to provide implementations of
>>>> ExternalTransformRegistrar [2] and ExternalTransformBuilder [3] that can
>>>> create MqttIO Read and/or Write transforms. The Registrar is what
>>>> determines the URN, while the Builder determines the configuration
>>>> parameters via its Configuration class [4] (we inspect the getters/setters
>>>> on the class for this).
>>>> You'll also want to make sure that there's a gradle target for building
>>>> an expansion service jar that includes your new MqttIO
>>>> ExternalTransformRegistrar. The easiest way to do this would be to add it
>>>> to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a dependency
>>>> there (the same thing we do for Kafka [5]). This is fine to do for testing
>>>> of course, but ultimately we should probably have a separate expansion
>>>> service jar for it, like the one for KinesisIO [6].
>>>>
>>>> On the Python side, you'll need to provide a stub that extends
>>>> ExternalTransform. The critical pieces are that you reference the
>>>> correct expansion service jar [7], use the same URN as in Java [8], and
>>>> use a compatible configuration object [9].
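
For illustration, the three Python-side pieces described above could look roughly like the sketch below. It is modeled on how kafka.py is put together, not on any existing MQTT wrapper. The URN, the config class, its field names, and the read_from_mqtt helper are all hypothetical: the real values have to match whatever the Java Registrar and Builder end up defining. The sketch also uses a factory function rather than a subclass of ExternalTransform, only so that the Beam import can stay local.

```python
import typing

# Hypothetical URN: it must match the one registered by the Java
# ExternalTransformRegistrar for MqttIO, which does not exist yet.
MQTT_READ_URN = 'beam:external:java:mqtt:read:v1'

# Gradle target of the shared IO expansion service jar; this assumes mqtt
# has been added to that target as a dependency.
EXPANSION_SERVICE_TARGET = 'sdks:java:io:expansion-service:shadowJar'


class ReadFromMqttConfig(typing.NamedTuple):
    """Hypothetical configuration object.

    The fields have to line up with the getters/setters of the Java
    builder's Configuration class.
    """
    server_uri: str
    topic: str
    client_id: typing.Optional[str] = None


def read_from_mqtt(server_uri, topic, client_id=None, expansion_service=None):
    """Builds a cross-language stub for a (not yet existing) MqttIO read."""
    # Imported here so the config above can be inspected without Beam.
    from apache_beam.transforms.external import (
        BeamJarExpansionService,
        ExternalTransform,
        NamedTupleBasedPayloadBuilder,
    )
    config = ReadFromMqttConfig(server_uri, topic, client_id)
    return ExternalTransform(
        MQTT_READ_URN,
        NamedTupleBasedPayloadBuilder(config),
        expansion_service or BeamJarExpansionService(EXPANSION_SERVICE_TARGET),
    )
```

In a pipeline this would be applied like any other transform, for example `p | read_from_mqtt('tcp://localhost:1883', 'sensors')`, once the Java side actually registers that URN.
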
>>>>
>>>> I hope this helps! Please let us know if you need any more pointers.
>>>> Brian
>>>>
>>>> [1]
>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
>>>> [2]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
>>>> [3]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
>>>> [4]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
>>>> [5]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
>>>> [6]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
>>>> [7]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
>>>> [8]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
>>>> [9]
>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93
>>>>
>>>> On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen <caro...@almende.org>
>>>> wrote:
>>>>
>>>>> Greetings,
>>>>>
>>>>> I need Python MQTT IO support for a project I'm working on. I've
>>>>> looked at the Kafka IO module, and planned to follow it, but I'm having
>>>>> trouble finding the URN to use, and how to configure the input parameters.
>>>>> Is there any documentation about this? If not, I'd appreciate an
>>>>> explanation of the steps I need to take to implement this.
>>>>>
>>>>> Best regards,
>>>>> Carolyn
>>>>>
>>>>> PS- apologies if there are duplicate posts. I haven't posted to this
>>>>> user list before and wasn't sure if my message wasn't appearing on the
>>>>> list because I wasn't subscribed yet.
>>>>>
>>>>
