I believe +Boyuan Zhang <[email protected]> is working on
https://issues.apache.org/jira/browse/BEAM-6868 , but I'd like to confirm.
Boyuan, is that accurate?
Best
-P.

On Tue, Sep 29, 2020 at 8:34 AM Carolyn Langen <[email protected]> wrote:

> Thanks for the explanation, Chamikara. If I'm going to run into BEAM-6868
> <https://issues.apache.org/jira/browse/BEAM-6868>, then perhaps I should
> wait before implementing this. I don't want to put a lot of effort in only
> to have to stop and wait for 6868 to be completed. Is it currently being
> worked on?
>
> On Tue, Sep 29, 2020 at 5:17 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>>
>> On Tue, Sep 29, 2020 at 8:13 AM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> We are still working on adding documentation on authoring cross-language
>>> transforms. Hopefully it will happen in the next quarter. For now, you'll
>>> have to follow existing examples and the guidelines Brian provided above.
>>> You might run into https://issues.apache.org/jira/browse/BEAM-5440 for
>>> Flink, though, given that MqttIO seems to rely on checkpoint finalization
>>> to ack messages [1]. Probably Luke or +Boyuan Zhang <[email protected]>
>>> can confirm.
>>>
>>
>> Sorry, I wanted to say https://issues.apache.org/jira/browse/BEAM-6868.
>> I don't think https://issues.apache.org/jira/browse/BEAM-5440 will be an
>> issue unless you hope to use Flink in LOOPBACK mode.
>>
>>
>>> Regarding URNs and payloads, currently all cross-language transforms are
>>> composite transforms that get fully expanded during pipeline construction,
>>> so there was no need to define a well-known payload for such transforms.
>>> URNs are usually defined in the Java implementation (for example, [2] for
>>> Kafka), and currently there's an implicit agreement between the Java
>>> implementation and the Python wrapper (both regarding the URN and the
>>> constructor parameters encoded in the expansion request). We might want to
>>> better define this when we have x-lang wrappers for multiple SDKs and/or
>>> multiple versions of the same x-lang transform.
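Since only convention ties the two sides together, it can help to check the agreement explicitly. The following is a minimal sketch, not Beam code: the URN `beam:external:java:mqtt:read:v1`, the field names, and the setter names are all hypothetical, and it assumes snake_case payload fields correspond to camelCase setters on the Java Configuration class (which is what pairs like Kafka's `consumer_config` / `setConsumerConfig` suggest).

```python
from typing import List

# Hypothetical values: the real URN is whatever the Java registrar declares,
# and the real parameter names come from the Java Configuration class.
JAVA_URN = "beam:external:java:mqtt:read:v1"
JAVA_SETTERS = ["setServerUri", "setTopic", "setMaxNumRecords"]

PYTHON_URN = "beam:external:java:mqtt:read:v1"
PYTHON_FIELDS = ["server_uri", "topic", "max_num_records"]


def setter_for(field: str) -> str:
    """Map a snake_case payload field to a Java-style setter name (assumed convention)."""
    return "set" + "".join(part.capitalize() for part in field.split("_"))


def agreement_problems(py_urn: str, java_urn: str,
                       py_fields: List[str], java_setters: List[str]) -> List[str]:
    """List mismatches between the two sides; an empty list means they agree."""
    problems = []
    if py_urn != java_urn:
        problems.append(f"URN mismatch: {py_urn!r} != {java_urn!r}")
    for field in py_fields:
        if setter_for(field) not in java_setters:
            problems.append(f"no Java setter {setter_for(field)} for field {field!r}")
    return problems


print(agreement_problems(PYTHON_URN, JAVA_URN, PYTHON_FIELDS, JAVA_SETTERS))  # []
```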
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java#L343
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L618
>>>
>>>
>>>
>>> On Tue, Sep 29, 2020 at 2:24 AM Carolyn Langen <[email protected]>
>>> wrote:
>>>
>>>> Hi Brian,
>>>>
>>>> Thanks for your quick reply! It sounds like a lot of work, but your
>>>> instructions are nice and concise, which I appreciate. Before I try to
>>>> tackle this, I want to make sure that the issues related to xlang Kafka via
>>>> Flink won't get in the way of implementing and using this. For example,
>>>> check out this thread
>>>> <http://mail-archives.apache.org/mod_mbox/beam-user/202007.mbox/%3CCAOzzzuMtxH+NEbjvaEeALOOdvk2PB=gorrbhx0dqgjurvou...@mail.gmail.com%3E>
>>>> and this issue <https://issues.apache.org/jira/browse/BEAM-5440>.
>>>> Ideally I would be able to get a Kafka Flink example to work before
>>>> starting development.
>>>>
>>>> Please give me your opinion on this issue.
>>>>
>>>> Best regards,
>>>> Carolyn
>>>>
>>>>
>>>> On Mon, Sep 28, 2020 at 7:12 PM Brian Hulette <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Carolyn, welcome to the dev list :)
>>>>>
>>>>> I'm assuming you're interested in making Java's MqttIO [1] available
>>>>> in Python as a cross-language transform? Unfortunately I don't think
>>>>> there's a concise guide for this yet. It definitely makes sense to follow
>>>>> KafkaIO as an example, but I know there's a lot of code to dig through. I
>>>>> can give you a few pointers to the relevant parts.
>>>>>
>>>>> On the Java side, you'll need to provide implementations of
>>>>> ExternalTransformRegistrar [2] and ExternalTransformBuilder [3] that can
>>>>> create MqttIO Read and/or Write transforms. The Registrar is what
>>>>> determines the URN, while the Builder determines the configuration
>>>>> parameters via its Configuration class [4] (we inspect the getters/setters
>>>>> on the class for this).
>>>>> You'll also want to make sure that there's a gradle target for
>>>>> building an expansion service jar that includes your new MqttIO
>>>>> ExternalTransformRegistrar. The easiest way to do this would be to add it
>>>>> to :sdks:java:io:expansion-service:shadowJar by adding mqtt as a dependency
>>>>> there (the same thing we do for Kafka [5]). This is fine to do for testing,
>>>>> of course, but ultimately we should probably have a separate expansion
>>>>> service jar for it, like the one for KinesisIO [6].
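To make the division of labour between Registrar and Builder concrete, here is a toy Python model of the lookup described above. It is only an illustration under stated assumptions: the real pieces are the Java interfaces ExternalTransformRegistrar and ExternalTransformBuilder (whose method is `buildExternal`), whereas `MqttReadConfiguration`, `MqttReadBuilder`, `KNOWN_BUILDERS`, `expand`, and the URN below are hypothetical names, and the snake_case-to-setter mapping is an assumed convention.

```python
from typing import Dict, List, Tuple


class MqttReadConfiguration:
    """Hypothetical stand-in for a Java Configuration class: one setter per parameter."""

    def __init__(self) -> None:
        self.server_uri = None
        self.topic = None

    def setServerUri(self, value: str) -> None:
        self.server_uri = value

    def setTopic(self, value: str) -> None:
        self.topic = value


class MqttReadBuilder:
    """Hypothetical stand-in for an ExternalTransformBuilder."""

    def build_external(self, config: MqttReadConfiguration) -> str:
        # A real builder returns an MqttIO.Read transform; a string keeps this runnable.
        return f"MqttIO.read(server_uri={config.server_uri!r}, topic={config.topic!r})"


# Toy registrar: maps a (hypothetical) URN to its builder and configuration class.
KNOWN_BUILDERS: Dict[str, Tuple[type, type]] = {
    "beam:external:java:mqtt:read:v1": (MqttReadBuilder, MqttReadConfiguration),
}


def configuration_setters(config_cls: type) -> List[str]:
    """Discover the configuration parameters by inspecting setter methods."""
    return sorted(name for name in dir(config_cls)
                  if name.startswith("set") and callable(getattr(config_cls, name)))


def expand(urn: str, payload: Dict[str, str]) -> str:
    builder_cls, config_cls = KNOWN_BUILDERS[urn]  # unknown URN raises KeyError
    config = config_cls()
    for field, value in payload.items():
        setter = "set" + "".join(part.capitalize() for part in field.split("_"))
        getattr(config, setter)(value)  # unknown field raises AttributeError
    return builder_cls().build_external(config)


print(configuration_setters(MqttReadConfiguration))
print(expand("beam:external:java:mqtt:read:v1",
             {"server_uri": "tcp://localhost:1883", "topic": "sensors"}))
```

The point of the model is that the URN alone selects the builder, and the Configuration class's setters alone define which parameters the payload may carry.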
>>>>>
>>>>> On the Python side, you'll need to provide a stub that extends
>>>>> ExternalTransform. The critical pieces are that you reference the correct
>>>>> expansion service jar [7], use the same URN as in Java [8], and use a
>>>>> compatible configuration object [9].
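In the Kafka wrapper linked at [9], the configuration object is a `typing.NamedTuple` whose field names travel in the expansion request (Beam's Python SDK of this period hands such a type to `NamedTupleBasedPayloadBuilder` in `apache_beam.transforms.external`). A minimal, stdlib-only sketch of an equivalent schema for MQTT follows; `ReadFromMqttSchema` and its fields are hypothetical and would have to match whatever the Java Configuration class actually exposes.

```python
from typing import NamedTuple, Optional


class ReadFromMqttSchema(NamedTuple):
    """Hypothetical payload schema for an MQTT read wrapper.

    Each field is assumed to correspond to a setter on the Java
    Configuration class (e.g. server_uri <-> setServerUri).
    """
    server_uri: str
    topic: str
    max_num_records: Optional[int]


config = ReadFromMqttSchema(
    server_uri="tcp://localhost:1883",
    topic="sensors",
    max_num_records=None,
)

print(ReadFromMqttSchema._fields)  # ('server_uri', 'topic', 'max_num_records')
print(config._asdict())
```

Keeping the schema as a NamedTuple gives the field names and types in one declaration, which is what the payload builder would need to encode the configuration.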
>>>>>
>>>>> I hope this helps! Please let us know if you need any more pointers.
>>>>> Brian
>>>>>
>>>>> [1]
>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/mqtt/MqttIO.html
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L616
>>>>> [3]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L512
>>>>> [4]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L629
>>>>> [5]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/expansion-service/build.gradle#L35
>>>>> [6]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/java/io/kinesis/expansion-service/build.gradle
>>>>> [7]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L107
>>>>> [8]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L123
>>>>> [9]
>>>>> https://github.com/apache/beam/blob/321ac13a28906788507eea2e2eee7b1c3229fc29/sdks/python/apache_beam/io/kafka.py#L93
>>>>>
>>>>> On Mon, Sep 28, 2020 at 6:25 AM Carolyn Langen <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Greetings,
>>>>>>
>>>>>> I need Python MQTT IO support for a project I'm working on. I've
>>>>>> looked at the Kafka IO module and planned to follow it, but I'm having
>>>>>> trouble finding the URN to use and figuring out how to configure the
>>>>>> input parameters. Is there any documentation about this? If not, I'd
>>>>>> appreciate an explanation of the steps I need to take to implement this.
>>>>>>
>>>>>> Best regards,
>>>>>> Carolyn
>>>>>>
>>>>>> PS: Apologies if there are duplicate posts. I haven't posted to this
>>>>>> user list before and wasn't sure whether my message wasn't appearing on
>>>>>> the list because I wasn't subscribed yet.
>>>>>>
>>>>>
