Sent https://github.com/apache/beam/pull/10054 to update the roadmap.
Thanks,
Cham

On Mon, Nov 4, 2019 at 10:24 AM Chamikara Jayalath <chamik...@google.com> wrote:

> Makes sense.
>
> I can look into expanding on what we have at the following location and adding links to some of the existing work as a first step: https://beam.apache.org/roadmap/connectors-multi-sdk/
>
> Created https://issues.apache.org/jira/browse/BEAM-8553
>
> We also need more detailed documentation for cross-language transforms, but that can be separate (and hopefully with help from the tech writers who have been helping with Beam documentation in general).
>
> Thanks,
> Cham
>
> On Sun, Nov 3, 2019 at 7:16 PM Thomas Weise <t...@apache.org> wrote:
>
>> This thread was very helpful for finding more detail in https://jira.apache.org/jira/browse/BEAM-7870
>>
>> It would be great to have the current state of cross-language support mentioned as a top-level entry on https://beam.apache.org/roadmap/
>>
>> On Mon, Sep 16, 2019 at 6:07 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> Thanks for the nice write-up, Chad.
>>>
>>> On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>>> Thanks for bringing this up again. My thoughts on the open questions below.
>>>>
>>>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
>>>> > That commit solves 2 problems:
>>>> >
>>>> > 1. Adds the pubsub Java deps so that they’re available in our portable pipeline.
>>>> > 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage, available as a standard coder. This is required because both PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along these PubsubMessage objects, but only “standard” (i.e. portable) coders can be used, so we have to hack it to make PubsubMessage appear as a standard coder.
>>>> >
>>>> > More details:
>>>> >
>>>> > - There’s a similar magic commit required for Kafka external transforms.
>>>> > - The Jira issue for this problem is here: https://jira.apache.org/jira/browse/BEAM-7870
>>>> > - For problem #2 above there seems to be some consensus forming around using Avro or schema/row coders to send compound types in a portable way. Here’s the PR for making row coders portable: https://github.com/apache/beam/pull/9188
>>>>
>>>> +1. Note that this doesn't mean that the IO itself must produce rows; part of the Schema work in Java is to make it easy to automatically convert from various Java classes to schemas transparently, so the same logic that would allow one to apply an SQL filter directly to a Kafka/PubSub read would allow cross-language use. Even if that doesn't work, we need not uglify the Java API; we can have an optional/alternative transform that appends the convert-to-Row DoFn for easier use by external SDKs (though the goal of the former work is to make this step unnecessary).
>>>
>>> Updating all IO connectors / transforms to have a version that produces/consumes a PCollection<Row> is infeasible, so I agree that we need an automatic conversion to/from PCollection<Row>, possibly by injecting PTransforms during ExternalTransform expansion.
>>>
>>>> > I don’t really have any ideas for problem #1
>>>>
>>>> The crux of the issue here is that the Jobs API was not designed with cross-language in mind, and so the artifact API ties artifacts to jobs rather than to environments. To solve this we need to augment the notion of environment to allow the specification of additional dependencies (e.g. jar files in this specific case, or better as maven/pypi/... dependencies, with version ranges, such that environment merging and dependency resolution can be sanely done), and a way for the expansion service to provide such dependencies.
>>>>
>>>> Max wrote up a summary of the prior discussions at https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>>>>
>>>> In the short term, one can build a custom docker image that has all the requisite dependencies installed.
>>>>
>>>> This touches on a related but separable issue: one may want to run some of these transforms "natively" in the same process as the runner (e.g. a Java IO in the Flink Java runner) rather than via docker. (Similarly with subprocess.) Exactly how that works with environment specifications is also a bit TBD, but my proposal has been that these are best viewed as runner-specific substitutions of standard environments.
>>>
>>> We need a permanent solution for this, but for now we have a temporary solution where additional jar files can be specified through an experiment when running a Python pipeline: https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
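>>>
>>> Roughly, it amounts to passing the extra jars as an experiment in the pipeline options, something like the sketch below (the experiment name and jar path here are only illustrative; the linked test shows the exact form):
>>>
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>> options = PipelineOptions([
>>>     '--runner=PortableRunner',
>>>     '--job_endpoint=localhost:8099',
>>>     # Extra Java jars to make available for the external transform
>>>     # (placeholder experiment name and path).
>>>     '--experiments=jar_packages=/path/to/extra-io-expansion.jar',
>>> ])
>>>
>>> with beam.Pipeline(options=options) as p:
>>>     ...  # pipeline that uses the cross-language transform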
>>>
>>> Thanks,
>>> Cham
>>>
>>>> > So the portability expansion system works, and now it’s time to sand off some of the rough corners. I’d love to hear others’ thoughts on how to resolve some of these remaining issues.
>>>>
>>>> +1
>>>>
>>>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
>>>> >
>>>> > Hi all,
>>>> > There was some interest in this topic at the Beam Summit this week (btw, great job to everyone involved!), so I thought I’d try to summarize the current state of things.
>>>> > First, let me explain the idea behind external transforms for the uninitiated.
>>>> >
>>>> > Problem:
>>>> >
>>>> > There’s a transform that you want to use, but it’s not available in your desired language. IO connectors are a good example: there are many available in the Java SDK, but not so many in Python or Go.
>>>> >
>>>> > Solution:
>>>> >
>>>> > 1. Create a stub transform in your desired language (e.g. Python) whose primary role is to serialize the parameters passed to that transform.
>>>> > 2. When you run your portable pipeline, just prior to it being sent to the Job Service for execution, your stub transform’s payload is first sent to the “Expansion Service” that’s running in the native language (Java), where the payload is used to construct an instance of the native transform, which is then expanded, converted to a protobuf, and sent back to the calling process (Python).
>>>> > 3. The protobuf representation of the expanded transform gets integrated back into the pipeline that you’re submitting.
>>>> > 4. Steps 2-3 are repeated for each external transform in your pipeline.
>>>> > 5. Then the whole pipeline gets sent to the Job Service to be invoked on Flink/Spark/etc.
>>>> >
>>>> > ________________________________
>>>> >
>>>> > Now on to my journey to get PubsubIO working in Python on Flink.
>>>> >
>>>> > The first issue I encountered was that there was a lot of boilerplate involved in serializing the stub Python transform’s parameters so they can be sent to the expansion service.
>>>> >
>>>> > I created a PR to make this simpler, which has just been merged to master: https://github.com/apache/beam/pull/9098
>>>> >
>>>> > With this feature in place, if you’re using Python 3.7 you can use a dataclass and the typing module to create your transform and describe your schema in one go. For example:
>>>> >
>>>> > import dataclasses
>>>> > import typing
>>>> > from typing import List, Optional, Tuple
>>>> >
>>>> > import apache_beam as beam
>>>> >
>>>> > @dataclasses.dataclass
>>>> > class MyAwesomeTransform(beam.ExternalTransform):
>>>> >     URN = 'beam:external:fakeurn:v1'
>>>> >
>>>> >     integer_example: int
>>>> >     string_example: str
>>>> >     list_of_strings: List[str]
>>>> >     optional_kv: Optional[Tuple[str, float]] = None
>>>> >     optional_integer: Optional[int] = None
>>>> >     expansion_service: dataclasses.InitVar[Optional[str]] = None
>>>> >
>>>> > For earlier versions of Python, you can use typing.NamedTuple to declare your schema.
>>>> >
>>>> > MyAwesomeSchema = typing.NamedTuple(
>>>> >     'MyAwesomeSchema',
>>>> >     [
>>>> >         ('integer_example', int),
>>>> >         ('string_example', unicode),
>>>> >         ('list_of_strings', List[unicode]),
>>>> >         ('optional_kv', Optional[Tuple[unicode, float]]),
>>>> >         ('optional_integer', Optional[int]),
>>>> >     ]
>>>> > )
>>>> >
>>>> > There’s also an option to generate the schema implicitly based on the value(s) you wish to serialize.
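>>>> >
>>>> > To make the usage concrete, here is a rough sketch of applying such a stub (the job endpoint and expansion service address are made-up; a Java expansion service with the relevant deps on its classpath has to be reachable while the pipeline is being constructed, per step 2 above):
>>>> >
>>>> > import apache_beam as beam
>>>> > from apache_beam.options.pipeline_options import PipelineOptions
>>>> >
>>>> > options = PipelineOptions([
>>>> >     '--runner=PortableRunner',
>>>> >     '--job_endpoint=localhost:8099',  # made-up job service address
>>>> > ])
>>>> >
>>>> > with beam.Pipeline(options=options) as p:
>>>> >     (p
>>>> >      | beam.Create(['a', 'b', 'c'])
>>>> >      | MyAwesomeTransform(
>>>> >            integer_example=1,
>>>> >            string_example='hello',
>>>> >            list_of_strings=['x', 'y'],
>>>> >            # Address of the Java expansion service (made-up port).
>>>> >            expansion_service='localhost:8097'))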
>>>> >
>>>> > There was a slight tangent in implementing this feature, in that requesting a coder for typing.List resulted in a pickle coder instead of IterableCoder. That’s bad because only standard/portable coders can be used for expansion in Java (for obvious reasons), so as a convenience that was solved here: https://github.com/apache/beam/pull/9344
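>>>> >
>>>> > From memory, the difference is visible directly in the coder registry; roughly (the exact lookup the payload-serialization code performs may differ):
>>>> >
>>>> > from typing import List
>>>> >
>>>> > from apache_beam import coders
>>>> >
>>>> > # With that fix, a typing.List hint maps to an IterableCoder of the
>>>> > # element coder; previously it fell back to a pickle-based coder.
>>>> > print(coders.registry.get_coder(List[str]))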
>>>> >
>>>> > The next issue that I encountered was that Python did not track the boundedness of PCollections, which made it impossible to use the expansion service to create unbounded writes. That’s been solved and merged here: https://github.com/apache/beam/pull/9426
>>>> >
>>>> > So that brings us to the actual PR for adding external transform support for PubsubIO: https://github.com/apache/beam/pull/9268
>>>> >
>>>> > The PR works, but with one big caveat: in order to use it you must build your Java containers with this special commit: https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
>>>> >
>>>> > That commit solves 2 problems:
>>>> >
>>>> > 1. Adds the pubsub Java deps so that they’re available in our portable pipeline.
>>>> > 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage, available as a standard coder. This is required because both PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along these PubsubMessage objects, but only “standard” (i.e. portable) coders can be used, so we have to hack it to make PubsubMessage appear as a standard coder.
>>>> >
>>>> > More details:
>>>> >
>>>> > - There’s a similar magic commit required for Kafka external transforms.
>>>> > - The Jira issue for this problem is here: https://jira.apache.org/jira/browse/BEAM-7870
>>>> > - For problem #2 above there seems to be some consensus forming around using Avro or schema/row coders to send compound types in a portable way. Here’s the PR for making row coders portable: https://github.com/apache/beam/pull/9188
>>>> > - I don’t really have any ideas for problem #1.
>>>> >
>>>> > So the portability expansion system works, and now it’s time to sand off some of the rough corners. I’d love to hear others’ thoughts on how to resolve some of these remaining issues.
>>>> >
>>>> > -chad