On Fri, Feb 1, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:
>
> On Fri, Feb 1, 2019 at 6:17 AM Maximilian Michels <m...@apache.org> wrote:
>
>> > Max, thanks for your summary. I would like to add that we agree that
>> > the runner-specific translation via URN is a temporary solution until
>> > the wrapper transforms are written, is this correct? In any case this
>> > alternative standard expansion approach deserves a discussion of its
>> > own, as you mention.
>>
>> Correct. Wrapping existing Beam transforms should always be preferred
>> over runner-specific translation because the latter is not portable.
>
> From a Python user perspective, this can still be exposed as a stub,
> without having to know about the URN.
Yep. In the long run, I'd expect many sources to be offered as their own
easy-to-use stubs.

> Also, isn't how we expose this orthogonal to how it is being translated?

Yes.

> It may even be possible to switch the stub to SDF-based translation once
> that is ready.

Yep. The expansion would change, but that's all an internal detail inside
the composite that the user doesn't care about.

>
>> On 01.02.19 14:25, Ismaël Mejía wrote:
>> > Thanks for the explanation, Robert, it makes much more sense now.
>> > (Sorry for the confusion in the mapping, I mistyped the direction
>> > SDF <-> Source.)
>> >
>> > Status of SDF:
>> > - Support for Dynamic Work Rebalancing is WIP.
>> > - Bounded version translation is supported by all non-portable runners
>> >   in a relatively naive way.
>> > - Unbounded version translation is not supported in the non-portable
>> >   runners. (Let's not forget that this case may make sense too.)
>> > - Portable runners' translation of SDF is WIP.
>> > - There is only one IO that is written based on SDF:
>> >   - HBaseIO
>> > - Some other IOs should work out of the box (those based on
>> >   non-splittable DoFn):
>> >   - ClickhouseIO
>> >   - File-based ones: TextIO, AvroIO, ParquetIO
>> >   - JdbcIO
>> >   - SolrIO
>> >
>> > Max, thanks for your summary. I would like to add that we agree that
>> > the runner-specific translation via URN is a temporary solution until
>> > the wrapper transforms are written, is this correct? In any case this
>> > alternative standard expansion approach deserves a discussion of its
>> > own, as you mention.
>> >
>> > On Fri, Feb 1, 2019 at 2:02 PM Robert Bradshaw <rober...@google.com> wrote:
>> >>
>> >> Are you suggesting something akin to a generic
>> >>
>> >>     urn: JsonConfiguredJavaSource
>> >>     payload: some json specifying which source and which parameters
>> >>
>> >> which would expand to actually constructing and applying that source?
>> >>
>> >> (FWIW, I was imagining PubSubIO already had a translation into
>> >> BeamFnApi protos that fully specified it, and we use that same format
>> >> to translate back out.)
>> >>
>> >> On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels <m...@apache.org> wrote:
>> >>>
>> >>> Recapping here:
>> >>>
>> >>> We all agree that SDF is the way to go for future implementations of
>> >>> sources. It enables us to get rid of the source interfaces. However,
>> >>> SDF does not solve the lack of streaming sources in Python.
>> >>>
>> >>> The expansion PR (thanks btw!) solves the problem of
>> >>> expanding/translating URNs known to an ExpansionService. That is a
>> >>> more programmatic way of replacing language-specific transforms,
>> >>> instead of relying on translators directly in the Runner.
>> >>>
>> >>> What is unsolved is the configuration of sources from a foreign
>> >>> environment. In my opinion this is the most pressing issue for Python
>> >>> sources, because what is PubSubIO worth in Python if you cannot
>> >>> configure it?
>> >>>
>> >>> What about this:
>> >>>
>> >>> I think it is worth adding a JSON configuration option for all
>> >>> existing Java sources. That way, we could easily configure them as
>> >>> part of the expansion request (which would contain a JSON
>> >>> configuration). I'll probably fork a thread to discuss this in more
>> >>> detail, but would like to hear your thoughts.
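A rough sketch of what such a JSON-configured expansion payload might look
like, expressed in Python (everything here is illustrative: the URN, the
class name, and the field names are invented for the example, not an
existing Beam format):

    import json

    # Hypothetical payload for a generic "JSON-configured Java source" URN.
    payload = json.dumps({
        "source": "org.apache.beam.sdk.io.gcp.pubsub.PubsubIO",
        "parameters": {
            "subscription": "projects/my-project/subscriptions/my-sub",
            "with_attributes": False,
        },
    }).encode("utf-8")

    # The expansion request would pair this payload with a generic URN,
    # e.g. "beam:external:java:json_configured_source:v1" (also hypothetical).

The expansion service would deserialize the JSON, construct the named source
with the given parameters, and return the expanded transform in the
expansion response.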
>> >>>
>> >>> -Max
>> >>>
>> >>> On 01.02.19 13:08, Robert Bradshaw wrote:
>> >>>> On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <m...@apache.org> wrote:
>> >>>>
>> >>>>     Ah, I thought you meant native Flink transforms.
>> >>>>
>> >>>>     Exactly! The translation code is already there. The main
>> >>>>     challenge is how to programmatically configure the BeamIO from
>> >>>>     Python. I suppose that is also an unsolved problem for
>> >>>>     cross-language transforms in general.
>> >>>>
>> >>>> This is what https://github.com/apache/beam/pull/7316 does.
>> >>>>
>> >>>> For a particular source, one would want to define a URN and
>> >>>> corresponding payload, then (probably) a CompositeTransform in Python
>> >>>> that takes the user's arguments, packages them into the payload,
>> >>>> applies the ExternalTransform, and returns the results (a sketch of
>> >>>> this pattern follows below). How to handle arbitrary UDFs embedded
>> >>>> in sources is still TBD.
>> >>>>
>> >>>>     For Matthias' pipeline with PubSubIO we can build something
>> >>>>     specific, but for the general case there should be a way to
>> >>>>     initialize a Beam IO via a configuration map provided by an
>> >>>>     external environment.
>> >>>>
>> >>>> I thought quite a bit about how we could represent expansions
>> >>>> statically (e.g. have some kind of expansion template that could be
>> >>>> used, at least in many cases, as data without firing up a separate
>> >>>> process). Maybe worth doing eventually, but we run into the same
>> >>>> issues that were discussed at
>> >>>> https://github.com/apache/beam/pull/7316#discussion_r249996455 .
>> >>>>
>> >>>> If one is already using a portable runner like Flink, having the job
>> >>>> service process automatically also serve up an expansion service for
>> >>>> various URNs it knows and cares about is probably a pretty low bar.
>> >>>> Flink could serve up things it would rather get back untouched in a
>> >>>> transform with a special Flink runner URN.
>> >>>>
>> >>>> As Ahmet mentions, SDF is a better solution. I hope it's not that
>> >>>> far away, but even once it comes, we'll likely want the above
>> >>>> framework to invoke the full suite of Java IOs even after they're
>> >>>> running on SDF themselves.
>> >>>>
>> >>>> - Robert
>> >>>>
>> >>>> On 31.01.19 17:36, Thomas Weise wrote:
>> >>>> > Exactly, that's what I had in mind.
>> >>>> >
>> >>>> > A Flink runner native transform would make the existing unbounded
>> >>>> > sources available, similar to:
>> >>>> >
>> >>>> > https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
>> >>>> >
>> >>>> > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <m...@apache.org> wrote:
>> >>>> >
>> >>>> >     Wouldn't it be even more useful for the transition period if
>> >>>> >     we enabled Beam IO to be used via Flink (like in the legacy
>> >>>> >     Flink Runner)? In this particular example, Matthias wants to
>> >>>> >     use PubSubIO, which is not even available as a native Flink
>> >>>> >     transform.
>> >>>> >
>> >>>> >     On 31.01.19 16:21, Thomas Weise wrote:
>> >>>> > > Until SDF is supported, we could also add Flink runner native
>> >>>> > > transforms for selected unbounded sources [1].
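A minimal Python sketch of the stub pattern described above (the URN, the
payload encoding, and the ExternalTransform constructor arguments are
approximations for illustration, not the finalized API from the PR):

    import apache_beam as beam
    from apache_beam.transforms.external import ExternalTransform

    # Hypothetical URN; a real one would be registered with the
    # expansion service.
    _PUBSUB_READ_URN = "beam:external:java:pubsub:read:v1"

    class ReadFromPubSub(beam.PTransform):
        """Stub that hides the URN/payload machinery from the user."""

        def __init__(self, subscription, expansion_service="localhost:8097"):
            self._subscription = subscription
            self._expansion_service = expansion_service

        def expand(self, pbegin):
            # Package the user's arguments into the payload and let the
            # expansion service do the actual (Java-side) expansion.
            payload = self._subscription.encode("utf-8")
            return pbegin | ExternalTransform(
                _PUBSUB_READ_URN, payload, self._expansion_service)

A user would then write p | ReadFromPubSub(subscription=...) without ever
seeing the URN, and switching the underlying expansion to an SDF-based
translation later would not change the stub at all.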
>> >>>> > > That might be a reasonable option to unblock users that want to
>> >>>> > > try Python streaming on Flink.
>> >>>> > >
>> >>>> > > Thomas
>> >>>> > >
>> >>>> > > [1]
>> >>>> > > https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
>> >>>> > >
>> >>>> > > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <m...@apache.org> wrote:
>> >>>> > >
>> >>>> > > > I have a hard time imagining how we can map RestrictionTrackers
>> >>>> > > > into the existing Bounded/UnboundedSource in a generic way, so
>> >>>> > > > I would love to hear more about the details.
>> >>>> > >
>> >>>> > > Isn't it the other way around? The SDF is a generalization of
>> >>>> > > UnboundedSource. So we would wrap UnboundedSource using SDF. I'm
>> >>>> > > not saying it is trivial, but SDF offers all the functionality
>> >>>> > > that UnboundedSource needs.
>> >>>> > >
>> >>>> > > For example, the @GetInitialRestriction method would call split
>> >>>> > > on the UnboundedSource and the restriction trackers would then
>> >>>> > > be used to process the splits.
>> >>>> > >
>> >>>> > > On 31.01.19 15:16, Ismaël Mejía wrote:
>> >>>> > > >> Not necessarily. This would be one way. Another way is to
>> >>>> > > >> build an SDF wrapper for UnboundedSource. Probably the easier
>> >>>> > > >> path for migration.
>> >>>> > > >
>> >>>> > > > That would be fantastic, I have heard about such a wrapper
>> >>>> > > > multiple times but so far there is not any realistic proposal.
>> >>>> > > > I have a hard time imagining how we can map RestrictionTrackers
>> >>>> > > > into the existing Bounded/UnboundedSource in a generic way, so
>> >>>> > > > I would love to hear more about the details.
>> >>>> > > >
>> >>>> > > > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <m...@apache.org> wrote:
>> >>>> > > >>
>> >>>> > > >> > In addition to having support in the runners, this will
>> >>>> > > >> > require a rewrite of PubsubIO to use the new SDF API.
>> >>>> > > >>
>> >>>> > > >> Not necessarily. This would be one way. Another way is to
>> >>>> > > >> build an SDF wrapper for UnboundedSource. Probably the easier
>> >>>> > > >> path for migration.
>> >>>> > > >>
>> >>>> > > >> On 31.01.19 14:03, Ismaël Mejía wrote:
>> >>>> > > >>>> Fortunately, there is already a pending PR for
>> >>>> > > >>>> cross-language pipelines which will allow us to use Java IO
>> >>>> > > >>>> like PubSub in Python jobs.
>> >>>> > > >>>
>> >>>> > > >>> In addition to having support in the runners, this will
>> >>>> > > >>> require a rewrite of PubsubIO to use the new SDF API.
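To illustrate the shape of the wrapper Max describes, here is a conceptual
sketch in Python (the real wrapper would be Java code around
UnboundedSource; read_split() below is a hypothetical stand-in for opening
a reader on one split, and the unbounded case would additionally need
checkpointing and resumption, which this elides):

    import apache_beam as beam
    from apache_beam.io.restriction_trackers import (
        OffsetRange, OffsetRestrictionTracker)
    from apache_beam.transforms.core import RestrictionProvider

    NUM_SPLITS = 4  # stand-in for the desired number of source splits

    class SplitIndexProvider(RestrictionProvider):
        # The restriction is a range of indices into the source's splits,
        # the analog of @GetInitialRestriction calling split() on the source.
        def initial_restriction(self, source):
            return OffsetRange(0, NUM_SPLITS)

        def create_tracker(self, restriction):
            return OffsetRestrictionTracker(restriction)

        def restriction_size(self, source, restriction):
            return restriction.size()

    class ReadViaSdfFn(beam.DoFn):
        def process(self, source,
                    tracker=beam.DoFn.RestrictionParam(SplitIndexProvider())):
            index = tracker.current_restriction().start
            while tracker.try_claim(index):
                # Each claimed index corresponds to one split of the
                # wrapped source; a runner may split the range among
                # workers via the restriction tracker.
                yield from source.read_split(index)  # hypothetical method
                index += 1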
>> >>>> > > >>>
>> >>>> > > >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <m...@apache.org> wrote:
>> >>>> > > >>>>
>> >>>> > > >>>> Hi Matthias,
>> >>>> > > >>>>
>> >>>> > > >>>> This is already reflected in the compatibility matrix, if
>> >>>> > > >>>> you look under SDF. There is no UnboundedSource interface
>> >>>> > > >>>> for portable pipelines. That's a legacy abstraction that
>> >>>> > > >>>> will be replaced with SDF.
>> >>>> > > >>>>
>> >>>> > > >>>> Fortunately, there is already a pending PR for
>> >>>> > > >>>> cross-language pipelines which will allow us to use Java IO
>> >>>> > > >>>> like PubSub in Python jobs.
>> >>>> > > >>>>
>> >>>> > > >>>> Thanks,
>> >>>> > > >>>> Max
>> >>>> > > >>>>
>> >>>> > > >>>> On 31.01.19 12:06, Matthias Baetens wrote:
>> >>>> > > >>>>> Hey Ankur,
>> >>>> > > >>>>>
>> >>>> > > >>>>> Thanks for the swift reply. Should I change this in the
>> >>>> > > >>>>> capability matrix
>> >>>> > > >>>>> <https://s.apache.org/apache-beam-portability-support-table> then?
>> >>>> > > >>>>>
>> >>>> > > >>>>> Many thanks.
>> >>>> > > >>>>> Best,
>> >>>> > > >>>>> Matthias
>> >>>> > > >>>>>
>> >>>> > > >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <goe...@google.com> wrote:
>> >>>> > > >>>>>
>> >>>> > > >>>>>     Hi Matthias,
>> >>>> > > >>>>>
>> >>>> > > >>>>>     Unfortunately, unbounded reads including pubsub are
>> >>>> > > >>>>>     not yet supported for portable runners.
>> >>>> > > >>>>>
>> >>>> > > >>>>>     Thanks,
>> >>>> > > >>>>>     Ankur
>> >>>> > > >>>>>
>> >>>> > > >>>>>     On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens <baetensmatth...@gmail.com> wrote:
>> >>>> > > >>>>>
>> >>>> > > >>>>>         Hi everyone,
>> >>>> > > >>>>>
>> >>>> > > >>>>>         The last few days I have been trying to run a
>> >>>> > > >>>>>         streaming pipeline (code on GitHub
>> >>>> > > >>>>>         <https://github.com/matthiasa4/beam-demo>) on a
>> >>>> > > >>>>>         Flink Runner.
>> >>>> > > >>>>>
>> >>>> > > >>>>>         I am running a Flink cluster locally (v1.5.6
>> >>>> > > >>>>>         <https://flink.apache.org/downloads.html>).
>> >>>> > > >>>>>         I have built the SDK Harness Container:
>> >>>> > > >>>>>         ./gradlew :beam-sdks-python-container:docker
>> >>>> > > >>>>>         and started the JobServer:
>> >>>> > > >>>>>         ./gradlew :beam-runners-flink_2.11-job-server:runShadow
>> >>>> > > >>>>>         -PflinkMasterUrl=localhost:8081
>> >>>> > > >>>>>
>> >>>> > > >>>>>         I run my pipeline with:
>> >>>> > > >>>>>         env/bin/python streaming_pipeline.py
>> >>>> > > >>>>>         --runner=PortableRunner --job_endpoint=localhost:8099
>> >>>> > > >>>>>         --output xxx --input_subscription xxx
>> >>>> > > >>>>>         --output_subscription xxx
>> >>>> > > >>>>>
>> >>>> > > >>>>>         All this is running inside Ubuntu (Bionic) in a
>> >>>> > > >>>>>         VirtualBox.
>> >>>> > > >>>>>
>> >>>> > > >>>>>         The job submits fine, but unfortunately fails after
>> >>>> > > >>>>>         a few seconds with the error attached.
>> >>>> > > >>>>>
>> >>>> > > >>>>>         Anything I am missing or doing wrong?
>> >>>> > > >>>>>
>> >>>> > > >>>>>         Many thanks.
>> >>>> > > >>>>>         Best,
>> >>>> > > >>>>>         Matthias