This thread was very helpful to find more detail in
https://jira.apache.org/jira/browse/BEAM-7870

It would be great to have cross-language current state mentioned as top
level entry on https://beam.apache.org/roadmap/


On Mon, Sep 16, 2019 at 6:07 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> Thanks for the nice write up Chad.
>
> On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> Thanks for bringing this up again. My thoughts on the open questions
>> below.
>>
>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
>> > That commit solves 2 problems:
>> >
>> > Adds the pubsub Java deps so that they’re available in our portable
>> pipeline
>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>> available as a standard coder. This is required because both PubsubIO.Read
>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>> to hack it to make PubsubMessage appear as a standard coder.
>> >
>> > More details:
>> >
>> > There’s a similar magic commit required for Kafka external transforms
>> > The Jira issue for this problem is here:
>> https://jira.apache.org/jira/browse/BEAM-7870
>> > For problem #2 above there seems to be some consensus forming around
>> using Avro or schema/row coders to send compound types in a portable way.
>> Here’s the PR for making row coders portable
>> > https://github.com/apache/beam/pull/9188
>>
>> +1. Note that this doesn't mean that the IO itself must produce rows;
>> part of the Schema work in Java is to make it easy to automatically
>> convert from various Java classes to schemas transparently, so this
>> same logic that would allow one to apply an SQL filter directly to a
>> Kafka/PubSub read would allow cross-language. Even if that doesn't
>> work, we need not uglify the Java API; we can have an
>> option/alternative transform that appends the convert-to-Row DoFn for
>> easier use by external (though the goal of the former work is to make
>> this step unnecissary).
>>
>
> Updating all IO connectors / transforms to have a version that
> produces/consumes a PCollection<Row> is infeasible so I agree that we need
> an automatic conversion to/from PCollection<Row> possibly by injecting
> PTransfroms during ExternalTransform expansion.
>
>>
>> > I don’t really have any ideas for problem #1
>>
>> The crux of the issue here is that the jobs API was not designed with
>> cross-language in mind, and so the artifact API ties artifacts to jobs
>> rather than to environments. To solve this we need to augment the
>> notion of environment to allow the specification of additional
>> dependencies (e.g. jar files in this specific case, or better as
>> maven/pypi/... dependencies (with version ranges) such that
>> environment merging and dependency resolution can be sanely done), and
>> a way for the expansion service to provide such dependencies.
>>
>> Max wrote up a summary of the prior discussions at
>>
>> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>>
>> In the short term, one can build a custom docker image that has all
>> the requisite dependencies installed.
>>
>> This touches on a related but separable issue that one may want to run
>> some of these transforms "natively" in the same process as the runner
>> (e.g. a Java IO in the Flink Java Runner) rather than via docker.
>> (Similarly with subprocess.) Exactly how that works with environment
>> specifications is also a bit TBD, but my proposal has been that these
>> are best viewed as runner-specific substitutions of standard
>> environments.
>>
>
> We need a permanent solution for this but for now we have a temporary
> solution where additional jar files can be specified through an experiment
> when running a Python pipeline:
> https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
>
> Thanks,
> Cham
>
>
>>
>> > So the portability expansion system works, and now it’s time to sand
>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>> resolve some of these remaining issues.
>>
>> +1
>>
>>
>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
>> >
>> > Hi all,
>> > There was some interest in this topic at the Beam Summit this week
>> (btw, great job to everyone involved!), so I thought I’d try to summarize
>> the current state of things.
>> > First, let me explain the idea behind an external transforms for the
>> uninitiated.
>> >
>> > Problem:
>> >
>> > there’s a transform that you want to use, but it’s not available in
>> your desired language. IO connectors are a good example: there are many
>> available in the Java SDK, but not so much in Python or Go.
>> >
>> > Solution:
>> >
>> > Create a stub transform in your desired language (e.g. Python) whose
>> primary role is to serialize the parameters passed to that transform
>> > When you run your portable pipeline, just prior to it being sent to the
>> Job Service for execution, your stub transform’s payload is first sent to
>> the “Expansion Service” that’s running in the native language (Java), where
>> the payload is used to construct an instance of the native transform, which
>> is then expanded and converted to a protobuf and sent back to the calling
>> process (Python).
>> > The protobuf representation of the expanded transform gets integrated
>> back into the pipeline that you’re submitting
>> > Steps 2-3 are repeated for each external transform in your pipeline
>> > Then the whole pipeline gets sent to the Job Service to be invoked on
>> Flink/Spark/etc
>> >
>> > ________________________________
>> >
>> > Now on to my journey to get PubsubIO working in python on Flink.
>> >
>> > The first issue I encountered was that there was a lot of boilerplate
>> involved in serializing the stub python transform’s parameters so they can
>> be sent to the expansion service.
>> >
>> > I created a PR to make this simpler, which has just been merged to
>> master: https://github.com/apache/beam/pull/9098
>> >
>> > With this feature in place, if you’re using python 3.7 you can use a
>> dataclass and the typing module to create your transform and describe your
>> schema in one go. For example:
>> >
>> >     @dataclasses.dataclass
>> >     class MyAwesomeTransform(beam.ExternalTransform):
>> >       URN = 'beam:external:fakeurn:v1'
>> >
>> >       integer_example: int
>> >       string_example: str
>> >       list_of_strings: List[str]
>> >       optional_kv: Optional[Tuple[str, float]] = None
>> >       optional_integer: Optional[int] = None
>> >       expansion_service: dataclasses.InitVar[Optional[str]] = None
>> >
>> > For earlier versions of python, you can use typing.NamedTuple to
>> declare your schema.
>> >
>> >     MyAwesomeSchema = typing.NamedTuple(
>> >         'MyAwesomeSchema',
>> >         [
>> >             ('integer_example', int),
>> >             ('string_example', unicode),
>> >             ('list_of_strings', List[unicode]),
>> >             ('optional_kv', Optional[Tuple[unicode, float]]),
>> >             ('optional_integer', Optional[int]),
>> >         ]
>> >     )
>> >
>> > There’s also an option to generate the schema implicitly based on the
>> value(s) you wish to serialize.
>> >
>> > There was a slight tangent in implementing this feature in that
>> requesting a coder for typing.List resulted in pickle coder instead of
>> IterableCoder. That’s bad because only standard/portable coders can be used
>> for expansion in Java (for obvious reasons), so as a convenience that was
>> solved here: https://github.com/apache/beam/pull/9344
>> >
>> > The next issue that I encountered was that python did not track the
>> boundedness of PCollections, which made it impossible to use the expansion
>> service to create unbounded writes. That’s been solved and merged here:
>> https://github.com/apache/beam/pull/9426
>> >
>> > So that brings us to the actual PR for adding external transform
>> support for PubsubIO: https://github.com/apache/beam/pull/9268
>> >
>> > The PR works, but with one big caveat: in order to use it you must
>> build your Java containers with this special commit:
>> https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
>> >
>> > That commit solves 2 problems:
>> >
>> > Adds the pubsub Java deps so that they’re available in our portable
>> pipeline
>> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>> available as a standard coder. This is required because both PubsubIO.Read
>> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
>> objects, but only “standard” (i.e. portable) coders can be used, so we have
>> to hack it to make PubsubMessage appear as a standard coder.
>> >
>> > More details:
>> >
>> > There’s a similar magic commit required for Kafka external transforms
>> > The Jira issue for this problem is here:
>> https://jira.apache.org/jira/browse/BEAM-7870
>> > For problem #2 above there seems to be some consensus forming around
>> using Avro or schema/row coders to send compound types in a portable way.
>> Here’s the PR for making row coders portable
>> > https://github.com/apache/beam/pull/9188
>> > I don’t really have any ideas for problem #1
>> >
>> > So the portability expansion system works, and now it’s time to sand
>> off some of the rough corners. I’d love to hear others’ thoughts on how to
>> resolve some of these remaining issues.
>> >
>> > -chad
>>
>

Reply via email to