Send https://github.com/apache/beam/pull/10054 to update the roadmap.

Thanks,
Cham

On Mon, Nov 4, 2019 at 10:24 AM Chamikara Jayalath <chamik...@google.com>
wrote:

> Makes sense.
>
> I can look into expanding on what we have at following location and adding
> links to some of the existing work as a first step.
> https://beam.apache.org/roadmap/connectors-multi-sdk/
>
> Created https://issues.apache.org/jira/browse/BEAM-8553
>
> We also need more detailed documentation for cross-language transforms but
> that can be separate (and hopefully with help from tech writers who have
> been helping with Beam documentation in general).
>
> Thanks,
> Cham
>
>
> On Sun, Nov 3, 2019 at 7:16 PM Thomas Weise <t...@apache.org> wrote:
>
>> This thread was very helpful to find more detail in
>> https://jira.apache.org/jira/browse/BEAM-7870
>>
>> It would be great to have cross-language current state mentioned as top
>> level entry on https://beam.apache.org/roadmap/
>>
>>
>> On Mon, Sep 16, 2019 at 6:07 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Thanks for the nice write up Chad.
>>>
>>> On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Thanks for bringing this up again. My thoughts on the open questions
>>>> below.
>>>>
>>>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com>
>>>> wrote:
>>>> > That commit solves 2 problems:
>>>> >
>>>> > Adds the pubsub Java deps so that they’re available in our portable
>>>> pipeline
>>>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>>>> available as a standard coder. This is required because both PubsubIO.Read
>>>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>>>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>>>> to hack it to make PubsubMessage appear as a standard coder.
>>>> >
>>>> > More details:
>>>> >
>>>> > There’s a similar magic commit required for Kafka external transforms
>>>> > The Jira issue for this problem is here:
>>>> https://jira.apache.org/jira/browse/BEAM-7870
>>>> > For problem #2 above there seems to be some consensus forming around
>>>> using Avro or schema/row coders to send compound types in a portable way.
>>>> Here’s the PR for making row coders portable
>>>> > https://github.com/apache/beam/pull/9188
>>>>
>>>> +1. Note that this doesn't mean that the IO itself must produce rows;
>>>> part of the Schema work in Java is to make it easy to automatically
>>>> convert from various Java classes to schemas transparently, so this
>>>> same logic that would allow one to apply an SQL filter directly to a
>>>> Kafka/PubSub read would allow cross-language. Even if that doesn't
>>>> work, we need not uglify the Java API; we can have an
>>>> option/alternative transform that appends the convert-to-Row DoFn for
>>>> easier use by external (though the goal of the former work is to make
>>>> this step unnecissary).
>>>>
>>>
>>> Updating all IO connectors / transforms to have a version that
>>> produces/consumes a PCollection<Row> is infeasible so I agree that we need
>>> an automatic conversion to/from PCollection<Row> possibly by injecting
>>> PTransfroms during ExternalTransform expansion.
>>>
>>>>
>>>> > I don’t really have any ideas for problem #1
>>>>
>>>> The crux of the issue here is that the jobs API was not designed with
>>>> cross-language in mind, and so the artifact API ties artifacts to jobs
>>>> rather than to environments. To solve this we need to augment the
>>>> notion of environment to allow the specification of additional
>>>> dependencies (e.g. jar files in this specific case, or better as
>>>> maven/pypi/... dependencies (with version ranges) such that
>>>> environment merging and dependency resolution can be sanely done), and
>>>> a way for the expansion service to provide such dependencies.
>>>>
>>>> Max wrote up a summary of the prior discussions at
>>>>
>>>> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>>>>
>>>> In the short term, one can build a custom docker image that has all
>>>> the requisite dependencies installed.
>>>>
>>>> This touches on a related but separable issue that one may want to run
>>>> some of these transforms "natively" in the same process as the runner
>>>> (e.g. a Java IO in the Flink Java Runner) rather than via docker.
>>>> (Similarly with subprocess.) Exactly how that works with environment
>>>> specifications is also a bit TBD, but my proposal has been that these
>>>> are best viewed as runner-specific substitutions of standard
>>>> environments.
>>>>
>>>
>>> We need a permanent solution for this but for now we have a temporary
>>> solution where additional jar files can be specified through an experiment
>>> when running a Python pipeline:
>>> https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>> > So the portability expansion system works, and now it’s time to sand
>>>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>>>> resolve some of these remaining issues.
>>>>
>>>> +1
>>>>
>>>>
>>>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hi all,
>>>> > There was some interest in this topic at the Beam Summit this week
>>>> (btw, great job to everyone involved!), so I thought I’d try to summarize
>>>> the current state of things.
>>>> > First, let me explain the idea behind an external transforms for the
>>>> uninitiated.
>>>> >
>>>> > Problem:
>>>> >
>>>> > there’s a transform that you want to use, but it’s not available in
>>>> your desired language. IO connectors are a good example: there are many
>>>> available in the Java SDK, but not so much in Python or Go.
>>>> >
>>>> > Solution:
>>>> >
>>>> > Create a stub transform in your desired language (e.g. Python) whose
>>>> primary role is to serialize the parameters passed to that transform
>>>> > When you run your portable pipeline, just prior to it being sent to
>>>> the Job Service for execution, your stub transform’s payload is first sent
>>>> to the “Expansion Service” that’s running in the native language (Java),
>>>> where the payload is used to construct an instance of the native transform,
>>>> which is then expanded and converted to a protobuf and sent back to the
>>>> calling process (Python).
>>>> > The protobuf representation of the expanded transform gets integrated
>>>> back into the pipeline that you’re submitting
>>>> > Steps 2-3 are repeated for each external transform in your pipeline
>>>> > Then the whole pipeline gets sent to the Job Service to be invoked on
>>>> Flink/Spark/etc
>>>> >
>>>> > ________________________________
>>>> >
>>>> > Now on to my journey to get PubsubIO working in python on Flink.
>>>> >
>>>> > The first issue I encountered was that there was a lot of boilerplate
>>>> involved in serializing the stub python transform’s parameters so they can
>>>> be sent to the expansion service.
>>>> >
>>>> > I created a PR to make this simpler, which has just been merged to
>>>> master: https://github.com/apache/beam/pull/9098
>>>> >
>>>> > With this feature in place, if you’re using python 3.7 you can use a
>>>> dataclass and the typing module to create your transform and describe your
>>>> schema in one go. For example:
>>>> >
>>>> >     @dataclasses.dataclass
>>>> >     class MyAwesomeTransform(beam.ExternalTransform):
>>>> >       URN = 'beam:external:fakeurn:v1'
>>>> >
>>>> >       integer_example: int
>>>> >       string_example: str
>>>> >       list_of_strings: List[str]
>>>> >       optional_kv: Optional[Tuple[str, float]] = None
>>>> >       optional_integer: Optional[int] = None
>>>> >       expansion_service: dataclasses.InitVar[Optional[str]] = None
>>>> >
>>>> > For earlier versions of python, you can use typing.NamedTuple to
>>>> declare your schema.
>>>> >
>>>> >     MyAwesomeSchema = typing.NamedTuple(
>>>> >         'MyAwesomeSchema',
>>>> >         [
>>>> >             ('integer_example', int),
>>>> >             ('string_example', unicode),
>>>> >             ('list_of_strings', List[unicode]),
>>>> >             ('optional_kv', Optional[Tuple[unicode, float]]),
>>>> >             ('optional_integer', Optional[int]),
>>>> >         ]
>>>> >     )
>>>> >
>>>> > There’s also an option to generate the schema implicitly based on the
>>>> value(s) you wish to serialize.
>>>> >
>>>> > There was a slight tangent in implementing this feature in that
>>>> requesting a coder for typing.List resulted in pickle coder instead of
>>>> IterableCoder. That’s bad because only standard/portable coders can be used
>>>> for expansion in Java (for obvious reasons), so as a convenience that was
>>>> solved here: https://github.com/apache/beam/pull/9344
>>>> >
>>>> > The next issue that I encountered was that python did not track the
>>>> boundedness of PCollections, which made it impossible to use the expansion
>>>> service to create unbounded writes. That’s been solved and merged here:
>>>> https://github.com/apache/beam/pull/9426
>>>> >
>>>> > So that brings us to the actual PR for adding external transform
>>>> support for PubsubIO: https://github.com/apache/beam/pull/9268
>>>> >
>>>> > The PR works, but with one big caveat: in order to use it you must
>>>> build your Java containers with this special commit:
>>>> https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
>>>> >
>>>> > That commit solves 2 problems:
>>>> >
>>>> > Adds the pubsub Java deps so that they’re available in our portable
>>>> pipeline
>>>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>>>> available as a standard coder. This is required because both PubsubIO.Read
>>>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>>>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>>>> to hack it to make PubsubMessage appear as a standard coder.
>>>> >
>>>> > More details:
>>>> >
>>>> > There’s a similar magic commit required for Kafka external transforms
>>>> > The Jira issue for this problem is here:
>>>> https://jira.apache.org/jira/browse/BEAM-7870
>>>> > For problem #2 above there seems to be some consensus forming around
>>>> using Avro or schema/row coders to send compound types in a portable way.
>>>> Here’s the PR for making row coders portable
>>>> > https://github.com/apache/beam/pull/9188
>>>> > I don’t really have any ideas for problem #1
>>>> >
>>>> > So the portability expansion system works, and now it’s time to sand
>>>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>>>> resolve some of these remaining issues.
>>>> >
>>>> > -chad
>>>>
>>>

Reply via email to