On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <m...@apache.org> wrote:
> Ah, I thought you meant native Flink transforms.
>
> Exactly! The translation code is already there. The main challenge is how
> to programmatically configure the BeamIO from Python. I suppose that is
> also an unsolved problem for cross-language transforms in general.
This is what https://github.com/apache/beam/pull/7316 does. For a
particular source, one would want to define a URN and corresponding
payload, then (probably) a CompositeTransform in Python that takes the
user's arguments, packages them into the payload, applies the
ExternalTransform, and returns the results. How to handle arbitrary UDFs
embedded in sources is still TBD.
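
As a concrete illustration, a minimal sketch of such a composite transform
is below. The URN, the payload format, and the expansion service address
are all hypothetical, and the ExternalTransform signature follows the
pending PR, so treat this as a shape rather than a working recipe:

import apache_beam as beam
from apache_beam.transforms.external import ExternalTransform

# Hypothetical URN; the real one would be agreed upon with the Java side.
PUBSUB_READ_URN = 'beam:external:java:pubsub:read:v1'

class ReadFromPubSubExternal(beam.PTransform):
  """Takes the user's arguments, packages them into the payload, and
  applies the ExternalTransform."""

  def __init__(self, subscription, expansion_service='localhost:8097'):
    self._subscription = subscription
    self._expansion_service = expansion_service

  def expand(self, pbegin):
    # The payload schema is whatever the Java expansion service expects;
    # a plain UTF-8 subscription name stands in for it here.
    payload = self._subscription.encode('utf-8')
    return pbegin | ExternalTransform(
        PUBSUB_READ_URN, payload, self._expansion_service)

A user would then just write p | ReadFromPubSubExternal(subscription) and
never see the URN or payload.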
> For Matthias' pipeline with PubSubIO we can build something specific, but
> for the general case there should be a way to initialize a Beam IO via a
> configuration map provided by an external environment.
I thought quite a bit about how we could represent expansions statically
(e.g. have some kind of expansion template that could be used, at least in
many cases, as data without firing up a separate process). That may be
worth doing eventually, but we run into the same issues that were
discussed at https://github.com/apache/beam/pull/7316#discussion_r249996455 .
If one is already using a portable runner like Flink, having the job
service process automatically also serve up an expansion service for
various URNs it knows and cares about is probably a pretty low bar. Flink
could serve up things it would rather get back untouched in a transform
with a special Flink runner URN.
As Ahmet mentions, SDF is the better solution. I hope it's not that far
away, but even once it comes we'll likely want the above framework to
invoke the full suite of Java IOs, even after they're running on SDF
themselves.
- Robert
On 31.01.19 17:36, Thomas Weise wrote:
> Exactly, that's what I had in mind.
>
> A Flink runner native transform would make the existing unbounded sources
> available, similar to:
> https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
>
> On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <m...@apache.org> wrote:
>
> Wouldn't it be even more useful for the transition period if we enabled
> Beam IO to be used via Flink (like in the legacy Flink Runner)? In this
> particular example, Matthias wants to use PubSubIO, which is not even
> available as a native Flink transform.
>
> On 31.01.19 16:21, Thomas Weise wrote:
> > Until SDF is supported, we could also add Flink runner native
> > transforms for selected unbounded sources [1].
> >
> > That might be a reasonable option to unblock users that want to try
> > Python streaming on Flink.
> >
> > Thomas
> >
> > [1] https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
> >
> > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > > I have a hard time imagining how we can map in a generic way
> > > RestrictionTrackers into the existing Bounded/UnboundedSource, so I
> > > would love to hear more about the details.
> >
> > Isn't it the other way around? The SDF is a generalization of
> > UnboundedSource. So we would wrap UnboundedSource using SDF. I'm not
> > saying it is trivial, but SDF offers all the functionality that
> > UnboundedSource needs.
> >
> > For example, the @GetInitialRestriction method would call split on the
> > UnboundedSource and the restriction trackers would then be used to
> > process the splits.
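
To make the shape of that idea concrete, here is a rough Python rendering
using the SDK's SDF primitives. It is purely illustrative: the actual
wrapper would target the Java UnboundedSource API, FakeSource below is a
stand-in rather than a Beam class, and the watermark and checkpoint
plumbing that makes the unbounded case hard is elided.

import apache_beam as beam
from apache_beam.transforms.core import RestrictionProvider
from apache_beam.io.restriction_trackers import (
    OffsetRange, OffsetRestrictionTracker)

class FakeSource(object):
  """Stand-in for a source that knows its own range; not a Beam class."""
  def __init__(self, start, end):
    self.start, self.end = start, end

class SourceRestrictionProvider(RestrictionProvider):
  def initial_restriction(self, source):
    # Mirrors @GetInitialRestriction: derive the restriction from the source.
    return OffsetRange(source.start, source.end)

  def split(self, source, restriction):
    # Mirrors delegating to the source's own split(); each sub-range is
    # then processed by its own restriction tracker.
    return restriction.split(desired_num_offsets_per_split=1000)

  def create_tracker(self, restriction):
    return OffsetRestrictionTracker(restriction)

  def restriction_size(self, source, restriction):
    return restriction.size()

class ReadViaSDF(beam.DoFn):
  def process(self, source,
              tracker=beam.DoFn.RestrictionParam(SourceRestrictionProvider())):
    position = tracker.current_restriction().start
    while tracker.try_claim(position):
      # A real wrapper would emit records read from the wrapped source.
      yield position
      position += 1

The genuinely hard part, surfacing watermarks and mapping UnboundedSource
checkpoints onto restriction state, is exactly what this sketch leaves out.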
> >
> > On 31.01.19 15:16, Ismaël Mejía wrote:
> > >> Not necessarily. This would be one way. Another way is to build an
> > >> SDF wrapper for UnboundedSource. Probably the easier path for
> > >> migration.
> > >
> > > That would be fantastic, I have heard about such a wrapper multiple
> > > times but so far there is not any realistic proposal. I have a hard
> > > time imagining how we can map in a generic way RestrictionTrackers
> > > into the existing Bounded/UnboundedSource, so I would love to hear
> > > more about the details.
> > >
> > > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <m...@apache.org> wrote:
> > >>
> > >> > In addition to having support in the runners, this will require a
> > >> > rewrite of PubsubIO to use the new SDF API.
> > >>
> > >> Not necessarily. This would be one way. Another way is to build an
> > >> SDF wrapper for UnboundedSource. Probably the easier path for
> > >> migration.
> > >>
> > >> On 31.01.19 14:03, Ismaël Mejía wrote:
> > >>>> Fortunately, there is already a pending PR for cross-language
> > >>>> pipelines which will allow us to use Java IO like PubSub in
> > >>>> Python jobs.
> > >>>
> > >>> In addition to having support in the runners, this will require a
> > >>> rewrite of PubsubIO to use the new SDF API.
> > >>>
> > >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <m...@apache.org> wrote:
> > >>>>
> > >>>> Hi Matthias,
> > >>>>
> > >>>> This is already reflected in the compatibility matrix, if you look
> > >>>> under SDF. There is no UnboundedSource interface for portable
> > >>>> pipelines. That's a legacy abstraction that will be replaced with
> > >>>> SDF.
> > >>>>
> > >>>> Fortunately, there is already a pending PR for cross-language
> > >>>> pipelines which will allow us to use Java IO like PubSub in Python
> > >>>> jobs.
> > >>>>
> > >>>> Thanks,
> > >>>> Max
> > >>>>
> > >>>> On 31.01.19 12:06, Matthias Baetens wrote:
> > >>>>> Hey Ankur,
> > >>>>>
> > >>>>> Thanks for the swift reply. Should I change this in the capability
> > >>>>> matrix <https://s.apache.org/apache-beam-portability-support-table>
> > >>>>> then?
> > >>>>>
> > >>>>> Many thanks.
> > >>>>> Best,
> > >>>>> Matthias
> > >>>>>
> > >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <goe...@google.com> wrote:
> > >>>>>
> > >>>>> Hi Matthias,
> > >>>>>
> > >>>>> Unfortunately, unbounded reads including pubsub are not yet
> > >>>>> supported for portable runners.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Ankur
> > >>>>>
> > >>>>> On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens
> > >>>>> <baetensmatth...@gmail.com> wrote:
> > >>>>>
> > >>>>> Hi everyone,
> > >>>>>
> > >>>>> The last few days I have been trying to run a streaming pipeline
> > >>>>> (code on GitHub <https://github.com/matthiasa4/beam-demo>) on a
> > >>>>> Flink Runner.
> > >>>>>
> > >>>>> I am running a Flink cluster locally (v1.5.6
> > >>>>> <https://flink.apache.org/downloads.html>).
> > >>>>> I have built the SDK Harness Container: ./gradlew
> > >>>>> :beam-sdks-python-container:docker
> > >>>>> and started the JobServer: ./gradlew
> > >>>>> :beam-runners-flink_2.11-job-server:runShadow
> > >>>>> -PflinkMasterUrl=localhost:8081
> > >>>>>
> > >>>>> I run my pipeline with: env/bin/python streaming_pipeline.py
> > >>>>> --runner=PortableRunner --job_endpoint=localhost:8099 --output xxx
> > >>>>> --input_subscription xxx --output_subscription xxx
> > >>>>>
> > >>>>> All this is running inside Ubuntu (Bionic) in a VirtualBox.
> > >>>>>
> > >>>>> The job submits fine, but unfortunately fails after a few seconds
> > >>>>> with the error attached.
> > >>>>>
> > >>>>> Anything I am missing or doing wrong?
> > >>>>>
> > >>>>> Many thanks.
> > >>>>> Best,
> > >>>>> Matthias