Ah, I thought you meant native Flink transforms.
Exactly! The translation code is already there. The main challenge is how to
programmatically configure the BeamIO from Python. I suppose that is also an
unsolved problem for cross-language transforms in general.
For Matthias' pipeline with PubSubIO we can build something specific, but for
the general case there should be way to initialize a Beam IO via a configuration
map provided by an external environment.
On 31.01.19 17:36, Thomas Weise wrote:
Exactly, that's what I had in mind.
A Flink runner native transform would make the existing unbounded sources
available, similar to:
https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <m...@apache.org
<mailto:m...@apache.org>> wrote:
Wouldn't it be even more useful for the transition period if we enabled
Beam IO
to be used via Flink (like in the legacy Flink Runner)? In this particular
example, Matthias wants to use PubSubIO, which is not even available as a
native
Flink transform.
On 31.01.19 16:21, Thomas Weise wrote:
> Until SDF is supported, we could also add Flink runner native transforms
for
> selected unbounded sources [1].
>
> That might be a reasonable option to unblock users that want to try
Python
> streaming on Flink.
>
> Thomas
>
> [1]
>
https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
>
>
> On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <m...@apache.org
<mailto:m...@apache.org>
> <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
>
> > I have a hard time to imagine how can we map in a generic way
> RestrictionTrackers into the existing Bounded/UnboundedSource, so I
would
> love to hear more about the details.
>
> Isn't it the other way around? The SDF is a generalization of
UnboundedSource.
> So we would wrap UnboundedSource using SDF. I'm not saying it is
trivial, but
> SDF offers all the functionality that UnboundedSource needs.
>
> For example, the @GetInitialRestriction method would call split on
the
> UnboundedSource and the restriction trackers would then be used to
process the
> splits.
>
> On 31.01.19 15:16, Ismaël Mejía wrote:
> >> Not necessarily. This would be one way. Another way is build an
SDF
> wrapper for UnboundedSource. Probably the easier path for migration.
> >
> > That would be fantastic, I have heard about such wrapper multiple
> > times but so far there is not any realistic proposal. I have a
hard
> > time to imagine how can we map in a generic way
RestrictionTrackers
> > into the existing Bounded/UnboundedSource, so I would love to hear
> > more about the details.
> >
> > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels
<m...@apache.org
<mailto:m...@apache.org>
> <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >>
> >> > In addition to have support in the runners, this will
require a
> >> > rewrite of PubsubIO to use the new SDF API.
> >>
> >> Not necessarily. This would be one way. Another way is build an
SDF
> wrapper for
> >> UnboundedSource. Probably the easier path for migration.
> >>
> >> On 31.01.19 14:03, Ismaël Mejía wrote:
> >>>> Fortunately, there is already a pending PR for cross-language
> pipelines which
> >>>> will allow us to use Java IO like PubSub in Python jobs.
> >>>
> >>> In addition to have support in the runners, this will require a
> >>> rewrite of PubsubIO to use the new SDF API.
> >>>
> >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels
<m...@apache.org <mailto:m...@apache.org>
> <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >>>>
> >>>> Hi Matthias,
> >>>>
> >>>> This is already reflected in the compatibility matrix, if you
look
> under SDF.
> >>>> There is no UnboundedSource interface for portable pipelines.
That's a
> legacy
> >>>> abstraction that will be replaced with SDF.
> >>>>
> >>>> Fortunately, there is already a pending PR for cross-language
> pipelines which
> >>>> will allow us to use Java IO like PubSub in Python jobs.
> >>>>
> >>>> Thanks,
> >>>> Max
> >>>>
> >>>> On 31.01.19 12:06, Matthias Baetens wrote:
> >>>>> Hey Ankur,
> >>>>>
> >>>>> Thanks for the swift reply. Should I change this in the
capability matrix
> >>>>> <https://s.apache.org/apache-beam-portability-support-table>
then?
> >>>>>
> >>>>> Many thanks.
> >>>>> Best,
> >>>>> Matthias
> >>>>>
> >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <goe...@google.com
<mailto:goe...@google.com>
> <mailto:goe...@google.com <mailto:goe...@google.com>>
> >>>>> <mailto:goe...@google.com <mailto:goe...@google.com>
<mailto:goe...@google.com <mailto:goe...@google.com>>>> wrote:
> >>>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Unfortunately, unbounded reads including pubsub are not
yet
> supported for
> >>>>> portable runners.
> >>>>>
> >>>>> Thanks,
> >>>>> Ankur
> >>>>>
> >>>>> On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens
> <baetensmatth...@gmail.com <mailto:baetensmatth...@gmail.com>
<mailto:baetensmatth...@gmail.com <mailto:baetensmatth...@gmail.com>>
> >>>>> <mailto:baetensmatth...@gmail.com
<mailto:baetensmatth...@gmail.com>
> <mailto:baetensmatth...@gmail.com
<mailto:baetensmatth...@gmail.com>>>> wrote:
> >>>>>
> >>>>> Hi everyone,
> >>>>>
> >>>>> Last few days I have been trying to run a streaming
> pipeline (code on
> >>>>> Github <https://github.com/matthiasa4/beam-demo>)
on a
> Flink Runner.
> >>>>>
> >>>>> I am running a Flink cluster locally (v1.5.6
> >>>>> <https://flink.apache.org/downloads.html>)
> >>>>> I have built the SDK Harness Container: /./gradlew
> >>>>> :beam-sdks-python-container:docker/
> >>>>> and started the JobServer: /./gradlew
> >>>>> :beam-runners-flink_2.11-job-server:runShadow
> >>>>> -PflinkMasterUrl=localhost:8081./
> >>>>>
> >>>>> I run my pipeline with: /env/bin/python
streaming_pipeline.py
> >>>>> --runner=PortableRunner
--job_endpoint=localhost:8099
> --output xxx
> >>>>> --input_subscription xxx --output_subscription xxx/
> >>>>> /
> >>>>> /
> >>>>> All this is running inside a Ubuntu (Bionic) in a
Virtualbox.
> >>>>>
> >>>>> The job submits fine, but unfortunately fails after
a few
> seconds with
> >>>>> the error attached.
> >>>>>
> >>>>> Anything I am missing or doing wrong?
> >>>>>
> >>>>> Many thanks.
> >>>>> Best,
> >>>>> Matthias
> >>>>>
> >>>>>
>