Are you suggesting something akin to a generic

    urn: JsonConfiguredJavaSource
    payload: some json specifying which source and which parameters
which would expand to actually constructing and applying that source?

(FWIW, I was imagining PubSubIO already had a translation into BeamFnApi
protos that fully specified it, and we use that same format to translate
back out.)

On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels <m...@apache.org> wrote:

> Recapping here:
>
> We all agree that SDF is the way to go for future implementations of
> sources. It enables us to get rid of the source interfaces. However, SDF
> does not solve the lack of streaming sources in Python.
>
> The expansion PR (thanks btw!) solves the problem of
> expanding/translating URNs known to an ExpansionService. That is a more
> programmatic way of replacing language-specific transforms, instead of
> relying on translators directly in the Runner.
>
> What is unsolved is the configuration of sources from a foreign
> environment. In my opinion this is the most pressing issue for Python
> sources, because what is PubSubIO worth in Python if you cannot
> configure it?
>
> What about this:
>
> I think it is worth adding a JSON configuration option for all existing
> Java sources. That way, we could easily configure them as part of the
> expansion request (which would contain a JSON configuration). I'll
> probably fork a thread to discuss this in more detail, but would like to
> hear your thoughts.
>
> -Max
>
> On 01.02.19 13:08, Robert Bradshaw wrote:
> > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <m...@apache.org> wrote:
> >
> > > Ah, I thought you meant native Flink transforms.
> > >
> > > Exactly! The translation code is already there. The main challenge
> > > is how to programmatically configure the BeamIO from Python. I
> > > suppose that is also an unsolved problem for cross-language
> > > transforms in general.
> >
> > This is what https://github.com/apache/beam/pull/7316 does.
> >
> > For a particular source, one would want to define a URN and
> > corresponding payload, then (probably) a CompositeTransform in Python
> > that takes the user's arguments, packages them into the payload,
> > applies the ExternalTransform, and returns the results. How to handle
> > arbitrary UDFs embedded in sources is still TBD.
> >
> > > For Matthias' pipeline with PubSubIO we can build something
> > > specific, but for the general case there should be a way to
> > > initialize a Beam IO via a configuration map provided by an
> > > external environment.
> >
> > I thought quite a bit about how we could represent expansions
> > statically (e.g. have some kind of expansion template that could be
> > used, at least in many cases, as data without firing up a separate
> > process). May be worth doing eventually, but we run into the same
> > issues that were discussed at
> > https://github.com/apache/beam/pull/7316#discussion_r249996455 .
> >
> > If one is already using a portable runner like Flink, having the job
> > service process automatically also serve up an expansion service for
> > various URNs it knows and cares about is probably a pretty low bar.
> > Flink could serve up things it would rather get back untouched in a
> > transform with a special flink runner urn.
> >
> > As Ahmet mentions, SDF is the better solution. I hope it's not that
> > far away, but even once it comes we'll likely want the above framework
> > to invoke the full suite of Java IOs even after they're running on SDF
> > themselves.
> >
> > - Robert
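To make the JSON idea above concrete, here is a rough sketch of what the Java side might look like. Everything in it is hypothetical — the configurator class, the JSON field names, and any URN it would be registered under — but it shows how an expansion service could turn a JSON payload into a fully configured PubsubIO.Read:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

/**
 * Hypothetical sketch: builds a configured PubsubIO.Read from the JSON payload
 * of an expansion request, e.g.
 * {"subscription": "projects/p/subscriptions/s", "idAttribute": "id"}.
 * An ExpansionService would look this up under a (made-up) URN such as
 * "beam:external:java:pubsub:read:v1".
 */
public class PubsubReadFromJson {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  public static PTransform<PBegin, PCollection<String>> fromJson(String json)
      throws Exception {
    JsonNode config = MAPPER.readTree(json);
    PubsubIO.Read<String> read = PubsubIO.readStrings();
    if (config.has("topic")) {
      read = read.fromTopic(config.get("topic").asText());
    }
    if (config.has("subscription")) {
      read = read.fromSubscription(config.get("subscription").asText());
    }
    if (config.has("idAttribute")) {
      read = read.withIdAttribute(config.get("idAttribute").asText());
    }
    if (config.has("timestampAttribute")) {
      read = read.withTimestampAttribute(config.get("timestampAttribute").asText());
    }
    return read;
  }
}
```

On the Python side this would pair with the CompositeTransform Robert describes: package the same JSON into the ExternalTransform payload and let the expansion service do the rest.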
> > > On 31.01.19 17:36, Thomas Weise wrote:
> > > > Exactly, that's what I had in mind.
> > > >
> > > > A Flink runner native transform would make the existing unbounded
> > > > sources available, similar to:
> > > > https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
> > > >
> > > > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <m...@apache.org> wrote:
> > > > > Wouldn't it be even more useful for the transition period if we
> > > > > enabled Beam IO to be used via Flink (like in the legacy Flink
> > > > > Runner)? In this particular example, Matthias wants to use
> > > > > PubSubIO, which is not even available as a native Flink
> > > > > transform.
> > > > >
> > > > > On 31.01.19 16:21, Thomas Weise wrote:
> > > > > > Until SDF is supported, we could also add Flink runner native
> > > > > > transforms for selected unbounded sources [1].
> > > > > >
> > > > > > That might be a reasonable option to unblock users that want
> > > > > > to try Python streaming on Flink.
> > > > > >
> > > > > > Thomas
> > > > > >
> > > > > > [1] https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
> > > > > >
> > > > > > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <m...@apache.org> wrote:
> > > > > > > > I have a hard time imagining how we can map in a generic
> > > > > > > > way RestrictionTrackers into the existing
> > > > > > > > Bounded/UnboundedSource, so I would love to hear more
> > > > > > > > about the details.
> > > > > > >
> > > > > > > Isn't it the other way around? The SDF is a generalization
> > > > > > > of UnboundedSource. So we would wrap UnboundedSource using
> > > > > > > SDF. I'm not saying it is trivial, but SDF offers all the
> > > > > > > functionality that UnboundedSource needs.
> > > > > > >
> > > > > > > For example, the @GetInitialRestriction method would call
> > > > > > > split on the UnboundedSource and the restriction trackers
> > > > > > > would then be used to process the splits.
> > > > > > >
> > > > > > > On 31.01.19 15:16, Ismaël Mejía wrote:
> > > > > > > > > Not necessarily. This would be one way. Another way is
> > > > > > > > > to build an SDF wrapper for UnboundedSource. Probably
> > > > > > > > > the easier path for migration.
> > > > > > > >
> > > > > > > > That would be fantastic, I have heard about such a wrapper
> > > > > > > > multiple times, but so far there is no realistic proposal.
> > > > > > > > I have a hard time imagining how we can map in a generic
> > > > > > > > way RestrictionTrackers into the existing
> > > > > > > > Bounded/UnboundedSource, so I would love to hear more
> > > > > > > > about the details.
> > > > > > > >
> > > > > > > > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <m...@apache.org> wrote:
> > > > > > > > > > In addition to having support in the runners, this
> > > > > > > > > > will require a rewrite of PubsubIO to use the new
> > > > > > > > > > SDF API.
> > > > > > > > >
> > > > > > > > > Not necessarily. This would be one way. Another way is
> > > > > > > > > to build an SDF wrapper for UnboundedSource. Probably
> > > > > > > > > the easier path for migration.
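To sketch what such a wrapper might look like — purely illustrative, not working code — the initial split calls UnboundedSource.split(), each sub-source becomes a restriction, and the restriction tracker governs how long a reader may keep emitting. SubSourceTracker here is a hypothetical RestrictionTracker (not shown), and checkpoint finalization, watermarks, coders, and residuals are all elided:

```java
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.transforms.DoFn;

// Stage 1: a plain DoFn performs the initial split, which is where
// UnboundedSource.split() comes in (20 desired splits picked arbitrarily).
class SplitSourceFn<T, C extends CheckpointMark>
    extends DoFn<UnboundedSource<T, C>, UnboundedSource<T, C>> {
  @ProcessElement
  public void process(ProcessContext c) throws Exception {
    for (UnboundedSource<T, C> subSource :
        c.element().split(20, c.getPipelineOptions())) {
      c.output(subSource);
    }
  }
}

// Stage 2: an SDF whose restriction is the sub-source itself. The
// hypothetical SubSourceTracker decides how long the reader may keep
// claiming records before the runner checkpoints it.
@DoFn.UnboundedPerElement
class ReadSubSourceFn<T, C extends CheckpointMark>
    extends DoFn<UnboundedSource<T, C>, T> {

  @GetInitialRestriction
  public UnboundedSource<T, C> getInitialRestriction(UnboundedSource<T, C> s) {
    return s;
  }

  @NewTracker
  public SubSourceTracker<T, C> newTracker(UnboundedSource<T, C> restriction) {
    return new SubSourceTracker<>(restriction); // hypothetical, not shown
  }

  @ProcessElement
  public ProcessContinuation process(
      ProcessContext c, SubSourceTracker<T, C> tracker) throws Exception {
    try (UnboundedSource.UnboundedReader<T> reader =
        tracker.currentRestriction().createReader(c.getPipelineOptions(), null)) {
      for (boolean more = reader.start(); more; more = reader.advance()) {
        // Claiming the reader's checkpoint mark is what would let the runner
        // checkpoint and resume; a real tracker needs much more care here.
        if (!tracker.tryClaim(reader.getCheckpointMark())) {
          return ProcessContinuation.resume();
        }
        c.outputWithTimestamp(reader.getCurrent(), reader.getCurrentTimestamp());
      }
    }
    return ProcessContinuation.resume(); // unbounded: we are never "done"
  }
}
```

The genuinely hard part is exactly the one Ismaël points out: a generic tracker whose claim/checkpoint semantics line up with UnboundedReader's own checkpoint marks.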
> > > > > > > > > On 31.01.19 14:03, Ismaël Mejía wrote:
> > > > > > > > > > > Fortunately, there is already a pending PR for
> > > > > > > > > > > cross-language pipelines which will allow us to use
> > > > > > > > > > > Java IO like PubSub in Python jobs.
> > > > > > > > > >
> > > > > > > > > > In addition to having support in the runners, this
> > > > > > > > > > will require a rewrite of PubsubIO to use the new
> > > > > > > > > > SDF API.
> > > > > > > > > >
> > > > > > > > > > On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <m...@apache.org> wrote:
> > > > > > > > > > > Hi Matthias,
> > > > > > > > > > >
> > > > > > > > > > > This is already reflected in the compatibility
> > > > > > > > > > > matrix, if you look under SDF. There is no
> > > > > > > > > > > UnboundedSource interface for portable pipelines.
> > > > > > > > > > > That's a legacy abstraction that will be replaced
> > > > > > > > > > > with SDF.
> > > > > > > > > > >
> > > > > > > > > > > Fortunately, there is already a pending PR for
> > > > > > > > > > > cross-language pipelines which will allow us to use
> > > > > > > > > > > Java IO like PubSub in Python jobs.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Max
> > > > > > > > > > >
> > > > > > > > > > > On 31.01.19 12:06, Matthias Baetens wrote:
> > > > > > > > > > > > Hey Ankur,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the swift reply. Should I change this
> > > > > > > > > > > > in the capability matrix
> > > > > > > > > > > > <https://s.apache.org/apache-beam-portability-support-table> then?
> > > > > > > > > > > >
> > > > > > > > > > > > Many thanks.
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Matthias
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <goe...@google.com> wrote:
> > > > > > > > > > > > > Hi Matthias,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Unfortunately, unbounded reads including pubsub
> > > > > > > > > > > > > are not yet supported for portable runners.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Ankur
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens <baetensmatth...@gmail.com> wrote:
> > > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The last few days I have been trying to run a
> > > > > > > > > > > > > > streaming pipeline (code on Github
> > > > > > > > > > > > > > <https://github.com/matthiasa4/beam-demo>) on
> > > > > > > > > > > > > > a Flink Runner.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I am running a Flink cluster locally (v1.5.6
> > > > > > > > > > > > > > <https://flink.apache.org/downloads.html>).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I have built the SDK Harness Container:
> > > > > > > > > > > > > > ./gradlew :beam-sdks-python-container:docker
> > > > > > > > > > > > > > and started the JobServer:
> > > > > > > > > > > > > > ./gradlew :beam-runners-flink_2.11-job-server:runShadow
> > > > > > > > > > > > > > -PflinkMasterUrl=localhost:8081
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I run my pipeline with:
> > > > > > > > > > > > > > env/bin/python streaming_pipeline.py
> > > > > > > > > > > > > > --runner=PortableRunner
> > > > > > > > > > > > > > --job_endpoint=localhost:8099 --output xxx
> > > > > > > > > > > > > > --input_subscription xxx
> > > > > > > > > > > > > > --output_subscription xxx
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > All this is running inside Ubuntu (Bionic) in
> > > > > > > > > > > > > > a VirtualBox.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The job submits fine, but unfortunately fails
> > > > > > > > > > > > > > after a few seconds with the error attached.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Anything I am missing or doing wrong?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Many thanks.
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Matthias