Are you suggesting something akin to a generic
urn: JsonConfiguredJavaSource
payload: some json specifying which source and which parameters
which would expand to actually constructing and applying that source?
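
If so, just to pin down what we mean, such a source might be described as
pure data roughly like this (the urn and field names are made up for
illustration, not a settled format):

    import json

    # A hypothetical JSON-configured Java source as data: a generic urn
    # plus a payload naming the source and its parameters.
    json_configured_source = {
        'urn': 'beam:external:java:json_configured_source:v1',
        'payload': json.dumps({
            'source': 'org.apache.beam.sdk.io.gcp.pubsub.PubsubIO',
            'parameters': {
                'subscription': 'projects/my-project/subscriptions/my-sub',
            },
        }).encode('utf-8'),
    }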
(FWIW, I was imagining PubSubIO already had a translation into BeamFnApi
protos that fully specified it, and we use that same format to translate
back out.)
On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels <[email protected]> wrote:
> Recapping here:
>
> We all agree that SDF is the way to go for future implementations of
> sources. It enables us to get rid of the source interfaces. However, SDF
> does not solve the lack of streaming sources in Python.
>
> The expansion PR (thanks btw!) solves the problem of
> expanding/translating URNs known to an ExpansionService. That is a more
> programmatic way of replacing language-specific transforms, instead of
> relying on translators directly in the Runner.
>
> What is unsolved is the configuration of sources from a foreign
> environment. In my opinion this is the most pressing issue for Python
> sources, because what is PubSubIO worth in Python if you cannot
> configure it?
>
> What about this:
>
> I think it is worth adding a JSON configuration option for all existing
> Java sources. That way, we could easily configure them as part of the
> expansion request (which would contain a JSON configuration). I'll
> probably fork a thread to discuss this in more detail, but would like to
> hear your thoughts.
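>
> To sketch what I mean (a rough illustration only; the JSON shape is made
> up, though subscription, timestampAttribute and idAttribute loosely
> mirror existing PubsubIO.Read options):
>
>     import json
>
>     # What the JSON configuration carried by an expansion request for
>     # PubsubIO.Read might look like.
>     pubsub_read_config = json.dumps({
>         'subscription': 'projects/my-project/subscriptions/my-sub',
>         'timestampAttribute': 'ts',
>         'idAttribute': 'id',
>     })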
>
> -Max
>
> On 01.02.19 13:08, Robert Bradshaw wrote:
> > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <[email protected]> wrote:
> >
> > Ah, I thought you meant native Flink transforms.
> >
> > Exactly! The translation code is already there. The main challenge is
> > how to programmatically configure the BeamIO from Python. I suppose
> > that is also an unsolved problem for cross-language transforms in
> > general.
> >
> >
> > This is what https://github.com/apache/beam/pull/7316 does.
> >
> > For a particular source, one would want to define a URN and
> > corresponding payload, then (probably) a CompositeTransform in Python
> > that takes the user's arguments, packages them into the payload, applies
> > the ExternalTransform, and returns the results. How to handle arbitrary
> > UDFs embedded in sources is still TBD.
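> >
> > A rough sketch of that shape, assuming a simple JSON byte payload (the
> > urn, transform name, and expansion service address below are
> > placeholders, not a settled design):
> >
> >     import json
> >
> >     import apache_beam as beam
> >     from apache_beam.transforms.external import ExternalTransform
> >
> >     class ReadFromJavaPubSub(beam.PTransform):
> >       URN = 'beam:external:java:pubsub:read:v1'  # hypothetical urn
> >
> >       def __init__(self, subscription,
> >                    expansion_service='localhost:8097'):
> >         # Package the user's arguments into the payload.
> >         self._payload = json.dumps(
> >             {'subscription': subscription}).encode('utf-8')
> >         self._expansion_service = expansion_service
> >
> >       def expand(self, pbegin):
> >         # Apply the ExternalTransform and return its results.
> >         return pbegin | ExternalTransform(
> >             self.URN, self._payload, self._expansion_service)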
> >
> > For Matthias' pipeline with PubSubIO we can build something specific,
> > but for the general case there should be a way to initialize a Beam IO
> > via a configuration map provided by an external environment.
> >
> >
> > I thought quite a bit about how we could represent expansions statically
> > (e.g. have some kind of expansion template that could be used, at least
> > in many cases, as data without firing up a separate process. May be
> > worth doing eventually, but we run into the same issues that were
> > discussed at
> > https://github.com/apache/beam/pull/7316#discussion_r249996455 ).
> >
> > If one is already using a portable runner like Flink, having the job
> > service process automatically also serve up an expansion service for
> > various URNs it knows and cares about is probably a pretty low bar.
> > Flink could serve up things it would rather get back untouched in a
> > transform with a special flink runner urn.
> >
> > As Ahmet mentions, SDF is a better solution. I hope it's not that far
> > away, but even once it comes we'll likely want the above framework to
> > invoke the full suite of Java IOs even after they're running on SDF
> > themselves.
> >
> > - Robert
> >
> > On 31.01.19 17:36, Thomas Weise wrote:
> > > Exactly, that's what I had in mind.
> > >
> > > A Flink runner native transform would make the existing unbounded
> > > sources available, similar to:
> > >
> > >
> > > https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
> > >
> > > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <[email protected]> wrote:
> > >
> > > Wouldn't it be even more useful for the transition period if we
> > > enabled Beam IO to be used via Flink (like in the legacy Flink
> > > Runner)? In this particular example, Matthias wants to use PubSubIO,
> > > which is not even available as a native Flink transform.
> > >
> > > On 31.01.19 16:21, Thomas Weise wrote:
> > > > Until SDF is supported, we could also add Flink runner native
> > > > transforms for selected unbounded sources [1].
> > > >
> > > > That might be a reasonable option to unblock users that want to try
> > > > Python streaming on Flink.
> > > >
> > > > Thomas
> > > >
> > > > [1]
> > > > https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
> > > >
> > > > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <[email protected]> wrote:
> > > >
> > > > > I have a hard time imagining how we can map RestrictionTrackers
> > > > > in a generic way onto the existing Bounded/UnboundedSource, so I
> > > > > would love to hear more about the details.
> > > >
> > > > Isn't it the other way around? The SDF is a generalization of
> > > > UnboundedSource. So we would wrap UnboundedSource using SDF. I'm not
> > > > saying it is trivial, but SDF offers all the functionality that
> > > > UnboundedSource needs.
> > > >
> > > > For example, the @GetInitialRestriction method would call split on
> > > > the UnboundedSource and the restriction trackers would then be used
> > > > to process the splits.
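> > > >
> > > > A very rough Python rendering of that idea (the real wrapper would
> > > > be Java, since UnboundedSource is a Java interface; the toy source
> > > > below is only a stand-in to show where split() and the trackers
> > > > plug in):
> > > >
> > > >     import apache_beam as beam
> > > >     from apache_beam.io.restriction_trackers import (
> > > >         OffsetRange, OffsetRestrictionTracker)
> > > >     from apache_beam.transforms.core import RestrictionProvider
> > > >
> > > >     class ToySource(object):
> > > >       """Stand-in for a source with a split() method."""
> > > >       def __init__(self, end):
> > > >         self.end = end
> > > >
> > > >       def split(self, num_splits):
> > > >         step = self.end // num_splits
> > > >         return [OffsetRange(i * step, (i + 1) * step)
> > > >                 for i in range(num_splits)]
> > > >
> > > >     class SourceSplitProvider(RestrictionProvider):
> > > >       def initial_restriction(self, source):
> > > >         # Analogue of @GetInitialRestriction.
> > > >         return OffsetRange(0, source.end)
> > > >
> > > >       def split(self, source, restriction):
> > > >         # Derive sub-restrictions from the source's own split()
> > > >         # (the toy assumes restriction is the initial one).
> > > >         return source.split(num_splits=4)
> > > >
> > > >       def create_tracker(self, restriction):
> > > >         return OffsetRestrictionTracker(restriction)
> > > >
> > > >       def restriction_size(self, source, restriction):
> > > >         return restriction.size()
> > > >
> > > >     class ReadViaSDF(beam.DoFn):
> > > >       def process(
> > > >           self, source,
> > > >           tracker=beam.DoFn.RestrictionParam(SourceSplitProvider())):
> > > >         pos = tracker.current_restriction().start
> > > >         while tracker.try_claim(pos):
> > > >           # A real wrapper would emit records read from the
> > > >           # split's reader here.
> > > >           yield pos
> > > >           pos += 1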
> > > >
> > > > On 31.01.19 15:16, Ismaël Mejía wrote:
> > > > >> Not necessarily. This would be one way. Another way is to build
> > > > >> an SDF wrapper for UnboundedSource. Probably the easier path for
> > > > >> migration.
> > > > >
> > > > > That would be fantastic. I have heard about such a wrapper
> > > > > multiple times, but so far there is no realistic proposal. I have
> > > > > a hard time imagining how we can map RestrictionTrackers in a
> > > > > generic way onto the existing Bounded/UnboundedSource, so I would
> > > > > love to hear more about the details.
> > > > >
> > > > > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <[email protected]> wrote:
> > > > >>
> > > > >> > In addition to having support in the runners, this will
> > > > >> > require a rewrite of PubsubIO to use the new SDF API.
> > > > >>
> > > > >> Not necessarily. This would be one way. Another way is to build
> > > > >> an SDF wrapper for UnboundedSource. Probably the easier path for
> > > > >> migration.
> > > > >>
> > > > >> On 31.01.19 14:03, Ismaël Mejía wrote:
> > > > >>>> Fortunately, there is already a pending PR for cross-language
> > > > >>>> pipelines which will allow us to use Java IOs like PubSub in
> > > > >>>> Python jobs.
> > > > >>>
> > > > >>> In addition to having support in the runners, this will require
> > > > >>> a rewrite of PubsubIO to use the new SDF API.
> > > > >>>
> > > > >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <[email protected]> wrote:
> > > > >>>>
> > > > >>>> Hi Matthias,
> > > > >>>>
> > > > >>>> This is already reflected in the compatibility matrix, if you
> > > > >>>> look under SDF. There is no UnboundedSource interface for
> > > > >>>> portable pipelines. That's a legacy abstraction that will be
> > > > >>>> replaced with SDF.
> > > > >>>>
> > > > >>>> Fortunately, there is already a pending PR for cross-language
> > > > >>>> pipelines which will allow us to use Java IOs like PubSub in
> > > > >>>> Python jobs.
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>> Max
> > > > >>>>
> > > > >>>> On 31.01.19 12:06, Matthias Baetens wrote:
> > > > >>>>> Hey Ankur,
> > > > >>>>>
> > > > >>>>> Thanks for the swift reply. Should I change this in the
> > > > >>>>> capability matrix
> > > > >>>>> <https://s.apache.org/apache-beam-portability-support-table>
> > > > >>>>> then?
> > > > >>>>> Many thanks.
> > > > >>>>> Best,
> > > > >>>>> Matthias
> > > > >>>>>
> > > > >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <[email protected]> wrote:
> > > > >>>>>
> > > > >>>>> Hi Matthias,
> > > > >>>>>
> > > > >>>>> Unfortunately, unbounded reads, including Pubsub, are not yet
> > > > >>>>> supported for portable runners.
> > > > >>>>>
> > > > >>>>> Thanks,
> > > > >>>>> Ankur
> > > > >>>>>
> > > > >>>>> On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens <[email protected]> wrote:
> > > > >>>>>
> > > > >>>>> Hi everyone,
> > > > >>>>>
> > > > >>>>> The last few days I have been trying to run a streaming
> > > > >>>>> pipeline (code on GitHub
> > > > >>>>> <https://github.com/matthiasa4/beam-demo>) on a Flink Runner.
> > > > >>>>>
> > > > >>>>> I am running a Flink cluster locally (v1.5.6
> > > > >>>>> <https://flink.apache.org/downloads.html>).
> > > > >>>>> I have built the SDK Harness Container:
> > > > >>>>> ./gradlew :beam-sdks-python-container:docker
> > > > >>>>> and started the JobServer:
> > > > >>>>> ./gradlew :beam-runners-flink_2.11-job-server:runShadow
> > > > >>>>> -PflinkMasterUrl=localhost:8081
> > > > >>>>>
> > > > >>>>> I run my pipeline with:
> > > > >>>>> env/bin/python streaming_pipeline.py --runner=PortableRunner
> > > > >>>>> --job_endpoint=localhost:8099 --output xxx
> > > > >>>>> --input_subscription xxx --output_subscription xxx
> > > > >>>>>
> > > > >>>>> All this is running inside an Ubuntu (Bionic) VirtualBox VM.
> > > > >>>>>
> > > > >>>>> The job submits fine, but unfortunately fails after a few
> > > > >>>>> seconds with the error attached.
> > > > >>>>>
> > > > >>>>> Anything I am missing or doing wrong?
> > > > >>>>>
> > > > >>>>> Many thanks.
> > > > >>>>> Best,
> > > > >>>>> Matthias
> > > > >>>>>