Max, thanks for your summary. I would like to add that we agree that
the runner-specific translation via URN is a temporary solution until
the wrapper transforms are written, is this correct? In any case, this
alternative standard expansion approach deserves a discussion of its
own, as you mention.

Correct. Wrapping existing Beam transforms should always be preferred over Runner-specific translation because the latter is not portable.

On 01.02.19 14:25, Ismaël Mejía wrote:
Thanks for the explanation, Robert; it makes much more sense now. (Sorry
for the confusion in the mapping; I mistyped the direction SDF <->
Source.)

Status of SDF:
- Support for Dynamic Work Rebalancing is WIP.
- Bounded version translation is supported by all non-portable runners
in a relatively naive way.
- Unbounded version translation is not supported in the non-portable
runners. (Let's not forget that this case may make sense too).
- Portable runners translation of SDF is WIP
- There is only one IO that is written based on SDF:
   - HBaseIO
- Some other IOs should work out of the box (those based on
non-splittable DoFn):
   - ClickhouseIO
   - File-based ones: TextIO, AvroIO, ParquetIO
   - JdbcIO
   - SolrIO

Max, thanks for your summary. I would like to add that we agree that
the runner-specific translation via URN is a temporary solution until
the wrapper transforms are written, is this correct? In any case, this
alternative standard expansion approach deserves a discussion of its
own, as you mention.

On Fri, Feb 1, 2019 at 2:02 PM Robert Bradshaw <rober...@google.com> wrote:

Are you suggesting something akin to a generic

     urn: JsonConfiguredJavaSource
     payload: some json specifying which source and which parameters

which would expand to actually constructing and applying that source?

(FWIW, I was imagining PubSubIO already had a translation into BeamFnApi protos 
that fully specified it, and we use that same format to translate back out.)
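
To make the question concrete, here is a minimal sketch of what a generic JSON-configured URN could look like. Everything in it is an assumption for illustration: the URN string is taken from the question above and is not a registered Beam URN, and FakePubSubSource, SOURCE_REGISTRY, make_payload and expand are hypothetical stand-ins rather than Beam APIs.

```python
import json

# Hypothetical URN, copied from the question above; not a registered Beam URN.
JSON_CONFIGURED_SOURCE_URN = "JsonConfiguredJavaSource"


class FakePubSubSource:
    """Stand-in for a real source; it only records its configuration."""

    def __init__(self, subscription, with_attributes=False):
        self.subscription = subscription
        self.with_attributes = with_attributes


# Hypothetical registry: source name in the payload -> constructor.
SOURCE_REGISTRY = {"pubsub": FakePubSubSource}


def make_payload(source, **params):
    """Serialize which source to build, and its parameters, as JSON bytes."""
    spec = {"source": source, "params": params}
    return json.dumps(spec, sort_keys=True).encode("utf-8")


def expand(urn, payload):
    """Turn a (urn, payload) pair into a constructed source object."""
    if urn != JSON_CONFIGURED_SOURCE_URN:
        raise ValueError("unknown URN: %s" % urn)
    spec = json.loads(payload.decode("utf-8"))
    name = spec.get("source")
    if name not in SOURCE_REGISTRY:
        raise ValueError("unknown source: %s" % name)
    return SOURCE_REGISTRY[name](**spec.get("params", {}))
```

In this sketch the payload names the source and carries its parameters, and the expanding side is the only place that knows how to construct it.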

On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels <m...@apache.org> wrote:

Recapping here:

We all agree that SDF is the way to go for future implementations of
sources. It enables us to get rid of the source interfaces. However, SDF
does not solve the lack of streaming sources in Python.

The expansion PR (thanks btw!) solves the problem of
expanding/translating URNs known to an ExpansionService. That is a more
programmatic way of replacing language-specific transforms, instead of
relying on translators directly in the Runner.

What is unsolved is the configuration of sources from a foreign
environment. In my opinion this is the most pressing issue for Python
sources, because what is PubSubIO worth in Python if you cannot
configure it?

What about this:

I think it is worth adding a JSON configuration option for all existing
Java sources. That way, we could easily configure them as part of the
expansion request (which would contain a JSON configuration). I'll
probably fork a thread to discuss this in more detail, but would like to
hear your thoughts.

-Max

On 01.02.19 13:08, Robert Bradshaw wrote:
On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <m...@apache.org> wrote:

     Ah, I thought you meant native Flink transforms.

     Exactly! The translation code is already there. The main challenge
     is how to programmatically configure the BeamIO from Python. I
     suppose that is also an unsolved problem for cross-language
     transforms in general.


This is what https://github.com/apache/beam/pull/7316 does.

For a particular source, one would want to define a URN and
corresponding payload, then (probably) a CompositeTransform in Python
that takes the user's arguments, packages them into the payload, applies
the ExternalTransform, and returns the results. How to handle arbitrary
UDFs embedded in sources is still TBD.
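
As a rough sketch of the Python side described above: a composite-style wrapper takes the user's arguments, packages them into a payload, and hands URN and payload to an external transform. The URN, the JSON payload encoding, and the classes StubExternalTransform and ReadFromPubSubExternal are all hypothetical; the stub only records what an expansion service would receive and is not the ExternalTransform from the PR.

```python
import json

# Hypothetical URN for illustration only.
PUBSUB_READ_URN = "example:pubsub_read:v1"


class StubExternalTransform:
    """Stand-in for an external transform; it only records the URN and
    payload that an expansion service would receive."""

    def __init__(self, urn, payload):
        self.urn = urn
        self.payload = payload

    def expand(self, pbegin):
        # A real implementation would contact the expansion service here.
        return ("expanded", self.urn, self.payload)


class ReadFromPubSubExternal:
    """Composite-style wrapper: user arguments in, external transform out."""

    def __init__(self, subscription, id_label=None):
        self.subscription = subscription
        self.id_label = id_label

    def build_payload(self):
        # Package only the arguments the user actually supplied.
        config = {"subscription": self.subscription}
        if self.id_label is not None:
            config["id_label"] = self.id_label
        return json.dumps(config, sort_keys=True).encode("utf-8")

    def expand(self, pbegin):
        external = StubExternalTransform(PUBSUB_READ_URN, self.build_payload())
        return external.expand(pbegin)
```

The payload here holds only plain configuration values; UDFs embedded in a source are outside what this sketch covers.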

     For Matthias' pipeline with PubSubIO we can build something
     specific, but for the general case there should be a way to
     initialize a Beam IO via a configuration map provided by an
     external environment.


I thought quite a bit about how we could represent expansions statically
(e.g. have some kind of expansion template that could be used, at least
in many cases, as data without firing up a separate process. May be
worth doing eventually, but we run into the same issues that were
discussed at
https://github.com/apache/beam/pull/7316#discussion_r249996455 ).

If one is already using a portable runner like Flink, having the job
service process automatically also serve up an expansion service for
various URNs it knows and cares about is probably a pretty low bar.
Flink could serve up things it would rather get back untouched in a
transform with a special flink runner urn.

As Ahmet mentions, SDF is a better solution. I hope it's not that far
away, but even once it comes we'll likely want the above framework to
invoke the full suite of Java IOs even after they're running on SDF
themselves.

- Robert

     On 31.01.19 17:36, Thomas Weise wrote:
      > Exactly, that's what I had in mind.
      >
      > A Flink runner native transform would make the existing unbounded sources
      > available, similar to:
      >
      > https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
      >
      > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels <m...@apache.org> wrote:
      >
      >     Wouldn't it be even more useful for the transition period if we enabled
      >     Beam IO to be used via Flink (like in the legacy Flink Runner)? In this
      >     particular example, Matthias wants to use PubSubIO, which is not even
      >     available as a native Flink transform.
      >
      >     On 31.01.19 16:21, Thomas Weise wrote:
      >      > Until SDF is supported, we could also add Flink runner native transforms for
      >      > selected unbounded sources [1].
      >      >
      >      > That might be a reasonable option to unblock users that want to try Python
      >      > streaming on Flink.
      >      >
      >      > Thomas
      >      >
      >      > [1] https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
      >      >
      >      > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels <m...@apache.org> wrote:
      >      >
      >      >      > I have a hard time to imagine how can we map in a generic way
      >      >      > RestrictionTrackers into the existing Bounded/UnboundedSource, so I would
      >      >      > love to hear more about the details.
      >      >
      >      >     Isn't it the other way around? The SDF is a generalization of
      >      >     UnboundedSource. So we would wrap UnboundedSource using SDF. I'm not
      >      >     saying it is trivial, but SDF offers all the functionality that
      >      >     UnboundedSource needs.
      >      >
      >      >     For example, the @GetInitialRestriction method would call split on the
      >      >     UnboundedSource and the restriction trackers would then be used to
      >      >     process the splits.
      >      >
      >      >     On 31.01.19 15:16, Ismaël Mejía wrote:
      >      >      >> Not necessarily. This would be one way. Another way is build an SDF
      >      >      >> wrapper for UnboundedSource. Probably the easier path for migration.
      >      >      >
      >      >      > That would be fantastic, I have heard about such wrapper multiple
      >      >      > times but so far there is not any realistic proposal. I have a hard
      >      >      > time to imagine how can we map in a generic way RestrictionTrackers
      >      >      > into the existing Bounded/UnboundedSource, so I would love to hear
      >      >      > more about the details.
      >      >      >
      >      >      > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels <m...@apache.org> wrote:
      >      >      >>
      >      >      >>   > In addition to have support in the runners, this will require a
      >      >      >>   > rewrite of PubsubIO to use the new SDF API.
      >      >      >>
      >      >      >> Not necessarily. This would be one way. Another way is build an SDF
      >      >      >> wrapper for UnboundedSource. Probably the easier path for migration.
      >      >      >>
      >      >      >> On 31.01.19 14:03, Ismaël Mejía wrote:
      >      >      >>>> Fortunately, there is already a pending PR for cross-language
      >      >      >>>> pipelines which will allow us to use Java IO like PubSub in Python jobs.
      >      >      >>>
      >      >      >>> In addition to have support in the runners, this will require a
      >      >      >>> rewrite of PubsubIO to use the new SDF API.
      >      >      >>>
      >      >      >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels <m...@apache.org> wrote:
      >      >      >>>>
      >      >      >>>> Hi Matthias,
      >      >      >>>>
      >      >      >>>> This is already reflected in the compatibility matrix, if you look
      >      >      >>>> under SDF. There is no UnboundedSource interface for portable
      >      >      >>>> pipelines. That's a legacy abstraction that will be replaced with SDF.
      >      >      >>>>
      >      >      >>>> Fortunately, there is already a pending PR for cross-language
      >      >      >>>> pipelines which will allow us to use Java IO like PubSub in Python jobs.
      >      >      >>>>
      >      >      >>>> Thanks,
      >      >      >>>> Max
      >      >      >>>>
      >      >      >>>> On 31.01.19 12:06, Matthias Baetens wrote:
      >      >      >>>>> Hey Ankur,
      >      >      >>>>>
      >      >      >>>>> Thanks for the swift reply. Should I change this in the capability matrix
      >      >      >>>>> <https://s.apache.org/apache-beam-portability-support-table> then?
      >      >      >>>>>
      >      >      >>>>> Many thanks.
      >      >      >>>>> Best,
      >      >      >>>>> Matthias
      >      >      >>>>>
      >      >      >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka <goe...@google.com> wrote:
      >      >      >>>>>
      >      >      >>>>>       Hi Matthias,
      >      >      >>>>>
      >      >      >>>>>       Unfortunately, unbounded reads including pubsub are not yet
      >      >      >>>>>       supported for portable runners.
      >      >      >>>>>
      >      >      >>>>>       Thanks,
      >      >      >>>>>       Ankur
      >      >      >>>>>
      >      >      >>>>>       On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens <baetensmatth...@gmail.com> wrote:
      >      >      >>>>>
      >      >      >>>>>           Hi everyone,
      >      >      >>>>>
      >      >      >>>>>           Last few days I have been trying to run a streaming
      >      >      >>>>>           pipeline (code on Github
      >      >      >>>>>           <https://github.com/matthiasa4/beam-demo>) on a Flink Runner.
      >      >      >>>>>
      >      >      >>>>>           I am running a Flink cluster locally (v1.5.6
      >      >      >>>>>           <https://flink.apache.org/downloads.html>).
      >      >      >>>>>           I have built the SDK Harness Container:
      >      >      >>>>>           ./gradlew :beam-sdks-python-container:docker
      >      >      >>>>>           and started the JobServer:
      >      >      >>>>>           ./gradlew :beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=localhost:8081
      >      >      >>>>>
      >      >      >>>>>           I run my pipeline with:
      >      >      >>>>>           env/bin/python streaming_pipeline.py --runner=PortableRunner --job_endpoint=localhost:8099 --output xxx --input_subscription xxx --output_subscription xxx
      >      >      >>>>>
      >      >      >>>>>           All this is running inside a Ubuntu (Bionic) in a Virtualbox.
      >      >      >>>>>
      >      >      >>>>>           The job submits fine, but unfortunately fails after a few
      >      >      >>>>>           seconds with the error attached.
      >      >      >>>>>
      >      >      >>>>>           Anything I am missing or doing wrong?
      >      >      >>>>>
      >      >      >>>>>           Many thanks.
      >      >      >>>>>           Best,
      >      >      >>>>>           Matthias
      >      >      >>>>>
      >      >      >>>>>
      >      >
      >
