On Fri, Feb 1, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:

>
> On Fri, Feb 1, 2019 at 6:17 AM Maximilian Michels <m...@apache.org> wrote:
>
>> > Max thanks for your summary. I would like to add that we agree that
>> > the runner specific translation via URN is a temporary solution until
>> > the wrapper transforms are written, is this correct? In any case this
>> > alternative standard expansion approach deserves a discussion of its
>> > own as you mention.
>>
>> Correct. Wrapping existing Beam transforms should always be preferred
>> over Runner-specific translation because the latter is not portable.
>>
>>
> From a Python user perspective, this can still be exposed as a stub,
> without having to know about the URN.
>

Yep. In the long run, I'd expect many sources to be offered as their own
easy-to-use stubs.
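
To make the stub idea concrete, here is a minimal sketch in plain Python (no
Beam imports). It only shows user arguments being packaged into a URN plus a
JSON payload, so the user never touches the URN. The URN string, the
ExternalSpec type, the "PubsubIO.Read" source name and the parameter names are
all assumptions made up for illustration, not existing Beam identifiers.

```python
import json
from dataclasses import dataclass

# Hypothetical URN, for illustration only; not a registered Beam URN.
JSON_SOURCE_URN = "example:external:json_configured_java_source:v1"


@dataclass(frozen=True)
class ExternalSpec:
    """URN plus opaque payload, as it might be sent in an expansion request."""
    urn: str
    payload: bytes


def read_from_pubsub(subscription, with_attributes=False):
    """User-facing stub: plain Python arguments in, URN and payload out."""
    config = {
        "source": "PubsubIO.Read",  # assumed name for the Java source
        "parameters": {
            "subscription": subscription,
            "with_attributes": with_attributes,
        },
    }
    # Serialize deterministically so equal arguments give equal payloads.
    payload = json.dumps(config, sort_keys=True).encode("utf-8")
    return ExternalSpec(urn=JSON_SOURCE_URN, payload=payload)


spec = read_from_pubsub("projects/p/subscriptions/s")
print(spec.urn)
print(json.loads(spec.payload)["parameters"]["subscription"])
```

In a real pipeline the stub would hand the URN and payload to an external
transform instead of returning them; that part is left out here on purpose.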


> Also, isn't how we expose this orthogonal to how it is being translated?
>

Yes.


> It may even be possible to switch the stub to SDF based translation once
> that is ready.
>

Yep. The expansion would change, but that's all an internal detail inside
the composite that the user doesn't care about.
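
A rough sketch of what "internal detail" means here, in plain Python without
Beam. The class name, the _EXPANSION switch and the step descriptions are
hypothetical and only stand in for a real composite transform. The point is
that the constructor the user calls stays the same while the expansion behind
it is swapped.

```python
class ReadFromStreamingSource:
    """Hypothetical user-facing composite; not a real Beam class."""

    # Internal switch owned by the library; users never pass or see it.
    _EXPANSION = "runner_native_urn"

    def __init__(self, topic):
        self.topic = topic

    def expand(self):
        """Return a description of the internal steps this composite uses."""
        if self._EXPANSION == "runner_native_urn":
            # Interim approach: one step the runner translates itself.
            return [("runner-specific URN", self.topic)]
        if self._EXPANSION == "sdf":
            # Later approach: the same read expressed as SDF-style steps.
            return [
                ("create initial restriction", self.topic),
                ("process restriction", self.topic),
            ]
        raise ValueError("unknown expansion: %s" % self._EXPANSION)


# The user-facing call is identical for both expansions.
source = ReadFromStreamingSource("projects/p/topics/t")
print(source.expand())
```

Flipping _EXPANSION to "sdf" changes what expand() returns without any change
to user pipelines.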


>
>
>> On 01.02.19 14:25, Ismaël Mejía wrote:
>> > Thanks for the explanation Robert it makes much more sense now. (Sorry
>> > for the confusion in the mapping I mistyped the direction SDF <->
>> > Source).
>> >
>> > Status of SDF:
>> > - Support for Dynamic Work Rebalancing is WIP.
>> > - Bounded version translation is supported by all non-portable runners
>> > in a relatively naive way.
>> > - Unbounded version translation is not supported in the non-portable
>> > runners. (Let's not forget that this case may make sense too).
>> > - Portable runners translation of SDF is WIP
>> > - There is only one IO that is written based on SDF:
>> >    - HBaseIO
>> > - Some other IOs should work out of the box (those based on
>> > non-splittable DoFn):
>> >    - ClickhouseIO
>> >    - File-based ones: TextIO, AvroIO, ParquetIO
>> >    - JdbcIO
>> >    - SolrIO
>> >
>> > Max thanks for your summary. I would like to add that we agree that
>> > the runner specific translation via URN is a temporary solution until
>> > the wrapper transforms are written, is this correct? In any case this
>> > alternative standard expansion approach deserves a discussion of its
>> > own as you mention.
>> >
>> > On Fri, Feb 1, 2019 at 2:02 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>> >>
>> >> Are you suggesting something akin to a generic
>> >>
>> >>      urn: JsonConfiguredJavaSource
>> >>      payload: some json specifying which source and which parameters
>> >>
>> >> which would expand to actually constructing and applying that source?
>> >>
>> >> (FWIW, I was imagining PubSubIO already had a translation into
>> BeamFnApi protos that fully specified it, and we use that same format to
>> translate back out.)
>> >>
>> >> On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels <m...@apache.org>
>> wrote:
>> >>>
>> >>> Recapping here:
>> >>>
>> >>> We all agree that SDF is the way to go for future implementations of
>> >>> sources. It enables us to get rid of the source interfaces. However,
>> SDF
>> >>> does not solve the lack of streaming sources in Python.
>> >>>
>> >>> The expansion PR (thanks btw!) solves the problem of
>> >>> expanding/translating URNs known to an ExpansionService. That is a
>> >>> more programmatic way of replacing language-specific transforms,
>> >>> instead of relying on translators directly in the Runner.
>> >>>
>> >>> What is unsolved is the configuration of sources from a foreign
>> >>> environment. In my opinion this is the most pressing issue for Python
>> >>> sources, because what is PubSubIO worth in Python if you cannot
>> >>> configure it?
>> >>>
>> >>> What about this:
>> >>>
>> >>> I think it is worth adding a JSON configuration option for all
>> existing
>> >>> Java sources. That way, we could easily configure them as part of the
>> >>> expansion request (which would contain a JSON configuration). I'll
>> >>> probably fork a thread to discuss this in more detail, but would like
>> to
>> >>> hear your thoughts.
>> >>>
>> >>> -Max
>> >>>
>> >>> On 01.02.19 13:08, Robert Bradshaw wrote:
>> >>>> On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels <m...@apache.org> wrote:
>> >>>>
>> >>>>      Ah, I thought you meant native Flink transforms.
>> >>>>
>> >>>>      Exactly! The translation code is already there. The main
>> challenge
>> >>>>      is how to
>> >>>>      programmatically configure the BeamIO from Python. I suppose
>> that is
>> >>>>      also an
>> >>>>      unsolved problem for cross-language transforms in general.
>> >>>>
>> >>>>
>> >>>> This is what https://github.com/apache/beam/pull/7316 does.
>> >>>>
>> >>>> For a particular source, one would want to define a URN and
>> >>>> corresponding payload, then (probably) a CompositeTransform in Python
>> >>>> that takes the user's arguments, packages them into the payload,
>> >>>> applies the ExternalTransform, and returns the results. How to handle
>> >>>> arbitrary UDFs embedded in sources is still TBD.
>> >>>>
>> >>>>      For Matthias' pipeline with PubSubIO we can build something
>> >>>>      specific, but for the general case there should be a way to
>> >>>>      initialize a Beam IO via a configuration map provided by an
>> >>>>      external environment.
>> >>>>
>> >>>>
>> >>>> I thought quite a bit about how we could represent expansions
>> >>>> statically (e.g. have some kind of expansion template that could be
>> >>>> used, at least in many cases, as data without firing up a separate
>> >>>> process). May be worth doing eventually, but we run into the same
>> >>>> issues that were discussed at
>> >>>> https://github.com/apache/beam/pull/7316#discussion_r249996455.
>> >>>>
>> >>>> If one is already using a portable runner like Flink, having the job
>> >>>> service process automatically also serve up an expansion service for
>> >>>> various URNs it knows and cares about is probably a pretty low bar.
>> >>>> Flink could serve up things it would rather get back untouched in a
>> >>>> transform with a special flink runner urn.
>> >>>>
>> >>>> As Ahmet mentions, SDF is a better solution. I hope it's not that far
>> >>>> away, but even once it comes we'll likely want the above framework to
>> >>>> invoke the full suite of Java IOs even after they're running on SDF
>> >>>> themselves.
>> >>>>
>> >>>> - Robert
>> >>>>
>> >>>>      On 31.01.19 17:36, Thomas Weise wrote:
>> >>>>       > Exactly, that's what I had in mind.
>> >>>>       >
>> >>>>       > A Flink runner native transform would make the existing
>> unbounded
>> >>>>      sources
>> >>>>       > available, similar to:
>> >>>>       >
>> >>>>       >
>> >>>>
>> https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
>> >>>>       >
>> >>>>       >
>> >>>>       >
>> >>>>       >
>> >>>>       > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels
>> >>>>       > <m...@apache.org> wrote:
>> >>>>       >
>> >>>>       >     Wouldn't it be even more useful for the transition
>> period if
>> >>>>      we enabled Beam IO
>> >>>>       >     to be used via Flink (like in the legacy Flink Runner)?
>> In
>> >>>>      this particular
>> >>>>       >     example, Matthias wants to use PubSubIO, which is not
>> even
>> >>>>      available as a
>> >>>>       >     native
>> >>>>       >     Flink transform.
>> >>>>       >
>> >>>>       >     On 31.01.19 16:21, Thomas Weise wrote:
>> >>>>       >      > Until SDF is supported, we could also add Flink runner
>> >>>>      native transforms for
>> >>>>       >      > selected unbounded sources [1].
>> >>>>       >      >
>> >>>>       >      > That might be a reasonable option to unblock users
>> that
>> >>>>      want to try Python
>> >>>>       >      > streaming on Flink.
>> >>>>       >      >
>> >>>>       >      > Thomas
>> >>>>       >      >
>> >>>>       >      > [1]
>> >>>>       >      >
>> >>>>       >
>> >>>>
>> https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
>> >>>>       >      >
>> >>>>       >      >
>> >>>>       >      > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels
>> >>>>       >      > <m...@apache.org> wrote:
>> >>>>       >      >
>> >>>>       >      >      > I have a hard time to imagine how can we map
>> in a
>> >>>>      generic way
>> >>>>       >      >     RestrictionTrackers into the existing
>> >>>>      Bounded/UnboundedSource, so I would
>> >>>>       >      >     love to hear more about the details.
>> >>>>       >      >
>> >>>>       >      >     Isn't it the other way around? The SDF is a
>> >>>>      generalization of
>> >>>>       >     UnboundedSource.
>> >>>>       >      >     So we would wrap UnboundedSource using SDF. I'm
>> not
>> >>>>      saying it is
>> >>>>       >     trivial, but
>> >>>>       >      >     SDF offers all the functionality that
>> UnboundedSource
>> >>>>      needs.
>> >>>>       >      >
>> >>>>       >      >     For example, the @GetInitialRestriction method
>> would
>> >>>>      call split on the
>> >>>>       >      >     UnboundedSource and the restriction trackers would
>> >>>>      then be used to
>> >>>>       >     process the
>> >>>>       >      >     splits.
>> >>>>       >      >
>> >>>>       >      >     On 31.01.19 15:16, Ismaël Mejía wrote:
>> >>>>       >      >      >> Not necessarily. This would be one way.
>> Another
>> >>>>      way is build an SDF
>> >>>>       >      >     wrapper for UnboundedSource. Probably the easier
>> path
>> >>>>      for migration.
>> >>>>       >      >      >
>> >>>>       >      >      > That would be fantastic, I have heard about
>> such
>> >>>>      wrapper multiple
>> >>>>       >      >      > times but so far there is not any realistic
>> >>>>      proposal. I have a hard
>> >>>>       >      >      > time to imagine how can we map in a generic way
>> >>>>      RestrictionTrackers
>> >>>>       >      >      > into the existing Bounded/UnboundedSource, so I
>> >>>>      would love to hear
>> >>>>       >      >      > more about the details.
>> >>>>       >      >      >
>> >>>>       >      >      > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels
>> >>>>       >      >      > <m...@apache.org> wrote:
>> >>>>       >      >      >>
>> >>>>       >      >      >>   > In addition to have support in the
>> runners,
>> >>>>      this will require a
>> >>>>       >      >      >>   > rewrite of PubsubIO to use the new SDF
>> API.
>> >>>>       >      >      >>
>> >>>>       >      >      >> Not necessarily. This would be one way.
>> Another
>> >>>>      way is build an SDF
>> >>>>       >      >     wrapper for
>> >>>>       >      >      >> UnboundedSource. Probably the easier path for
>> >>>>      migration.
>> >>>>       >      >      >>
>> >>>>       >      >      >> On 31.01.19 14:03, Ismaël Mejía wrote:
>> >>>>       >      >      >>>> Fortunately, there is already a pending PR
>> for
>> >>>>      cross-language
>> >>>>       >      >     pipelines which
>> >>>>       >      >      >>>> will allow us to use Java IO like PubSub in
>> >>>>      Python jobs.
>> >>>>       >      >      >>>
>> >>>>       >      >      >>> In addition to have support in the runners,
>> this
>> >>>>      will require a
>> >>>>       >      >      >>> rewrite of PubsubIO to use the new SDF API.
>> >>>>       >      >      >>>
>> >>>>       >      >      >>> On Thu, Jan 31, 2019 at 12:23 PM Maximilian Michels
>> >>>>       >      >      >>> <m...@apache.org> wrote:
>> >>>>       >      >      >>>>
>> >>>>       >      >      >>>> Hi Matthias,
>> >>>>       >      >      >>>>
>> >>>>       >      >      >>>> This is already reflected in the
>> compatibility
>> >>>>      matrix, if you look
>> >>>>       >      >     under SDF.
>> >>>>       >      >      >>>> There is no UnboundedSource interface for
>> >>>>      portable pipelines.
>> >>>>       >     That's a
>> >>>>       >      >     legacy
>> >>>>       >      >      >>>> abstraction that will be replaced with SDF.
>> >>>>       >      >      >>>>
>> >>>>       >      >      >>>> Fortunately, there is already a pending PR
>> for
>> >>>>      cross-language
>> >>>>       >      >     pipelines which
>> >>>>       >      >      >>>> will allow us to use Java IO like PubSub in
>> >>>>      Python jobs.
>> >>>>       >      >      >>>>
>> >>>>       >      >      >>>> Thanks,
>> >>>>       >      >      >>>> Max
>> >>>>       >      >      >>>>
>> >>>>       >      >      >>>> On 31.01.19 12:06, Matthias Baetens wrote:
>> >>>>       >      >      >>>>> Hey Ankur,
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>> Thanks for the swift reply. Should I change
>> >>>>      this in the
>> >>>>       >     capability matrix
>> >>>>       >      >      >>>>>
>> >>>>      <https://s.apache.org/apache-beam-portability-support-table>
>> then?
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>> Many thanks.
>> >>>>       >      >      >>>>> Best,
>> >>>>       >      >      >>>>> Matthias
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>> On Thu, 31 Jan 2019 at 09:31, Ankur Goenka
>> >>>>       >      >      >>>>> <goe...@google.com> wrote:
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>       Hi Matthias,
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>       Unfortunately, unbounded reads
>> including
>> >>>>      pubsub are not yet
>> >>>>       >      >     supported for
>> >>>>       >      >      >>>>>       portable runners.
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>       Thanks,
>> >>>>       >      >      >>>>>       Ankur
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>       On Thu, Jan 31, 2019 at 2:44 PM Matthias Baetens
>> >>>>       >      >      >>>>>       <baetensmatth...@gmail.com> wrote:
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>           Hi everyone,
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>           Last few days I have been trying
>> to
>> >>>>      run a streaming
>> >>>>       >      >     pipeline (code on
>> >>>>       >      >      >>>>>           Github
>> >>>>      <https://github.com/matthiasa4/beam-demo>) on a
>> >>>>       >      >     Flink Runner.
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>           I am running a Flink cluster locally (v1.5.6
>> >>>>       >      >      >>>>>           <https://flink.apache.org/downloads.html>).
>> >>>>       >      >      >>>>>           I have built the SDK Harness Container:
>> >>>>       >      >      >>>>>             ./gradlew :beam-sdks-python-container:docker
>> >>>>       >      >      >>>>>           and started the JobServer:
>> >>>>       >      >      >>>>>             ./gradlew :beam-runners-flink_2.11-job-server:runShadow
>> >>>>       >      >      >>>>>               -PflinkMasterUrl=localhost:8081
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>           I run my pipeline with:
>> >>>>       >      >      >>>>>             env/bin/python streaming_pipeline.py
>> >>>>       >      >      >>>>>               --runner=PortableRunner --job_endpoint=localhost:8099
>> >>>>       >      >      >>>>>               --output xxx --input_subscription xxx
>> >>>>       >      >      >>>>>               --output_subscription xxx
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>           All this is running inside a
>> Ubuntu
>> >>>>      (Bionic) in a
>> >>>>       >     Virtualbox.
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>           The job submits fine, but
>> >>>>      unfortunately fails after
>> >>>>       >     a few
>> >>>>       >      >     seconds with
>> >>>>       >      >      >>>>>           the error attached.
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>           Anything I am missing or doing
>> wrong?
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>           Many thanks.
>> >>>>       >      >      >>>>>           Best,
>> >>>>       >      >      >>>>>           Matthias
>> >>>>       >      >      >>>>>
>> >>>>       >      >      >>>>>
>> >>>>       >      >
>> >>>>       >
>> >>>>
>>
>
