This thread was very helpful for finding more detail in https://jira.apache.org/jira/browse/BEAM-7870
It would be great to have the current state of cross-language support mentioned as a top-level entry on https://beam.apache.org/roadmap/

On Mon, Sep 16, 2019 at 6:07 PM Chamikara Jayalath <chamik...@google.com> wrote:

> Thanks for the nice write up Chad.
>
> On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw <rober...@google.com> wrote:
>
>> Thanks for bringing this up again. My thoughts on the open questions below.
>>
>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
>> > That commit solves 2 problems:
>> >
>> > 1. Adds the pubsub Java deps so that they're available in our portable pipeline.
>> > 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage, available as a standard coder. This is required because both PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along these PubsubMessage objects, but only "standard" (i.e. portable) coders can be used, so we have to hack it to make PubsubMessage appear as a standard coder.
>> >
>> > More details:
>> >
>> > - There's a similar magic commit required for Kafka external transforms.
>> > - The Jira issue for this problem is here: https://jira.apache.org/jira/browse/BEAM-7870
>> > - For problem #2 above there seems to be some consensus forming around using Avro or schema/row coders to send compound types in a portable way. Here's the PR for making row coders portable: https://github.com/apache/beam/pull/9188
>>
>> +1. Note that this doesn't mean that the IO itself must produce rows; part of the Schema work in Java is to make it easy to automatically convert from various Java classes to schemas transparently, so the same logic that would allow one to apply an SQL filter directly to a Kafka/PubSub read would allow cross-language use. Even if that doesn't work, we need not uglify the Java API; we can have an option/alternative transform that appends the convert-to-Row DoFn for easier external use (though the goal of the former work is to make this step unnecessary).
>
> Updating all IO connectors / transforms to have a version that produces/consumes a PCollection<Row> is infeasible, so I agree that we need an automatic conversion to/from PCollection<Row>, possibly by injecting PTransforms during ExternalTransform expansion.
>
>> > I don't really have any ideas for problem #1
>>
>> The crux of the issue here is that the jobs API was not designed with cross-language in mind, and so the artifact API ties artifacts to jobs rather than to environments. To solve this we need to augment the notion of environment to allow the specification of additional dependencies (e.g. jar files in this specific case, or better, maven/pypi/... dependencies with version ranges, such that environment merging and dependency resolution can be sanely done), and a way for the expansion service to provide such dependencies.
>>
>> Max wrote up a summary of the prior discussions at https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>>
>> In the short term, one can build a custom docker image that has all the requisite dependencies installed.
>>
>> This touches on a related but separable issue: one may want to run some of these transforms "natively" in the same process as the runner (e.g. a Java IO in the Flink Java Runner) rather than via docker. (Similarly with subprocess.) Exactly how that works with environment specifications is also a bit TBD, but my proposal has been that these are best viewed as runner-specific substitutions of standard environments.
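As a rough illustration of the short-term docker workaround mentioned above: when running against a portable runner, the Python SDK can be pointed at a custom SDK harness container through pipeline options. This is only a sketch of the general mechanism, not a recipe from this thread; the image name and job endpoint are placeholders, and for cross-language transforms the Java containers would need the same kind of customization (which is what the special commit discussed elsewhere in this thread provides).

    from apache_beam.options.pipeline_options import PipelineOptions

    # Sketch only: select a custom SDK harness container that already has the
    # extra dependencies baked in. The image name and endpoint are placeholders.
    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',
        '--environment_type=DOCKER',
        '--environment_config=gcr.io/my-project/beam_sdk_with_extras:latest',
    ])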
> We need a permanent solution for this, but for now we have a temporary solution where additional jar files can be specified through an experiment when running a Python pipeline: https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
>
> Thanks,
> Cham
>
>> > So the portability expansion system works, and now it's time to sand off some of the rough corners. I'd love to hear others' thoughts on how to resolve some of these remaining issues.
>>
>> +1
>>
>> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
>> >
>> > Hi all,
>> > There was some interest in this topic at the Beam Summit this week (btw, great job to everyone involved!), so I thought I'd try to summarize the current state of things.
>> >
>> > First, let me explain the idea behind external transforms for the uninitiated.
>> >
>> > Problem:
>> >
>> > There's a transform that you want to use, but it's not available in your desired language. IO connectors are a good example: there are many available in the Java SDK, but not so much in Python or Go.
>> >
>> > Solution:
>> >
>> > 1. Create a stub transform in your desired language (e.g. Python) whose primary role is to serialize the parameters passed to that transform.
>> > 2. When you run your portable pipeline, just prior to it being sent to the Job Service for execution, your stub transform's payload is first sent to the "Expansion Service" that's running in the native language (Java), where the payload is used to construct an instance of the native transform, which is then expanded, converted to a protobuf, and sent back to the calling process (Python).
>> > 3. The protobuf representation of the expanded transform gets integrated back into the pipeline that you're submitting.
>> > 4. Steps 2-3 are repeated for each external transform in your pipeline.
>> > 5. Then the whole pipeline gets sent to the Job Service to be invoked on Flink/Spark/etc.
>> >
>> > ________________________________
>> >
>> > Now on to my journey to get PubsubIO working in python on Flink.
>> >
>> > The first issue I encountered was that there was a lot of boilerplate involved in serializing the stub python transform's parameters so they can be sent to the expansion service.
>> >
>> > I created a PR to make this simpler, which has just been merged to master: https://github.com/apache/beam/pull/9098
>> >
>> > With this feature in place, if you're using python 3.7 you can use a dataclass and the typing module to create your transform and describe your schema in one go. For example:
>> >
>> > @dataclasses.dataclass
>> > class MyAwesomeTransform(beam.ExternalTransform):
>> >   URN = 'beam:external:fakeurn:v1'
>> >
>> >   integer_example: int
>> >   string_example: str
>> >   list_of_strings: List[str]
>> >   optional_kv: Optional[Tuple[str, float]] = None
>> >   optional_integer: Optional[int] = None
>> >   expansion_service: dataclasses.InitVar[Optional[str]] = None
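For illustration, a stub declared this way could be applied in a pipeline roughly as sketched below. This is not taken from the PRs referenced in this thread: the parameter values and the expansion service address are hypothetical, and the sketch assumes the MyAwesomeTransform definition above (with its imports) plus an expansion service already running at the placeholder address.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Hypothetical usage of the dataclass-based stub defined above.
    # 'localhost:8097' stands in for wherever the expansion service is listening.
    with beam.Pipeline(options=PipelineOptions()) as p:
      result = p | MyAwesomeTransform(
          integer_example=1,
          string_example='hello',
          list_of_strings=['a', 'b'],
          expansion_service='localhost:8097')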
>> > For earlier versions of python, you can use typing.NamedTuple to declare your schema:
>> >
>> > MyAwesomeSchema = typing.NamedTuple(
>> >     'MyAwesomeSchema',
>> >     [
>> >         ('integer_example', int),
>> >         ('string_example', unicode),
>> >         ('list_of_strings', List[unicode]),
>> >         ('optional_kv', Optional[Tuple[unicode, float]]),
>> >         ('optional_integer', Optional[int]),
>> >     ]
>> > )
>> >
>> > There's also an option to generate the schema implicitly based on the value(s) you wish to serialize.
>> >
>> > There was a slight tangent in implementing this feature, in that requesting a coder for typing.List resulted in a pickle coder instead of IterableCoder. That's bad because only standard/portable coders can be used for expansion in Java (for obvious reasons), so as a convenience that was solved here: https://github.com/apache/beam/pull/9344
>> >
>> > The next issue that I encountered was that python did not track the boundedness of PCollections, which made it impossible to use the expansion service to create unbounded writes. That's been solved and merged here: https://github.com/apache/beam/pull/9426
>> >
>> > So that brings us to the actual PR for adding external transform support for PubsubIO: https://github.com/apache/beam/pull/9268
>> >
>> > The PR works, but with one big caveat: in order to use it you must build your Java containers with this special commit: https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
>> >
>> > That commit solves 2 problems:
>> >
>> > 1. Adds the pubsub Java deps so that they're available in our portable pipeline.
>> > 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage, available as a standard coder. This is required because both PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along these PubsubMessage objects, but only "standard" (i.e. portable) coders can be used, so we have to hack it to make PubsubMessage appear as a standard coder.
>> >
>> > More details:
>> >
>> > - There's a similar magic commit required for Kafka external transforms.
>> > - The Jira issue for this problem is here: https://jira.apache.org/jira/browse/BEAM-7870
>> > - For problem #2 above there seems to be some consensus forming around using Avro or schema/row coders to send compound types in a portable way. Here's the PR for making row coders portable: https://github.com/apache/beam/pull/9188
>> > - I don't really have any ideas for problem #1.
>> >
>> > So the portability expansion system works, and now it's time to sand off some of the rough corners. I'd love to hear others' thoughts on how to resolve some of these remaining issues.
>> >
>> > -chad
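Finally, to make the schema/row-coder direction discussed above a little more concrete, here is a minimal, hypothetical sketch of what a convert-to-row step might look like on the Python side. MessageRow and MessageToRow are invented names, the fields are only illustrative (the real PubsubMessage type has its own shape), and the sketch simply combines the NamedTuple + RowCoder registration pattern with an ordinary DoFn; the idea in the thread is that an equivalent step could eventually be injected automatically during ExternalTransform expansion rather than written by hand.

    import typing

    import apache_beam as beam
    from apache_beam.coders.row_coder import RowCoder

    class MessageRow(typing.NamedTuple):
      # Illustrative fields only; not the actual PubsubMessage layout.
      data: bytes
      attributes: typing.Mapping[str, str]

    # Registering RowCoder for the NamedTuple lets it be encoded with the
    # portable schema/row encoding instead of falling back to pickling.
    beam.coders.registry.register_coder(MessageRow, RowCoder)

    class MessageToRow(beam.DoFn):
      # Convert an SDK-specific message object into the schema'd type so it can
      # cross the language boundary with a standard coder.
      def process(self, msg):
        yield MessageRow(data=msg.data, attributes=dict(msg.attributes))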