On Thu, Jan 24, 2019 at 6:43 PM Reuven Lax <re...@google.com> wrote: > > Keep in mind that these user-supplied lambdas are commonly used in our IOs. > One common usage is in Sink IOs, to allow dynamic destinations. E.g., in > BigQueryIO.Write, a user-supplied lambda determines what table a record > should be written to.
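To make the dynamic-destinations point concrete, here is a minimal pure-Python sketch of what such a lambda does. It uses no Beam APIs. `route_records` and `table_for` are hypothetical names, and plain lists stand in for PCollections. The user function maps each record to a table name, and the sink writes each record to whichever table that function returns. That function is language-native code, which is exactly what a sink implemented in another SDK cannot call.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List

Record = Dict[str, object]


def route_records(records: Iterable[Record],
                  table_fn: Callable[[Record], str]) -> Dict[str, List[Record]]:
    """Group records by the destination table chosen by a user-supplied function.

    Hypothetical stand-in for a dynamic-destinations sink: a real sink would
    write each group to its table instead of returning it.
    """
    tables: Dict[str, List[Record]] = defaultdict(list)
    for record in records:
        tables[table_fn(record)].append(record)
    return dict(tables)


def table_for(record: Record) -> str:
    # The user-supplied, language-native destination function.
    return f"events_{record['country']}"


events = [{"country": "us", "n": 1}, {"country": "de", "n": 2}, {"country": "us", "n": 3}]
routed = route_records(events, table_for)
```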
This can probably be pre-computed upstream (as part of the wrapping composite that does take a language-native lambda) and placed in a standard format (e.g. a tuple or other schema) to be extracted by the "core" sink. > Given that IOs are one of the big selling points of cross-language support, > we should think about how we can support this functionality. Yes. There are user-supplied lambdas that can't be as easily pre- or post-applied, and though we had some brainstorming sessions (~a year ago) we're far from a (good) answer to that. > On Thu, Jan 24, 2019 at 8:34 AM Robert Bradshaw <rober...@google.com> wrote: >> >> On Thu, Jan 24, 2019 at 5:08 PM Thomas Weise <t...@apache.org> wrote: >> > >> > Exciting to see the cross-language train gathering steam :) >> > >> > It may be useful to flesh out the user-facing aspects a bit more before >> > going too deep on the service / expansion side, or maybe that was done >> > elsewhere? >> >> It's been discussed, but no resolution yet. >> >> > A few examples (of varying complexity) of what the shim/proxy transforms >> > would look like in the other SDKs. Perhaps Java KafkaIO in Python and Go >> > would be a good candidate? >> >> The core implementation would, almost by definition, be >> >> input.apply(ExternalTransform(URN, payload, service_address)). >> >> Nicer shims would just be composite transforms that call this, filling >> in the URNs, payloads, and possibly service details from more >> user-friendly parameters. >> >> > One problem we discovered with custom Flink native transforms for Python >> > was the handling of lambdas / functions. An example could be a user-defined >> > watermark timestamp extractor that the user should be able to supply in >> > Python but that the JVM cannot handle. >> >> Yes, this has never been resolved satisfactorily. For now, if UDFs can >> be reified in terms of a commonly-understood URN + payload, it'll >> work.
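A minimal sketch combining the two suggestions above: pre-compute the destination upstream, and wrap `ExternalTransform(URN, payload, service_address)` in a friendlier shim. All names here are hypothetical. `ExternalTransform` is a local dataclass, not any SDK's class, the URN string is invented, and plain lists stand in for PCollections. The shim evaluates the language-native lambda in the caller's own SDK and emits `(destination, record)` tuples. It then describes the remote "core" sink only by URN, payload, and service address.

```python
import json
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

Record = Dict[str, object]


@dataclass(frozen=True)
class ExternalTransform:
    """Hypothetical stand-in for the core cross-language transform: only a
    URN, an opaque payload, and an expansion-service address."""
    urn: str
    payload: bytes
    service_address: str


def precompute_destinations(records: List[Record],
                            table_fn: Callable[[Record], str]) -> List[Tuple[str, Record]]:
    # Runs in the caller's own SDK, so table_fn can be any native callable.
    return [(table_fn(record), record) for record in records]


def write_with_dynamic_destinations(records: List[Record],
                                    table_fn: Callable[[Record], str],
                                    dataset: str,
                                    service_address: str):
    """Hypothetical user-friendly shim: evaluate the lambda locally, then
    describe the remote "core" sink purely by URN + payload."""
    keyed = precompute_destinations(records, table_fn)
    sink = ExternalTransform(
        urn="example:hypothetical:keyed_table_write:v1",  # invented URN
        payload=json.dumps({"dataset": dataset}).encode("utf-8"),
        service_address=service_address,
    )
    return keyed, sink


def table_for(record: Record) -> str:
    return f"events_{record['country']}"


keyed, sink = write_with_dynamic_destinations(
    [{"country": "us"}, {"country": "de"}], table_for,
    dataset="analytics", service_address="localhost:12345")  # placeholder address
```

The core sink never sees `table_for`. It only reads the first element of each tuple, which any SDK can produce.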
A transform could provide a wide range of "useful" URNs for its >> internal callbacks; anything more than that would require significant design if >> it can't be pre- or post-fixed. >> >> > On Wed, Jan 23, 2019 at 7:04 PM Chamikara Jayalath <chamik...@google.com> >> > wrote: >> >> >> >> >> >> >> >> On Wed, Jan 23, 2019 at 1:03 PM Robert Bradshaw <rober...@google.com> >> >> wrote: >> >>> >> >>> On Wed, Jan 23, 2019 at 6:38 PM Maximilian Michels <m...@apache.org> >> >>> wrote: >> >>> > >> >>> > Thank you for starting on the cross-language feature, Robert! >> >>> > >> >>> > Just to recap: Each SDK runs an ExpansionService which can be >> >>> > contacted during >> >>> > pipeline translation to expand transforms that are unknown to the SDK. >> >>> > The >> >>> > service returns the Proto definitions to the querying process. >> >>> >> >>> Yep. Technically it doesn't have to be the SDK, or even if it is, there >> >>> may be a variety of services (e.g. one offering SQL, one offering >> >>> different IOs). >> >>> >> >>> > There will be multiple environments such that during execution >> >>> > cross-language >> >>> > pipelines select the appropriate environment for a transform. >> >>> >> >>> Exactly. And fuses only those steps with compatible environments >> >>> together. >> >>> >> >>> > It's not clear to me: should the expansion happen during pipeline >> >>> > construction >> >>> > or during translation by the Runner? >> >>> >> >>> I think it needs to happen as part of construction because the set of >> >>> outputs (and their properties) can be dynamic based on the expansion. >> >> >> >> >> >> Also, without expansion at pipeline construction, we'll have to define >> >> all composite cross-language transforms as runner-native transforms, which >> >> won't be practical?
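Robert notes above that a cross-language pipeline runs steps in multiple environments and fuses only those steps with compatible environments. A simplified sketch of that fusion rule follows. It assumes a linear chain of steps, each tagged with a hypothetical environment id, and the step names are made up. Real runners fuse over a DAG and apply other constraints too, such as GroupByKey boundaries. This sketch only splits stages where the environment changes.

```python
from itertools import groupby
from typing import List, Tuple

Step = Tuple[str, str]  # (step name, environment id), both hypothetical


def fuse_by_environment(steps: List[Step]) -> List[Tuple[str, List[str]]]:
    """Fuse maximal runs of consecutive steps that share an environment.

    Each returned stage would run in one container for that environment.
    """
    return [(env, [name for name, _ in group])
            for env, group in groupby(steps, key=lambda step: step[1])]


pipeline = [
    ("ReadFromKafka", "java"),
    ("DecodeRecords", "java"),
    ("ParseJson", "python"),
    ("EnrichWithModel", "python"),
    ("WriteToBigQuery", "java"),
]
stages = fuse_by_environment(pipeline)
```

Each stage boundary is an environment switch, which is why runners need to start containers for several environments within one pipeline.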
>> >> >> >>> >> >>> >> >>> > Thanks, >> >>> > Max >> >>> > >> >>> > On 23.01.19 04:12, Robert Bradshaw wrote: >> >>> > > No, this PR simply takes an endpoint address as a parameter, >> >>> > > expecting >> >>> > > it to already be up and available. More convenient APIs, e.g. ones >> >>> > > that spin up an endpoint and tear it down, or catalog and locate >> >>> > > code >> >>> > > and services offering these endpoints, could be provided as wrappers >> >>> > > on top of or extensions of this. >> >>> > > >> >>> > > On Wed, Jan 23, 2019 at 12:19 AM Kenneth Knowles <k...@apache.org> >> >>> > > wrote: >> >>> > >> >> >>> > >> Nice! If I recall correctly, there was mostly concern about how to >> >>> > >> launch and manage the expansion service (Docker? Vendor-specific? >> >>> > >> Etc.) Does this PR take a position on that question? >> >>> > >> >> >>> > >> Kenn >> >>> > >> >> >>> > >> On Tue, Jan 22, 2019 at 1:44 PM Chamikara Jayalath >> >>> > >> <chamik...@google.com> wrote: >> >>> > >>> >> >>> > >>> >> >>> > >>> >> >>> > >>> On Tue, Jan 22, 2019 at 11:35 AM Udi Meiri <eh...@google.com> >> >>> > >>> wrote: >> >>> > >>>> >> >>> > >>>> Also debuggability: collecting logs from each of these systems. >> >>> > >>> >> >>> > >>> >> >>> > >>> Agree. >> >>> > >>> >> >>> > >>>> >> >>> > >>>> >> >>> > >>>> On Tue, Jan 22, 2019 at 10:53 AM Chamikara Jayalath >> >>> > >>>> <chamik...@google.com> wrote: >> >>> > >>>>> >> >>> > >>>>> Thanks Robert. >> >>> > >>>>> >> >>> > >>>>> On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw >> >>> > >>>>> <rober...@google.com> wrote: >> >>> > >>>>>> >> >>> > >>>>>> Now that we have the FnAPI, I started playing around with >> >>> > >>>>>> support for >> >>> > >>>>>> cross-language pipelines. This will allow things like IOs to be >> >>> > >>>>>> shared >> >>> > >>>>>> across all languages, SQL to be invoked from non-Java, TFX >> >>> > >>>>>> TensorFlow >> >>> > >>>>>> transforms to be invoked from non-Python, etc.
and, I think, is >> >>> > >>>>>> the next >> >>> > >>>>>> step in extending (and taking advantage of) the portability >> >>> > >>>>>> layer >> >>> > >>>>>> we've developed. These are often composite transforms whose >> >>> > >>>>>> inner >> >>> > >>>>>> structure depends in non-trivial ways on their configuration. >> >>> > >>>>> >> >>> > >>>>> >> >>> > >>>>> Some additional benefits of cross-language transforms are given >> >>> > >>>>> below. >> >>> > >>>>> >> >>> > >>>>> (1) The current large collection of Java IO connectors will >> >>> > >>>>> become available to other languages. >> >>> > >>>>> (2) Current Java and Python transforms will be available for Go >> >>> > >>>>> and any other future SDKs. >> >>> > >>>>> (3) New transform authors will be able to pick their language of >> >>> > >>>>> choice and make their transform available to all Beam SDKs. For >> >>> > >>>>> example, this can be the language the transform author is most >> >>> > >>>>> familiar with or the only language for which a client library is >> >>> > >>>>> available for connecting to an external data store. >> >>> > >>>>> >> >>> > >>>>>> >> >>> > >>>>>> I created a PR [1] that basically follows the "expand via an >> >>> > >>>>>> external >> >>> > >>>>>> process" over RPC alternative from the proposals we came up >> >>> > >>>>>> with when >> >>> > >>>>>> we were discussing this last time [2]. There are still some >> >>> > >>>>>> unknowns, >> >>> > >>>>>> e.g. how to handle artifacts supplied by an alternative SDK >> >>> > >>>>>> (they >> >>> > >>>>>> currently must be provided by the environment), but I think >> >>> > >>>>>> this is a >> >>> > >>>>>> good incremental step forward that will already be useful in a >> >>> > >>>>>> large >> >>> > >>>>>> number of cases. It would be good to validate the general >> >>> > >>>>>> direction >> >>> > >>>>>> and I would be interested in any feedback others may have on it.
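The "expand via an external process" approach exists because these composites' inner structure depends on their configuration. It is also why expansion has to happen at pipeline construction: the caller cannot know a transform's outputs until the service has seen the payload. Below is a toy, in-process sketch of that idea. All names are hypothetical: `ToyExpansionService`, the URN, and the JSON payload format. The real service runs out of process, is reached over RPC, and returns pipeline protos.

```python
import json
from typing import Callable, Dict

Expansion = Dict[str, object]


class ToyExpansionService:
    """In-process stand-in for an expansion service. The real one runs in a
    separate process, is reached over RPC, and returns pipeline protos."""

    def __init__(self, environment: str):
        self.environment = environment
        self._expanders: Dict[str, Callable[[bytes], Expansion]] = {}

    def register(self, urn: str, expander: Callable[[bytes], Expansion]) -> None:
        self._expanders[urn] = expander

    def expand(self, urn: str, payload: bytes) -> Expansion:
        if urn not in self._expanders:
            raise KeyError(f"unknown transform URN: {urn}")
        expansion = self._expanders[urn](payload)
        # Tag the result so a runner knows which environment executes it.
        expansion["environment"] = self.environment
        return expansion


def expand_partition(payload: bytes) -> Expansion:
    # The outputs depend on the configuration carried in the payload, so the
    # calling SDK only learns them by expanding at construction time.
    config = json.loads(payload)
    return {"outputs": [f"out_{name}" for name in config["partitions"]]}


service = ToyExpansionService(environment="java")
service.register("example:hypothetical:partition:v1", expand_partition)
expansion = service.expand(
    "example:hypothetical:partition:v1",
    json.dumps({"partitions": ["a", "b", "c"]}).encode("utf-8"))
```

A different payload yields a different set of outputs. A runner that only saw the unexpanded URN could not wire up downstream steps.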
>> >>> > >>>>> >> >>> > >>>>> >> >>> > >>>>> I think there are multiple semi-dependent problems we have to >> >>> > >>>>> tackle to reach the final goal of supporting fully-fledged >> >>> > >>>>> cross-language transforms in Beam. I agree with taking an >> >>> > >>>>> incremental approach here with the overall vision in mind. Some >> >>> > >>>>> other problems we have to tackle include the following. >> >>> > >>>>> >> >>> > >>>>> * Defining a user API that will allow pipelines defined in an SDK >> >>> > >>>>> X to use transforms defined in SDK Y. >> >>> > >>>>> * Updating various runners to use URN/payload-based environment >> >>> > >>>>> definitions [1] >> >>> > >>>>> * Updating various runners to support starting containers for >> >>> > >>>>> multiple environments/languages for the same pipeline and >> >>> > >>>>> supporting executing pipeline steps in containers started for >> >>> > >>>>> multiple environments. >> >>> > >>> >> >>> > >>> >> >>> > >>> I've been working with +Heejong Lee to add some of the missing >> >>> > >>> pieces mentioned above. >> >>> > >>> >> >>> > >>> We created the following doc that captures some of the ongoing work >> >>> > >>> related to cross-language transforms and which will hopefully >> >>> > >>> serve as a knowledge base for anybody who wishes to quickly learn >> >>> > >>> the context related to this. >> >>> > >>> Feel free to refer to this and/or add to this.
>> >>> > >>> >> >>> > >>> https://docs.google.com/document/d/1H3yCyVFI9xYs1jsiF1GfrDtARgWGnLDEMwG5aQIx2AU/edit?usp=sharing >> >>> > >>> >> >>> > >>> >> >>> > >>>>> >> >>> > >>>>> >> >>> > >>>>> Thanks, >> >>> > >>>>> Cham >> >>> > >>>>> >> >>> > >>>>> [1] >> >>> > >>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L952 >> >>> > >>>>> >> >>> > >>>>> >> >>> > >>>>> >> >>> > >>>>> >> >>> > >>>>> >> >>> > >>>>> >> >>> > >>>>> >> >>> > >>>>>> >> >>> > >>>>>> >> >>> > >>>>>> - Robert >> >>> > >>>>>> >> >>> > >>>>>> [1] https://github.com/apache/beam/pull/7316 >> >>> > >>>>>> [2] https://s.apache.org/beam-mixed-language-pipelines
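Earlier in the thread, Robert explains that the PR takes the address of an already-running endpoint. He adds that APIs which spin up and tear down an endpoint could be layered on top. Below is a minimal sketch of such a wrapper. It assumes the service can be started as a local subprocess that accepts its port on the command line. `managed_endpoint`, the `{port}` placeholder convention, and the stand-in command are hypothetical and not part of any Beam SDK.

```python
import contextlib
import socket
import subprocess
import sys
from typing import Iterator, List


def _free_port() -> int:
    # Let the OS pick an unused port. Another process could grab it before the
    # service binds it; acceptable for a sketch, not for production.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("localhost", 0))
        return sock.getsockname()[1]


@contextlib.contextmanager
def managed_endpoint(command: List[str]) -> Iterator[str]:
    """Hypothetical wrapper: start a service, yield its address, stop it.

    Any "{port}" in `command` is replaced with the chosen port number.
    """
    port = _free_port()
    proc = subprocess.Popen([arg.replace("{port}", str(port)) for arg in command])
    try:
        yield f"localhost:{port}"
    finally:
        # Tear the endpoint down even if pipeline construction fails.
        proc.terminate()
        proc.wait(timeout=10)


# Stand-in command that just sleeps; a real one would launch an expansion
# service binary and pass it {port}.
DEMO_COMMAND = [sys.executable, "-c", "import time; time.sleep(30)"]

with managed_endpoint(DEMO_COMMAND) as address:
    print(f"expansion service (stand-in) at {address}")
```

The yielded address is what would be passed as `service_address` in `ExternalTransform(URN, payload, service_address)`. The core transform stays unaware of how the endpoint was started.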