Until SDF is supported, we could also add Flink runner native transforms for selected unbounded sources [1].
That might be a reasonable option to unblock users that want to try Python streaming on Flink.

Thomas

[1] https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java

On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <m...@apache.org> wrote:
>
> > I have a hard time imagining how we can map RestrictionTrackers into
> > the existing Bounded/UnboundedSource in a generic way, so I would
> > love to hear more about the details.
>
> Isn't it the other way around? The SDF is a generalization of
> UnboundedSource. So we would wrap UnboundedSource using SDF. I'm not
> saying it is trivial, but SDF offers all the functionality that
> UnboundedSource needs.
>
> For example, the @GetInitialRestriction method would call split on the
> UnboundedSource and the restriction trackers would then be used to
> process the splits.
>
> On 31.01.19 15:16, Ismaël Mejía wrote:
> >> Not necessarily. This would be one way. Another way is to build an
> >> SDF wrapper for UnboundedSource. Probably the easier path for
> >> migration.
> >
> > That would be fantastic. I have heard about such a wrapper multiple
> > times, but so far there is not any realistic proposal. I have a hard
> > time imagining how we can map RestrictionTrackers into the existing
> > Bounded/UnboundedSource in a generic way, so I would love to hear
> > more about the details.
> >
> > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <m...@apache.org> wrote:
> >>
> >> > In addition to having support in the runners, this will require a
> >> > rewrite of PubsubIO to use the new SDF API.
> >>
> >> Not necessarily. This would be one way. Another way is to build an
> >> SDF wrapper for UnboundedSource. Probably the easier path for
> >> migration.
> >>
> >> On 31.01.19 14:03, Ismaël Mejía wrote:
> >>>> Fortunately, there is already a pending PR for cross-language
> >>>> pipelines which will allow us to use Java IO like PubSub in
> >>>> Python jobs.
> >>>
> >>> In addition to having support in the runners, this will require a
> >>> rewrite of PubsubIO to use the new SDF API.
> >>>
> >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>
> >>>> Hi Matthias,
> >>>>
> >>>> This is already reflected in the compatibility matrix, if you look
> >>>> under SDF. There is no UnboundedSource interface for portable
> >>>> pipelines. That's a legacy abstraction that will be replaced with
> >>>> SDF.
> >>>>
> >>>> Fortunately, there is already a pending PR for cross-language
> >>>> pipelines which will allow us to use Java IO like PubSub in
> >>>> Python jobs.
> >>>>
> >>>> Thanks,
> >>>> Max
> >>>>
> >>>> On 31.01.19 12:06, Matthias Baetens wrote:
> >>>>> Hey Ankur,
> >>>>>
> >>>>> Thanks for the swift reply. Should I change this in the capability
> >>>>> matrix <https://s.apache.org/apache-beam-portability-support-table> then?
> >>>>>
> >>>>> Many thanks.
> >>>>> Best,
> >>>>> Matthias
> >>>>>
> >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <goe...@google.com> wrote:
> >>>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Unfortunately, unbounded reads including pubsub are not yet
> >>>>> supported for portable runners.
> >>>>>
> >>>>> Thanks,
> >>>>> Ankur
> >>>>>
> >>>>> On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens <baetensmatth...@gmail.com> wrote:
> >>>>>
> >>>>> Hi everyone,
> >>>>>
> >>>>> The last few days I have been trying to run a streaming pipeline
> >>>>> (code on GitHub <https://github.com/matthiasa4/beam-demo>) on a
> >>>>> Flink Runner.
> >>>>>
> >>>>> I am running a Flink cluster locally (v1.5.6
> >>>>> <https://flink.apache.org/downloads.html>).
> >>>>>
> >>>>> I have built the SDK Harness Container:
> >>>>>   ./gradlew :beam-sdks-python-container:docker
> >>>>> and started the JobServer:
> >>>>>   ./gradlew :beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=localhost:8081
> >>>>>
> >>>>> I run my pipeline with:
> >>>>>   env/bin/python streaming_pipeline.py --runner=PortableRunner --job_endpoint=localhost:8099 --output xxx --input_subscription xxx --output_subscription xxx
> >>>>>
> >>>>> All of this is running inside an Ubuntu (Bionic) VM in VirtualBox.
> >>>>>
> >>>>> The job submits fine, but unfortunately fails after a few seconds
> >>>>> with the error attached.
> >>>>>
> >>>>> Anything I am missing or doing wrong?
> >>>>>
> >>>>> Many thanks.
> >>>>> Best,
> >>>>> Matthias
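The wrapper idea Max describes in the thread (@GetInitialRestriction derives restrictions from the source's splits, and restriction trackers then gate the reading of each split) can be illustrated with a minimal, library-free Python sketch. Note this is a toy: ToySource, OffsetRangeTracker, and process are hypothetical stand-ins, not the Beam API, and the real wrapper would of course be built on Beam's splittable DoFn machinery.

```python
# Toy illustration of wrapping a source in an SDF-style DoFn.
# All names here are hypothetical stand-ins, NOT the Beam API.

class OffsetRangeTracker:
    """Tracks a claimable half-open offset range [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.position = start

    def try_claim(self, offset):
        # A claim succeeds only while the offset lies inside the
        # restriction; a runner-initiated split would shrink `stop`.
        if offset >= self.stop:
            return False
        self.position = offset
        return True


class ToySource:
    """Stand-in for a source: a list of records, splittable into shards."""

    def __init__(self, records, num_splits):
        self.records = records
        self.num_splits = num_splits

    def split(self):
        # Mirrors what @GetInitialRestriction would do in the wrapper:
        # derive initial restrictions (offset ranges) from the source's
        # own notion of splits.
        n = len(self.records)
        step = max(1, n // self.num_splits)
        return [(i, min(i + step, n)) for i in range(0, n, step)]


def process(source):
    """SDF-style processing: one tracker per restriction, claim then emit."""
    out = []
    for start, stop in source.split():        # the initial restrictions
        tracker = OffsetRangeTracker(start, stop)
        for offset in range(start, stop):
            if not tracker.try_claim(offset):  # restriction exhausted/split
                break
            out.append(source.records[offset])
    return out
```

For example, `process(ToySource(list("abcdef"), 3))` yields all six records, one offset range per shard; the point is only that claiming offsets through a tracker is expressive enough to drive reading a pre-split source, which is the shape the proposed UnboundedSource-over-SDF wrapper would take.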