Thanks Lukasz. The following two links were somehow incorrectly formatted in your mail.
* How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
* How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data

By the way, is there a way to find them from the Beam website?

On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <[email protected]> wrote:

> Now that I'm back from vacation and the 2.0.0 release is not taking all my
> time, I am focusing my attention on the Beam Portability framework,
> specifically the Fn API, so that we can get Python and other language
> integrations working with any runner.
>
> For newcomers, I would like to reshare the overview:
> https://s.apache.org/beam-fn-api
>
> And for those of you who have been following this thread and contributors
> focusing on Runner integration with Apache Beam:
> * How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
> * How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data
>
> If you want to dive deeper, you should look at:
> * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/java/harness
> * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker
>
> Next I'm planning on talking about the Beam Fn State API and will need
> help from Runner contributors to talk about caching semantics and key
> spaces, and whether the
> integrations mesh well with current Runner implementations. The State API
> is meant to support user state, side inputs, and re-iteration for large
> values produced by GroupByKey.
>
> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <[email protected]> wrote:
>
> > Yes, I was using a Pipeline that was:
> > Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
> > pipeline in the global window using the default trigger)
> >
> > In Google Cloud Dataflow, the shuffle step uses the binary representation
> > to compare keys, so the above pipeline would normally be converted to the
> > following two stages:
> > Read -> GBK Writer
> > GBK Reader -> IdentityParDo
> >
> > Note that the GBK Writer and GBK Reader need to use a coder to encode and
> > decode the value.
> >
> > When using the Fn API, those two stages expanded because of the Fn API
> > crossings using a gRPC Write/Read pair:
> > Read -> gRPC Write -> gRPC Read -> GBK Writer
> > GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
> >
> > In my naive prototype implementation, the coder was used to encode
> > elements at the gRPC steps. This meant that the coder was
> > encoding/decoding/encoding in the first stage and
> > decoding/encoding/decoding in the second stage. This tripled the number
> > of times the coder was invoked per element. This additional use of the
> > coder accounted for ~12% (80% of the 15%) of the extra execution time.
> > This implementation is quite inefficient and would benefit from merging
> > the gRPC Read + GBK Writer into one actor, and also the GBK Reader + gRPC
> > Write into another actor, allowing for the creation of a fast path that
> > can skip parts of the decode/encode cycle through the coder. By using a
> > byte array view over the logical stream, one can minimize the number of
> > byte array copies which plagued my naive implementation.
> > This can be done by only parsing the element boundaries out of the
> > stream to produce those logical byte array views. I have a very rough
> > estimate that performing this optimization would reduce the 12% overhead
> > to somewhere between 4% and 6%.
> >
> > The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
> > marshalling/unmarshalling protos
> > handling/managing the socket
> > flow control
> > ...
> >
> > Finally, I did try experiments with different buffer sizes (10KB, 100KB,
> > 1000KB), flow control (separate thread [1] vs same thread with phaser
> > [2]), and channel type [3] (NIO, epoll, domain socket), but coder
> > overhead easily dominated the differences in these other experiments.
> >
> > Further analysis would need to be done to more accurately distill this
> > down.
> >
> > 1: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
> > 2: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
> > 3: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
> >
> >
> > On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <[email protected]> wrote:
> >
> >> Awesome job Lukasz, excellent! I have to confess the first time I heard
> >> about the Fn API idea I was a bit incredulous, but you are making it
> >> real, amazing!
> >>
> >> Just one question from your document: you said that 80% of the extra
> >> (15%) time goes into encoding and decoding the data for your test case.
> >> Can you expand on your current ideas to improve this? (I am not sure I
> >> completely understand the issue.)
> >>
> >>
> >> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <[email protected]>
> >> wrote:
> >>
> >> > Responded inline.
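The boundary-only parsing Lukasz describes can be sketched in a few lines of Java: walk the stream once, read each element's length, and hand out zero-copy views over the shared backing array instead of decoding. The 4-byte big-endian length prefix used here is an illustrative framing choice only, not the actual Fn API wire format.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the "byte array view" idea: split one length-prefixed byte
 * stream into per-element views without copying or decoding the payloads.
 * The fixed 4-byte length prefix is hypothetical framing for illustration.
 */
public class ElementViews {
  public static List<ByteBuffer> split(byte[] stream) {
    List<ByteBuffer> views = new ArrayList<>();
    ByteBuffer buf = ByteBuffer.wrap(stream);
    while (buf.remaining() >= 4) {
      int len = buf.getInt();
      ByteBuffer view = buf.slice();       // zero-copy view over the backing array
      view.limit(len);
      views.add(view);
      buf.position(buf.position() + len);  // advance past the element, no copy
    }
    return views;
  }
}
```

A downstream consumer (e.g. a merged gRPC Read + GBK Writer actor) could then forward each view directly, invoking the full coder only when an element's contents actually need to be inspected.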
> >> >
> >> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <[email protected]> wrote:
> >> >
> >> > > This is truly amazing Luke!
> >> > >
> >> > > If I understand this right, the runner executing the DoFn will
> >> > > delegate the function code and input data (and state, coders, etc.)
> >> > > to the container, where it will execute with the user's SDK of
> >> > > choice, right?
> >> >
> >> > Yes, that is correct.
> >> >
> >> > > I wonder how the containers relate to the underlying engine's worker
> >> > > processes. Is it 1-1, container per worker? If there's less "work"
> >> > > for the worker's Java process (for example) now and it becomes a
> >> > > sort of "dispatcher", would that change the resource allocation
> >> > > commonly used for the same Pipeline, so that the worker processes
> >> > > would require less resources, while giving those to the container?
> >> >
> >> > I think with the four services (control, data, state, logging) you can
> >> > go with a 1-1 relationship or break it up more finely and dedicate
> >> > some machines to specific tasks. For example, you could have a few
> >> > machines dedicated to log aggregation which all the workers push their
> >> > logs to. Similarly, you could have some machines with a lot of memory,
> >> > which would be better able to do shuffles in memory, and this cluster
> >> > of high-memory machines could front the data service. I believe there
> >> > is a lot of flexibility based upon what a runner can do and what it
> >> > specializes in, and that with more effort comes more possibilities,
> >> > albeit with increased internal complexity.
> >> >
> >> > The layout of resources depends on whether the services and SDK
> >> > containers are co-hosted on the same machine or whether there is a
> >> > different architecture in play.
> >> > In a co-hosted configuration, it seems likely that the SDK container
> >> > will get more resources, but this is dependent on the runner and the
> >> > pipeline shape (shuffle-heavy pipelines will look different than
> >> > ParDo-dominated pipelines).
> >> >
> >> > > About executing sub-graphs, would it be true to say that as long as
> >> > > there's no shuffle, you could keep executing in the same container?
> >> > > Meaning that the graph is broken into sub-graphs by shuffles?
> >> >
> >> > The only thing that is required is that the Apache Beam model is
> >> > preserved, so typical break points will be at shuffles and
> >> > language-crossing points (e.g. Python ParDo -> Java ParDo). A runner
> >> > is free to break up the graph even more for other reasons.
> >> >
> >> > > I have to dig in deeper, so I could have more questions ;-) Thanks
> >> > > Luke!
> >> > >
> >> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <[email protected]>
> >> > > wrote:
> >> > >
> >> > > > I updated the PR description to contain the same.
> >> > > >
> >> > > > I would start by looking at the API/object model definitions found
> >> > > > in beam_fn_api.proto
> >> > > > <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto>
> >> > > >
> >> > > > Then depending on your interest, look at the following:
> >> > > > * FnHarness.java
> >> > > > <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java>
> >> > > > is the main entry point.
> >> > > > * org.apache.beam.fn.harness.data
> >> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data>
> >> > > > contains the most interesting bits of code, since it is able to
> >> > > > multiplex a gRPC stream into multiple logical streams of elements
> >> > > > bound for multiple concurrent process bundle requests. It also
> >> > > > contains the code to take multiple logical outbound streams and
> >> > > > multiplex them back onto a gRPC stream.
> >> > > > * org.apache.beam.runners.core
> >> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core>
> >> > > > contains additional runners akin to the DoFnRunner found in
> >> > > > runners-core to support sources and gRPC endpoints.
> >> > > >
> >> > > > Unless you're really interested in how domain sockets, epoll, NIO
> >> > > > channel factories, or stream readiness callbacks work in gRPC, I
> >> > > > would avoid the packages org.apache.beam.fn.harness.channel and
> >> > > > org.apache.beam.fn.harness.stream. Similarly, I would avoid
> >> > > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake,
> >> > > > as they don't add anything meaningful to the API.
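The logical-stream multiplexing described for org.apache.beam.fn.harness.data can be sketched as a toy demultiplexer: one physical stream arrives, and each chunk is routed to the receiver registered for its bundle. Keying on a plain string instruction id is an assumption made here for illustration; the real harness routes on the Fn API's target identifiers and handles flow control, which this sketch omits.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Sketch: demultiplex one physical data stream into per-bundle logical
 * streams. The string instruction id key is a simplifying assumption.
 */
public class StreamDemuxer {
  private final Map<String, Consumer<byte[]>> receivers = new ConcurrentHashMap<>();

  /** Register the consumer for one logical stream before its data arrives. */
  public void register(String instructionId, Consumer<byte[]> receiver) {
    receivers.put(instructionId, receiver);
  }

  /** Route one chunk from the shared stream to its logical destination. */
  public void onNext(String instructionId, byte[] chunk) {
    Consumer<byte[]> receiver = receivers.get(instructionId);
    if (receiver == null) {
      throw new IllegalStateException("No logical stream for " + instructionId);
    }
    receiver.accept(chunk);
  }
}
```

The outbound direction is the mirror image: several logical writers tag their chunks with an id and interleave them onto the single physical stream.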
> >> > > > Code package descriptions:
> >> > > >
> >> > > > org.apache.beam.fn.harness.FnHarness: main entry point
> >> > > > org.apache.beam.fn.harness.control: Control service client and
> >> > > > individual request handlers
> >> > > > org.apache.beam.fn.harness.data: Data service client and logical
> >> > > > stream multiplexing
> >> > > > org.apache.beam.runners.core: Additional runners akin to the
> >> > > > DoFnRunner found in runners-core to support sources and gRPC
> >> > > > endpoints
> >> > > > org.apache.beam.fn.harness.logging: Logging client implementation
> >> > > > and JUL logging handler adapter
> >> > > > org.apache.beam.fn.harness.channel: gRPC channel management
> >> > > > org.apache.beam.fn.harness.stream: gRPC stream management
> >> > > > org.apache.beam.fn.harness.fn: Java 8 functional interface
> >> > > > extensions
> >> > > >
> >> > > >
> >> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles <[email protected]>
> >> > > > wrote:
> >> > > >
> >> > > > > This is awesome! Any chance you could roadmap the PR for us with
> >> > > > > some links into the most interesting bits?
> >> > > > >
> >> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> >> > > > > [email protected]> wrote:
> >> > > > >
> >> > > > > > Also, note that we can still support the "simple" case. For
> >> > > > > > example, if the user supplies us with a jar file (as they do
> >> > > > > > now), a runner could launch it as a subprocess and communicate
> >> > > > > > with it via this same Fn API, or install it in a fixed
> >> > > > > > container itself--the user doesn't *need* to know about docker
> >> > > > > > or manually manage containers (and indeed the Fn API could be
> >> > > > > > used in-process, cross-process, cross-container, and even
> >> > > > > > cross-machine).
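The "simple" case Robert mentions, launching the user-supplied jar as a local subprocess speaking the same Fn API, could look roughly like the sketch below. The environment variable name and the way FnHarness is put on the classpath are assumptions for illustration, not the actual contract.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch: a runner launching the SDK harness as a plain subprocess instead
 * of a docker container, pointing it at the runner's control endpoint.
 * Flag and environment variable names here are hypothetical.
 */
public class LocalHarnessLauncher {
  /** The command line a runner might use; exposed so it can be inspected. */
  public static List<String> command(String userJar) {
    return Arrays.asList("java", "-cp", userJar, "org.apache.beam.fn.harness.FnHarness");
  }

  /** Start the harness subprocess with the control endpoint in its environment. */
  public static Process launch(String userJar, String controlEndpoint) throws IOException {
    ProcessBuilder pb = new ProcessBuilder(command(userJar));
    // Hypothetical handoff: the harness reads its control service location on startup.
    pb.environment().put("CONTROL_API_SERVICE_DESCRIPTOR", controlEndpoint);
    pb.inheritIO();
    return pb.start();
  }
}
```

Because the process boundary is the same gRPC surface as the container boundary, the runner-side code would not need to care which launch mechanism was used.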
> >> > > > > >
> >> > > > > > However, docker provides a nice cross-language way of
> >> > > > > > specifying the environment, including all dependencies
> >> > > > > > (especially for languages like Python or C where the
> >> > > > > > equivalent of a cross-platform, self-contained jar isn't as
> >> > > > > > easy to produce), and is strictly more powerful and flexible
> >> > > > > > (specifically, it isolates the runtime environment, and one
> >> > > > > > can even use it for local testing).
> >> > > > > >
> >> > > > > > Slicing a worker up like this without sacrificing performance
> >> > > > > > is an ambitious goal, but essential to the story of being able
> >> > > > > > to mix and match runners and SDKs arbitrarily, and I think
> >> > > > > > this is a great start.
> >> > > > > >
> >> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <[email protected]>
> >> > > > > > wrote:
> >> > > > > > > You're correct: a docker container is created that contains
> >> > > > > > > the execution environment the user wants, or the user
> >> > > > > > > re-uses an existing one (allowing a user to embed all their
> >> > > > > > > code/dependencies or use a container that can deploy
> >> > > > > > > code/dependencies on demand).
> >> > > > > > > A user creates a pipeline saying which docker container they
> >> > > > > > > want to use (this starts to allow for multiple container
> >> > > > > > > definitions within a single pipeline to support multiple
> >> > > > > > > languages, versioning, ...).
> >> > > > > > > A runner would then be responsible for launching one or more
> >> > > > > > > of these containers in a cluster manager of their choice
> >> > > > > > > (scaling up or down the number of instances depending on
> >> > > > > > > demand/load/...).
> >> > > > > > > A runner then interacts with the docker containers over the
> >> > > > > > > gRPC service definitions to delegate processing to them.
> >> > > > > > >
> >> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré
> >> > > > > > > <[email protected]> wrote:
> >> > > > > > >
> >> > > > > > >> Hi Luke,
> >> > > > > > >>
> >> > > > > > >> that's really great and very promising!
> >> > > > > > >>
> >> > > > > > >> It's really ambitious, but I like the idea. Just to
> >> > > > > > >> clarify: the purpose of using gRPC is that once the docker
> >> > > > > > >> container is running, we can "interact" with the container
> >> > > > > > >> to spread and delegate processing to it, correct?
> >> > > > > > >> The users/devops have to set up the docker containers as a
> >> > > > > > >> prerequisite. Then, the "location" of the containers (kind
> >> > > > > > >> of container registry) is set via the pipeline options and
> >> > > > > > >> used by gRPC?
> >> > > > > > >>
> >> > > > > > >> Thanks Luke!
> >> > > > > > >>
> >> > > > > > >> Regards
> >> > > > > > >> JB
> >> > > > > > >>
> >> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> >> > > > > > >>
> >> > > > > > >>> I have been prototyping several components towards the
> >> > > > > > >>> Beam technical vision of being able to execute an
> >> > > > > > >>> arbitrary language using an arbitrary runner.
> >> > > > > > >>>
> >> > > > > > >>> I would like to share this overview [1] of what I have
> >> > > > > > >>> been working towards. I also share this PR [2] with a
> >> > > > > > >>> proposed API, service definitions, and partial
> >> > > > > > >>> implementation.
> >> > > > > > >>>
> >> > > > > > >>> 1: https://s.apache.org/beam-fn-api
> >> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
> >> > > > > > >>>
> >> > > > > > >>> Please comment on the overview within this thread, and
> >> > > > > > >>> any specific code comments on the PR directly.
> >> > > > > > >>>
> >> > > > > > >>> Luke
> >> > > > > > >>
> >> > > > > > >> --
> >> > > > > > >> Jean-Baptiste Onofré
> >> > > > > > >> [email protected]
> >> > > > > > >> http://blog.nanthrax.net
> >> > > > > > >> Talend - http://www.talend.com
