Thanks Lukasz. The following two links were somehow incorrectly formatted
in your mail.

* How to process a bundle:
https://s.apache.org/beam-fn-api-processing-a-bundle
* How to send and receive data:
https://s.apache.org/beam-fn-api-send-and-receive-data

By the way, is there a way to find them on the Beam website?


On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <[email protected]>
wrote:

> Now that I'm back from vacation and the 2.0.0 release is not taking all my
> time, I am focusing my attention on the Beam Portability framework,
> specifically the Fn API, so that we can get Python and other language
> integrations working with any runner.
>
> For newcomers, I would like to reshare the overview:
> https://s.apache.org/beam-fn-api
>
> And for those of you who have been following this thread and contributors
> focusing on Runner integration with Apache Beam:
> * How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
> * How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data
>
> If you want to dive deeper, you should look at:
> * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/java/harness
> * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker
>
> Next I'm planning on talking about the Beam Fn State API, and will need
> help from Runner contributors to discuss caching semantics and key spaces,
> and whether the integrations mesh well with current Runner implementations.
> The State API is meant to support user state, side inputs, and re-iteration
> over large values produced by GroupByKey.
>
> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <[email protected]> wrote:
>
> > Yes, I was using a Pipeline that was:
> > Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
> > pipeline in the global window using the default trigger)
> >
> > In Google Cloud Dataflow, the shuffle step uses the binary representation
> > to compare keys, so the above pipeline would normally be converted to the
> > following two stages:
> > Read -> GBK Writer
> > GBK Reader -> IdentityParDo
> >
> > Note that the GBK Writer and GBK Reader need to use a coder to encode and
> > decode the value.
> >
> > When using the Fn API, those two stages expanded because each Fn API
> > crossing adds a gRPC Write/Read pair:
> > Read -> gRPC Write -> gRPC Read -> GBK Writer
> > GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
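Counting the coder invocations per element in the two layouts above makes the overhead concrete. This is a sketch with illustrative stage names, not code from the prototype:

```python
# Coder operations performed per element at each step: the shuffle compares
# encoded bytes, and each gRPC crossing encodes on write and decodes on read.
normal = {
    "Read -> GBK Writer": ["encode"],
    "GBK Reader -> IdentityParDo": ["decode"],
}
fn_api = {
    "Read -> gRPC Write": ["encode"],
    "gRPC Read -> GBK Writer": ["decode", "encode"],
    "GBK Reader -> gRPC Write": ["decode", "encode"],
    "gRPC Read -> IdentityParDo": ["decode"],
}

def invocations(stages):
    """Total coder calls per element across all steps of a layout."""
    return sum(len(ops) for ops in stages.values())

assert invocations(fn_api) == 3 * invocations(normal)  # 6 vs 2: tripled
```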
> >
> > In my naive prototype implementation, the coder was used to encode
> > elements at the gRPC steps. This meant that the coder was
> > encoding/decoding/encoding in the first stage and
> > decoding/encoding/decoding in the second stage, tripling the number of
> > times the coder was invoked per element. This additional use of the
> > coder accounted for ~12% (80% of the 15%) of the extra execution time.
> > This implementation is quite inefficient and would benefit from merging
> > the gRPC Read + GBK Writer into one actor and the GBK Reader + gRPC
> > Write into another actor, allowing for the creation of a fast path that
> > can skip parts of the decode/encode cycle through the coder. By using a
> > byte array view over the logical stream, one can minimize the number of
> > byte array copies which plagued my naive implementation. This can be
> > done by parsing only the element boundaries out of the stream to
> > produce those logical byte array views. I have a very rough estimate
> > that performing this optimization would reduce the 12% overhead to
> > somewhere between 4% and 6%.
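The byte-array-view idea can be sketched as length-prefixed framing, where the consumer parses only element boundaries and hands out zero-copy views. This is a hypothetical illustration, not the actual wire format of the Fn API data plane:

```python
import struct

def frame(encoded_elements):
    """Length-prefix each already-encoded element onto one outbound stream."""
    out = bytearray()
    for payload in encoded_elements:
        out += struct.pack(">I", len(payload))  # 4-byte big-endian length
        out += payload
    return bytes(out)

def element_views(stream):
    """Yield zero-copy memoryviews over each element without decoding it."""
    view = memoryview(stream)
    pos = 0
    while pos < len(view):
        (length,) = struct.unpack_from(">I", view, pos)
        pos += 4
        yield view[pos:pos + length]  # a view, not a copy
        pos += length
```

A consumer that only needs to forward elements (e.g. gRPC Read feeding a GBK Writer) can pass these views along without ever invoking the coder.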
> >
> > The remaining 3% (15% - 12%) of overhead went to many parts of gRPC:
> > * marshalling/unmarshalling protos
> > * handling/managing the socket
> > * flow control
> > * ...
> >
> > Finally, I did try experiments with different buffer sizes (10KB,
> > 100KB, 1000KB), flow control (separate thread [1] vs same thread with a
> > phaser [2]), and channel type [3] (NIO, epoll, domain socket), but
> > coder overhead easily dominated the differences in these other
> > experiments.
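The "separate thread" flow-control variant referenced in [1] can be approximated as a bounded queue drained by a dedicated sender thread; producers block when the transport falls behind. This is a simplified sketch, not the gRPC-specific implementation:

```python
import queue
import threading

class BufferingStreamObserver:
    """Bounded-buffer flow control: producers block on a full queue while a
    dedicated thread drains elements to the underlying transport."""

    _POISON = object()  # sentinel marking end of stream

    def __init__(self, send, capacity=16):
        self._queue = queue.Queue(maxsize=capacity)
        self._send = send  # stand-in for the transport's send call
        self._thread = threading.Thread(target=self._drain, daemon=True)
        self._thread.start()

    def on_next(self, value):
        self._queue.put(value)  # blocks once `capacity` elements are pending

    def on_completed(self):
        self._queue.put(self._POISON)
        self._thread.join()

    def _drain(self):
        while True:
            value = self._queue.get()
            if value is self._POISON:
                return
            self._send(value)
```

The queue capacity plays the role of the buffer sizes being compared in the experiment: a larger buffer decouples the producer further but raises memory use and latency.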
> >
> > Further analysis would need to be done to more accurately distill this
> > down.
> >
> > 1: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
> > 2: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
> > 3: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
> >
> >
> > On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <[email protected]> wrote:
> >
> >> Awesome job Lukasz! Excellent. I have to confess that the first time I
> >> heard about the Fn API idea I was a bit incredulous, but you are
> >> making it real, amazing!
> >>
> >> Just one question about your document: you said that 80% of the extra
> >> (15%) time goes into encoding and decoding the data for your test
> >> case. Can you expand on your current ideas to improve this? (I am not
> >> sure I completely understand the issue.)
> >>
> >> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <[email protected]>
> >> wrote:
> >>
> >> > Responded inline.
> >> >
> >> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <[email protected]>
> >> > wrote:
> >> >
> >> > > This is truly amazing Luke!
> >> > >
> >> > > If I understand this right, the runner executing the DoFn will
> >> > > delegate the function code and input data (and state, coders,
> >> > > etc.) to the container where it will execute with the user's SDK
> >> > > of choice, right?
> >> >
> >> >
> >> > Yes, that is correct.
> >> >
> >> >
> >> > > I wonder how the containers relate to the underlying engine's
> >> > > worker processes? Is it 1-1, one container per worker? If there's
> >> > > less "work" for the worker's Java process (for example) now and it
> >> > > becomes a sort of "dispatcher", would that change the resource
> >> > > allocation commonly used for the same pipeline, so that the worker
> >> > > processes would require fewer resources while giving those to the
> >> > > container?
> >> > >
> >> >
> >> > I think with the four services (control, data, state, logging) you
> >> > can go with a 1-1 relationship or break it up more finely and
> >> > dedicate some machines to specific tasks. For example, you could
> >> > have a few machines dedicated to log aggregation which all the
> >> > workers push their logs to. Similarly, you could have some machines
> >> > with a lot of memory, better able to do shuffles in memory, and this
> >> > cluster of high-memory machines could front the data service. I
> >> > believe there is a lot of flexibility based upon what a runner can
> >> > do and what it specializes in, and that with more effort comes more
> >> > possibilities, albeit with increased internal complexity.
> >> >
> >> > The layout of resources depends on whether the services and SDK
> >> > containers are co-hosted on the same machine or whether there is a
> >> > different architecture in play. In a co-hosted configuration, it
> >> > seems likely that the SDK container will get more resources, but
> >> > this is dependent on the runner and pipeline shape (shuffle-heavy
> >> > pipelines will look different from ParDo-dominated pipelines).
> >> >
> >> >
> >> > > About executing sub-graphs, would it be true to say that as long
> >> > > as there's no shuffle, you could keep executing in the same
> >> > > container? Meaning that the graph is broken into sub-graphs by
> >> > > shuffles?
> >> > >
> >> >
> >> > The only thing that is required is that the Apache Beam model is
> >> > preserved, so typical break points will be at shuffles and language
> >> > crossing points (e.g. Python ParDo -> Java ParDo). A runner is free
> >> > to break up the graph even more for other reasons.
> >> >
> >> >
> >> > > I have to dig in deeper, so I could have more questions ;-)
> >> > > Thanks Luke!
> >> > >
> >> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <[email protected]>
> >> > > wrote:
> >> > >
> >> > > > I updated the PR description to contain the same.
> >> > > >
> >> > > > I would start by looking at the API/object model definitions
> >> > > > found in beam_fn_api.proto:
> >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> >> > > >
> >> > > > Then depending on your interest, look at the following:
> >> > > > * FnHarness.java, the main entry point:
> >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
> >> > > > * org.apache.beam.fn.harness.data, which contains the most
> >> > > > interesting bits of code, since it is able to multiplex a gRPC
> >> > > > stream into multiple logical streams of elements bound for
> >> > > > multiple concurrent process bundle requests. It also contains
> >> > > > the code to take multiple logical outbound streams and multiplex
> >> > > > them back onto a gRPC stream.
> >> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
> >> > > > * org.apache.beam.runners.core, which contains additional
> >> > > > runners akin to the DoFnRunner found in runners-core to support
> >> > > > sources and gRPC endpoints:
> >> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core
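The multiplexing described for org.apache.beam.fn.harness.data can be pictured as routing elements arriving on one physical stream to the consumer for each in-flight bundle, keyed by instruction id. This is a toy sketch; the real harness works with gRPC streams, protos, and coders:

```python
class DataStreamMultiplexer:
    """Toy demultiplexer: one physical inbound stream feeds many logical
    per-bundle streams, keyed by the bundle's instruction id."""

    def __init__(self):
        self._consumers = {}

    def register(self, instruction_id, consumer):
        # Called when a process bundle request starts; `consumer` receives
        # every element destined for that bundle.
        self._consumers[instruction_id] = consumer

    def on_data(self, instruction_id, element):
        # Route an inbound element to the matching logical stream.
        self._consumers[instruction_id](element)

    def complete(self, instruction_id):
        # Tear down the logical stream when the bundle finishes.
        self._consumers.pop(instruction_id, None)
```

The outbound direction is the mirror image: many logical streams interleave their elements, tagged with their instruction id, onto one physical stream.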
> >> > > >
> >> > > > Unless you're really interested in how domain sockets, epoll,
> >> > > > NIO channel factories, or stream readiness callbacks work in
> >> > > > gRPC, I would avoid the packages
> >> > > > org.apache.beam.fn.harness.channel and
> >> > > > org.apache.beam.fn.harness.stream. Similarly, I would avoid
> >> > > > org.apache.beam.fn.harness.fn and
> >> > > > org.apache.beam.fn.harness.fake as they don't add anything
> >> > > > meaningful to the API.
> >> > > >
> >> > > > Code package descriptions:
> >> > > > * org.apache.beam.fn.harness.FnHarness: main entry point
> >> > > > * org.apache.beam.fn.harness.control: control service client and
> >> > > > individual request handlers
> >> > > > * org.apache.beam.fn.harness.data: data service client and
> >> > > > logical stream multiplexing
> >> > > > * org.apache.beam.runners.core: additional runners akin to the
> >> > > > DoFnRunner found in runners-core to support sources and gRPC
> >> > > > endpoints
> >> > > > * org.apache.beam.fn.harness.logging: logging client
> >> > > > implementation and JUL logging handler adapter
> >> > > > * org.apache.beam.fn.harness.channel: gRPC channel management
> >> > > > * org.apache.beam.fn.harness.stream: gRPC stream management
> >> > > > * org.apache.beam.fn.harness.fn: Java 8 functional interface
> >> > > > extensions
> >> > > >
> >> > > >
> >> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
> >> > > > <[email protected]> wrote:
> >> > > >
> >> > > > > This is awesome! Any chance you could roadmap the PR for us
> >> > > > > with some links into the most interesting bits?
> >> > > > >
> >> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw
> >> > > > > <[email protected]> wrote:
> >> > > > >
> >> > > > > > Also, note that we can still support the "simple" case. For
> >> > > > > > example, if the user supplies us with a jar file (as they do
> >> > > > > > now), a runner could launch it as a subprocess and
> >> > > > > > communicate with it via this same Fn API, or install it in a
> >> > > > > > fixed container itself; the user doesn't *need* to know
> >> > > > > > about docker or manually manage containers (and indeed the
> >> > > > > > Fn API could be used in-process, cross-process,
> >> > > > > > cross-container, and even cross-machine).
> >> > > > > >
> >> > > > > > However, docker provides a nice cross-language way of
> >> > > > > > specifying the environment including all dependencies
> >> > > > > > (especially for languages like Python or C where the
> >> > > > > > equivalent of a cross-platform, self-contained jar isn't as
> >> > > > > > easy to produce) and is strictly more powerful and flexible
> >> > > > > > (specifically, it isolates the runtime environment and one
> >> > > > > > can even use it for local testing).
> >> > > > > >
> >> > > > > > Slicing a worker up like this without sacrificing
> >> > > > > > performance is an ambitious goal, but essential to the story
> >> > > > > > of being able to mix and match runners and SDKs arbitrarily,
> >> > > > > > and I think this is a great start.
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
> >> > > > > > <[email protected]> wrote:
> >> > > > > > > You're correct: a docker container is created that
> >> > > > > > > contains the execution environment the user wants, or the
> >> > > > > > > user re-uses an existing one (allowing a user to embed all
> >> > > > > > > their code/dependencies or use a container that can deploy
> >> > > > > > > code/dependencies on demand).
> >> > > > > > > A user creates a pipeline saying which docker container
> >> > > > > > > they want to use (this starts to allow for multiple
> >> > > > > > > container definitions within a single pipeline to support
> >> > > > > > > multiple languages, versioning, ...).
> >> > > > > > > A runner would then be responsible for launching one or
> >> > > > > > > more of these containers in a cluster manager of their
> >> > > > > > > choice (scaling the number of instances up or down
> >> > > > > > > depending on demand/load/...).
> >> > > > > > > A runner then interacts with the docker containers over
> >> > > > > > > the gRPC service definitions to delegate processing to
> >> > > > > > > them.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré
> >> > > > > > > <[email protected]> wrote:
> >> > > > > > >
> >> > > > > > >> Hi Luke,
> >> > > > > > >>
> >> > > > > > >> that's really great and very promising!
> >> > > > > > >>
> >> > > > > > >> It's really ambitious but I like the idea. Just to
> >> > > > > > >> clarify: the purpose of using gRPC is that once the
> >> > > > > > >> docker container is running, we can "interact" with the
> >> > > > > > >> container to spread and delegate processing to it,
> >> > > > > > >> correct?
> >> > > > > > >> The users/devops have to set up the docker containers as
> >> > > > > > >> a prerequisite. Then, the "location" of the containers (a
> >> > > > > > >> kind of container registry) is set via the pipeline
> >> > > > > > >> options and used by gRPC?
> >> > > > > > >>
> >> > > > > > >> Thanks Luke !
> >> > > > > > >>
> >> > > > > > >> Regards
> >> > > > > > >> JB
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> >> > > > > > >>
> >> > > > > > >>> I have been prototyping several components towards the
> >> > > > > > >>> Beam technical vision of being able to execute an
> >> > > > > > >>> arbitrary language using an arbitrary runner.
> >> > > > > > >>>
> >> > > > > > >>> I would like to share this overview [1] of what I have
> >> > > > > > >>> been working towards. I also share this PR [2] with a
> >> > > > > > >>> proposed API, service definitions, and a partial
> >> > > > > > >>> implementation.
> >> > > > > > >>>
> >> > > > > > >>> 1: https://s.apache.org/beam-fn-api
> >> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
> >> > > > > > >>>
> >> > > > > > >>> Please comment on the overview within this thread, and
> >> > > > > > >>> leave any specific code comments on the PR directly.
> >> > > > > > >>>
> >> > > > > > >>> Luke
> >> > > > > > >>>
> >> > > > > > >>>
> >> > > > > > >> --
> >> > > > > > >> Jean-Baptiste Onofré
> >> > > > > > >> [email protected]
> >> > > > > > >> http://blog.nanthrax.net
> >> > > > > > >> Talend - http://www.talend.com
> >> > > > > > >>
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>
