Thanks for the explanation, Robert, it makes much more sense now. (Sorry for
the confusion in the mapping, I mistyped the direction SDF <-> Source.)
Status of SDF:
- Support for Dynamic Work Rebalancing is WIP.
- Bounded version translation is supported by all non-portable runners in a
  relatively naive way.
- Unbounded version translation is not supported in the non-portable runners.
  (Let's not forget that this case may make sense too.)
- Portable runners' translation of SDF is WIP.
- There is only one IO that is written based on SDF:
  - HBaseIO
- Some other IOs should work out of the box (those based on non-splittable
  DoFn):
  - ClickhouseIO
  - File-based ones: TextIO, AvroIO, ParquetIO
  - JdbcIO
  - SolrIO

Max, thanks for your summary. I would like to add that we agree the
runner-specific translation via URN is a temporary solution until the wrapper
transforms are written, is this correct? In any case, this alternative
standard expansion approach deserves a discussion of its own, as you mention.

On Fri, Feb 1, 2019 at 2:02 PM Robert Bradshaw <[email protected]> wrote:
>
> Are you suggesting something akin to a generic
>
> urn: JsonConfiguredJavaSource
> payload: some json specifying which source and which parameters
>
> which would expand to actually constructing and applying that source?
>
> (FWIW, I was imagining PubSubIO already had a translation into BeamFnApi
> protos that fully specified it, and we use that same format to translate
> back out.)
>
> On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels <[email protected]> wrote:
>>
>> Recapping here:
>>
>> We all agree that SDF is the way to go for future implementations of
>> sources. It enables us to get rid of the source interfaces. However, SDF
>> does not solve the lack of streaming sources in Python.
>>
>> The expansion PR (thanks btw!) solves the problem of
>> expanding/translating URNs known to an ExpansionService. That is a more
>> programmatic way of replacing language-specific transforms, instead of
>> relying on translators directly in the Runner.
>>
>> What is unsolved is the configuration of sources from a foreign
>> environment. In my opinion this is the most pressing issue for Python
>> sources, because what is PubSubIO worth in Python if you cannot
>> configure it?
>>
>> What about this:
>>
>> I think it is worth adding a JSON configuration option for all existing
>> Java sources. That way, we could easily configure them as part of the
>> expansion request (which would contain a JSON configuration). I'll
>> probably fork a thread to discuss this in more detail, but would like to
>> hear your thoughts.
>>
>> -Max
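To make the generic JsonConfiguredJavaSource idea above concrete, here is a
rough illustration of what an expansion could do with such a JSON payload.
All names here are hypothetical (the URN, the helper class, the payload
layout), and real Java IOs are mostly AutoValue-based builders, so a
production version would need a more careful construction strategy than the
reflective one sketched here:

import java.lang.reflect.Method;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Hypothetical expansion helper for a generic JSON-configured source URN.
 * The payload names a source transform class and its parameters, e.g.
 * {"class": "org.example.MySourceIO$Read", "options": {"withTopic": "t"}}.
 */
public class JsonConfiguredSourceExpansion {

  // Invented URN, for illustration only.
  public static final String URN =
      "beam:external:java:json_configured_source:v1";

  @SuppressWarnings("unchecked")
  public static Object buildSource(String jsonPayload) throws Exception {
    Map<String, Object> config =
        new ObjectMapper().readValue(jsonPayload, Map.class);

    // Instantiate the named source/transform class reflectively.
    Class<?> clazz = Class.forName((String) config.get("class"));
    Object source = clazz.getDeclaredConstructor().newInstance();

    // Apply each JSON option through a matching single-argument method;
    // builder-style IOs return a new instance from each "withX" call.
    for (Map.Entry<String, Object> opt :
        ((Map<String, Object>) config.get("options")).entrySet()) {
      for (Method m : source.getClass().getMethods()) {
        if (m.getName().equals(opt.getKey()) && m.getParameterCount() == 1) {
          source = m.invoke(source, opt.getValue());
          break;
        }
      }
    }
    // The ExpansionService would then apply the configured transform.
    return source;
  }
}

A Python wrapper would then only need to assemble that JSON from user
arguments and apply an ExternalTransform carrying this URN, along the lines
Robert describes below.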
>> On 01.02.19 13:08, Robert Bradshaw wrote:
>> > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <[email protected]> wrote:
>> >
>> > Ah, I thought you meant native Flink transforms.
>> >
>> > Exactly! The translation code is already there. The main challenge is
>> > how to programmatically configure the BeamIO from Python. I suppose
>> > that is also an unsolved problem for cross-language transforms in
>> > general.
>> >
>> > This is what https://github.com/apache/beam/pull/7316 does.
>> >
>> > For a particular source, one would want to define a URN and
>> > corresponding payload, then (probably) a CompositeTransform in Python
>> > that takes the user's arguments, packages them into the payload,
>> > applies the ExternalTransform, and returns the results. How to handle
>> > arbitrary UDFs embedded in sources is still TBD.
>> >
>> > For Matthias' pipeline with PubSubIO we can build something specific,
>> > but for the general case there should be a way to initialize a Beam IO
>> > via a configuration map provided by an external environment.
>> >
>> > I thought quite a bit about how we could represent expansions
>> > statically (e.g. have some kind of expansion template that could be
>> > used, at least in many cases, as data without firing up a separate
>> > process). It may be worth doing eventually, but we run into the same
>> > issues that were discussed at
>> > https://github.com/apache/beam/pull/7316#discussion_r249996455
>> >
>> > If one is already using a portable runner like Flink, having the job
>> > service process automatically also serve up an expansion service for
>> > various URNs it knows and cares about is probably a pretty low bar.
>> > Flink could serve up things it would rather get back untouched in a
>> > transform with a special Flink runner URN.
>> >
>> > As Ahmet mentions, SDF is the better solution. I hope it's not that
>> > far away, but even once it comes we'll likely want the above framework
>> > to invoke the full suite of Java IOs even after they're running on SDF
>> > themselves.
>> >
>> > - Robert
>> >
>> > On 31.01.19 17:36, Thomas Weise wrote:
>> > > Exactly, that's what I had in mind.
>> > >
>> > > A Flink runner native transform would make the existing unbounded
>> > > sources available, similar to:
>> > >
>> > > https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
>> > >
>> > > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <[email protected]> wrote:
>> > >
>> > > Wouldn't it be even more useful for the transition period if we
>> > > enabled Beam IO to be used via Flink (like in the legacy Flink
>> > > Runner)? In this particular example, Matthias wants to use PubSubIO,
>> > > which is not even available as a native Flink transform.
>> > >
>> > > On 31.01.19 16:21, Thomas Weise wrote:
>> > > > Until SDF is supported, we could also add Flink runner native
>> > > > transforms for selected unbounded sources [1].
>> > > >
>> > > > That might be a reasonable option to unblock users that want to
>> > > > try Python streaming on Flink.
>> > > >
>> > > > Thomas
>> > > >
>> > > > [1] https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
>> > > >
>> > > > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <[email protected]> wrote:
>> > > >
>> > > > > I have a hard time imagining how we can map RestrictionTrackers
>> > > > > into the existing Bounded/UnboundedSource in a generic way, so I
>> > > > > would love to hear more about the details.
>> > > >
>> > > > Isn't it the other way around? The SDF is a generalization of
>> > > > UnboundedSource. So we would wrap UnboundedSource using SDF. I'm
>> > > > not saying it is trivial, but SDF offers all the functionality
>> > > > that UnboundedSource needs.
>> > > >
>> > > > For example, the @GetInitialRestriction method would call split on
>> > > > the UnboundedSource and the restriction trackers would then be
>> > > > used to process the splits.
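To make that wrapper direction concrete, here is a rough sketch of its
shape. The class and method names are invented, and the SDF hooks are
written as plain methods (annotated only in comments) because the exact
parameter conventions of @GetInitialRestriction, @SplitRestriction and
@ProcessElement were still settling; checkpoint marks, watermarks and
dynamic splitting, the genuinely hard parts, are left out entirely:

import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

/** Sketch: an UnboundedSource (sub-)split itself serves as the restriction. */
class UnboundedSourceAsSdfSketch<T, C extends UnboundedSource.CheckpointMark> {

  // @GetInitialRestriction: start with the whole, unsplit source.
  UnboundedSource<T, C> initialRestriction(UnboundedSource<T, C> source) {
    return source;
  }

  // @SplitRestriction: delegate to the existing UnboundedSource#split,
  // turning each sub-source into a restriction of its own.
  List<? extends UnboundedSource<T, C>> splitRestriction(
      UnboundedSource<T, C> restriction, int desiredNumSplits,
      PipelineOptions options) throws Exception {
    return restriction.split(desiredNumSplits, options);
  }

  // @ProcessElement: after the tracker claims a sub-source, run its reader.
  // Note advance() returning false only means "no data right now"; a real
  // wrapper would checkpoint via reader.getCheckpointMark(), report
  // reader.getWatermark(), and ask to be resumed later.
  void processRestriction(UnboundedSource<T, C> restriction,
      PipelineOptions options, Receiver<T> output) throws IOException {
    UnboundedSource.UnboundedReader<T> reader =
        restriction.createReader(options, null);
    for (boolean more = reader.start(); more; more = reader.advance()) {
      output.receive(reader.getCurrent());
    }
    reader.close();
  }

  // Minimal stand-in for DoFn.OutputReceiver.
  interface Receiver<T> {
    void receive(T element);
  }
}

The part this glosses over is exactly Ismaël's question: a real
RestrictionTracker has to map claims, checkpoints and splits onto
CheckpointMark semantics, which is where most of the wrapper's complexity
would live.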
>> > > > On 31.01.19 15:16, Ismaël Mejía wrote:
>> > > > >> Not necessarily. This would be one way. Another way is to
>> > > > >> build an SDF wrapper for UnboundedSource. Probably the easier
>> > > > >> path for migration.
>> > > > >
>> > > > > That would be fantastic, I have heard about such a wrapper
>> > > > > multiple times but so far there is not any realistic proposal.
>> > > > > I have a hard time imagining how we can map RestrictionTrackers
>> > > > > into the existing Bounded/UnboundedSource in a generic way, so
>> > > > > I would love to hear more about the details.
>> > > > >
>> > > > > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <[email protected]> wrote:
>> > > > >>
>> > > > >> > In addition to having support in the runners, this will
>> > > > >> > require a rewrite of PubsubIO to use the new SDF API.
>> > > > >>
>> > > > >> Not necessarily. This would be one way. Another way is to
>> > > > >> build an SDF wrapper for UnboundedSource. Probably the easier
>> > > > >> path for migration.
>> > > > >>
>> > > > >> On 31.01.19 14:03, Ismaël Mejía wrote:
>> > > > >>>> Fortunately, there is already a pending PR for
>> > > > >>>> cross-language pipelines which will allow us to use Java IO
>> > > > >>>> like PubSub in Python jobs.
>> > > > >>>
>> > > > >>> In addition to having support in the runners, this will
>> > > > >>> require a rewrite of PubsubIO to use the new SDF API.
>> > > > >>>
>> > > > >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <[email protected]> wrote:
>> > > > >>>>
>> > > > >>>> Hi Matthias,
>> > > > >>>>
>> > > > >>>> This is already reflected in the compatibility matrix, if
>> > > > >>>> you look under SDF. There is no UnboundedSource interface
>> > > > >>>> for portable pipelines. That's a legacy abstraction that
>> > > > >>>> will be replaced with SDF.
>> > > > >>>>
>> > > > >>>> Fortunately, there is already a pending PR for
>> > > > >>>> cross-language pipelines which will allow us to use Java IO
>> > > > >>>> like PubSub in Python jobs.
>> > > > >>>>
>> > > > >>>> Thanks,
>> > > > >>>> Max
>> > > > >>>>
>> > > > >>>> On 31.01.19 12:06, Matthias Baetens wrote:
>> > > > >>>>> Hey Ankur,
>> > > > >>>>>
>> > > > >>>>> Thanks for the swift reply. Should I change this in the
>> > > > >>>>> capability matrix
>> > > > >>>>> <https://s.apache.org/apache-beam-portability-support-table>
>> > > > >>>>> then?
>> > > > >>>>>
>> > > > >>>>> Many thanks.
>> > > > >>>>> Best,
>> > > > >>>>> Matthias
>> > > > >>>>>
>> > > > >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <[email protected]> wrote:
>> > > > >>>>>
>> > > > >>>>> Hi Matthias,
>> > > > >>>>>
>> > > > >>>>> Unfortunately, unbounded reads including pubsub are not yet
>> > > > >>>>> supported for portable runners.
>> > > > >>>>>
>> > > > >>>>> Thanks,
>> > > > >>>>> Ankur
>> > > > >>>>>
>> > > > >>>>> On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens <[email protected]> wrote:
>> > > > >>>>>
>> > > > >>>>> Hi everyone,
>> > > > >>>>>
>> > > > >>>>> The last few days I have been trying to run a streaming
>> > > > >>>>> pipeline (code on GitHub
>> > > > >>>>> <https://github.com/matthiasa4/beam-demo>) on a Flink
>> > > > >>>>> Runner.
>> > > > >>>>>
>> > > > >>>>> I am running a Flink cluster locally (v1.5.6
>> > > > >>>>> <https://flink.apache.org/downloads.html>).
>> > > > >>>>> I have built the SDK Harness Container with
>> > > > >>>>> ./gradlew :beam-sdks-python-container:docker
>> > > > >>>>> and started the JobServer with
>> > > > >>>>> ./gradlew :beam-runners-flink_2.11-job-server:runShadow
>> > > > >>>>> -PflinkMasterUrl=localhost:8081
>> > > > >>>>>
>> > > > >>>>> I run my pipeline with:
>> > > > >>>>> env/bin/python streaming_pipeline.py
>> > > > >>>>> --runner=PortableRunner --job_endpoint=localhost:8099
>> > > > >>>>> --output xxx --input_subscription xxx
>> > > > >>>>> --output_subscription xxx
>> > > > >>>>>
>> > > > >>>>> All this is running inside Ubuntu (Bionic) in a VirtualBox.
>> > > > >>>>>
>> > > > >>>>> The job submits fine, but unfortunately fails after a few
>> > > > >>>>> seconds with the error attached.
>> > > > >>>>>
>> > > > >>>>> Anything I am missing or doing wrong?
>> > > > >>>>>
>> > > > >>>>> Many thanks.
>> > > > >>>>> Best,
>> > > > >>>>> Matthias
