I think we will want an RPC server because the types of interactions between the runner and SDK aren't that simple:

- process some work (for some definition of work)
- split that source
- get progress of that source
- SDK requests a side input
- SDK wants to read/write state
- ...
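As a rough sketch (every name below is hypothetical, not an actual Beam API), that surface could look something like this, with calls initiated in both directions:

    /** Hypothetical sketch only -- none of these names are real Beam APIs. */
    public interface FnApiSketch {
      // Runner -> SDK: process a bundle of work, for some definition of work.
      byte[] processBundle(byte[] encodedBundle);

      // Runner -> SDK: how far along is this source, and can we split it?
      double getProgress(String sourceId);
      byte[] splitSource(String sourceId, double fraction);

      // SDK -> runner: pull a side input for a given window.
      byte[] getSideInput(String sideInputId, byte[] encodedWindow);

      // SDK -> runner: read or write user state held by the runner.
      byte[] getState(String stateKey);
      void putState(String stateKey, byte[] value);
    }

The point of the sketch is that the SDK-initiated calls (side inputs, state) make this more than a single request/response pipe, which is why a real RPC layer is attractive.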
As for security, I think that we could have a trusted auth mode (all communications are done in a trusted environment, e.g. localhost-to-localhost communication, a private network) and also potentially support an OAuth2 client credentials grant being used as a bearer token. The reason I'm suggesting this is because I believe we should have a Docker container dedicated to the user's code + SDK which uses the Fn API to communicate with the runner.

...

On Tue, Jun 28, 2016 at 7:17 AM, Bobby Evans <[email protected]> wrote:

> What type of RPC do we really need? Is this envisioned as a many-to-one connection situation? What other processes exactly are going to be attaching to this RPC server and sending/receiving data? If the non-Java process opens up a port and listens, there are all kinds of potential security concerns that will need to be addressed (encryption, authentication, authorization, etc.). Using just stdin, stdout, and stderr is standard everywhere that Java runs. It has some decent security built in, and all we need is a protocol to wrap the payload in, which is what I thought we were talking about.
>
> I am fine with other better-performing RPC methods, like shared memory. Also, if there is a reason for us to have a full-blown RPC server I would have no problem with that, but I don't see a reason for it. What we are doing appears to just need single-threaded, point-to-point communication.
>
> - Bobby
>
> On Monday, June 27, 2016 9:24 AM, Aljoscha Krettek <[email protected]> wrote:
>
> Thanks Kenn for expanding on your previous mail. I now have a better idea of why we need this.
>
> Out of the systems you suggested, Thrift and ProtoBuf3 + gRPC are probably best suited for the task. Both of these provide a way for generating serializers as well as for specifying an RPC interface. Avro and FlatBuffers only deal in serializers, and we would have to roll our own RPC system on top of them. (It seems the gRPC folks have some work going on about integrating support for FlatBuffers, but it's not clear when this will be done: https://github.com/grpc/grpc/issues/5438.)
>
> From the description and benchmarks, FlatBuffers looks very nice; if it really comes with a huge performance increase we might have to consider using it with our own RPC instead of Thrift/gRPC+ProtoBuf3. This would mean some engineering overhead, however.
>
> I would suggest doing some proof-of-concept implementations with both Thrift and gRPC+ProtoBuf3 and seeing how they compare to the baseline (the current implementation, where we just directly call methods on the DoFn and the DoFn calls methods on the outside directly). This wouldn't have to create a full stack, just enough to see how interaction with the DoFn would work for the different systems.
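A minimal sketch of the kind of baseline measurement suggested above: a direct method call versus an encode/decode round trip. The UTF-8 step merely stands in for Thrift or gRPC+ProtoBuf3 encoding, all names are invented, and a real comparison should use a proper harness such as JMH:

    import java.nio.charset.StandardCharsets;

    /** Toy baseline harness: direct calls vs. an encode/decode round trip. */
    public class BaselineBench {
      static long processDirect(String element) {
        return element.length(); // stands in for a direct DoFn method call
      }

      static long processViaBytes(String element) {
        byte[] encoded = element.getBytes(StandardCharsets.UTF_8);    // "serialize"
        String decoded = new String(encoded, StandardCharsets.UTF_8); // "deserialize"
        return processDirect(decoded);
      }

      public static void main(String[] args) {
        String element = "some-element-payload";
        long checksum = 0;

        long start = System.nanoTime();
        for (int i = 0; i < 10_000_000; i++) checksum += processDirect(element);
        System.out.println("direct:    " + (System.nanoTime() - start) / 1e9 + "s");

        start = System.nanoTime();
        for (int i = 0; i < 10_000_000; i++) checksum += processViaBytes(element);
        System.out.println("via bytes: " + (System.nanoTime() - start) / 1e9 + "s");

        System.out.println(checksum); // keep the JIT from eliminating the loops
      }
    }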
> On Wed, 22 Jun 2016 at 23:00 Kenneth Knowles <[email protected]> wrote:
>
> > I wanted to say a bit more to clarify and enliven this discussion. My use of the term "data plane" may have been confusing. I didn't mean to focus it quite so much on the encoded elements. What I meant to discuss was the entirety of performance-sensitive interactions between the runner and user-defined functions. So let's drop the implied control/data distinction and just talk about the whole interface.
> >
> > At the risk of writing at length about something everyone knows... the motivation for the Fn API is this: we have a few types of user-definable functions (UDFs) that occur in pipelines, and we need to invoke them in a language-independent manner. These are DoFn, CombineFn, WindowFn, BoundedSource, UnboundedSource, ViewFn/PCollectionView, and Coder.
> >
> > I will show a bad idea: take the interfaces of the above functions (minus Coder, which is special) and just turn them into RPC interfaces, so the SDK's job is just to be a trivial or near-trivial bridge from RPC to language-specific method calls. This is a bad proposal, but hopefully it helps to show issues such as:
> >
> > - How and when do we deserialize user code / launch a container? (my bad idea above doesn't answer; probably too often!)
> > - How and when do we encode/decode elements? (my bad idea above would require it between every UDF)
> > - How do we manage calls that are more than simply a stream of elements in a bundle? (example: side inputs)
> >
> > Any Fn API is required to have the same semantics as this simple proposal, but should achieve it with superior performance. I'll leave off the details since I am not authoring them personally. But let's assume as a baseline the approach of executing a fused stage of same-language UDFs in a row without any encoding/decoding or RPC, and making a single RPC call per bundle (ignoring amortized round trips for streaming bytes).
> >
> > I gather from this thread these questions (which I may be interpreting wrong; apologies if so), and I would like to answer them relative to this design sketch:
> >
> > Q: Since we have one RPC per bundle and it goes through the whole fused stage, and we have a whole stream of elements per call, doesn't the data dominate the envelope?
> > A: In streaming executions, bundles can be very small, so the data will not necessarily dominate.
> >
> > Q: Do we really need structured messages? Perhaps byte streams with fairly trivial metadata suffice and we can just hand-roll it?
> > A: I think that schematized tech is well proven for adaptability, and it is also handy for code gen, regardless of performance. So to me the question is whether or not we need structured messages at all, or if we can model every high-throughput communication as coder-encoded streams. I think that things like commits to state, acknowledgements of timer firings, and pull-based requests like side inputs are probably best expressed via a schema. But maybe I am overlooking some design ideas.
> >
> > Q: How will side inputs arrive?
> > A: This API is really designed to be pull-based, so it sort of implies a great many small RPCs (and caching).
> >
> > I'm sure I've left off some discussion points, and maybe oversimplified some things, but does this answer the questions somewhat? Does this clarify the suggested choices of tech? Do you still think we don't need them?
> >
> > Kenn
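To make the "pull-based plus caching" answer concrete, a small hypothetical sketch: the stub stands in for whatever client the Fn API would generate, and nothing here is a real Beam interface:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    /** Hypothetical pull-based side input access with a local cache. */
    public class SideInputCache {
      private final Map<String, byte[]> cache = new ConcurrentHashMap<>();
      private final Function<String, byte[]> rpcStub; // stand-in for a remote call

      public SideInputCache(Function<String, byte[]> rpcStub) {
        this.rpcStub = rpcStub;
      }

      /** Many small pulls, but each (sideInput, window) pair hits the wire once. */
      public byte[] get(String sideInputId, String windowId) {
        return cache.computeIfAbsent(sideInputId + "/" + windowId, rpcStub);
      }
    }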
> > On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans <[email protected]> wrote:
> >
> > > In Storm we use JSON as the default communication between shell bolts and shell spouts, which allows for APIs in non-JVM languages. It works rather well. That being said, it is also slow, so we made it a plugin so others could make their own, faster implementations. For Storm, both the data and the control are serialized to JSON, so I am not sure how much of what makes it slow is control and how much is data. I personally would like to see a simple benchmark that implements the basic protocol between the two so we can actually have a more numeric comparison, as well as any pain that someone experienced trying to implement even a proof of concept.
> > >
> > > I agree with Amit too that long term we may want to think about supporting structured data, and rely less on POJOs. It allows for a lot of optimizations in addition to having out-of-the-box support for serializing/deserializing them in another language. But perhaps that is more of a layer that sits on top of Beam instead, because a lot of the optimizations really make the most sense in a declarative, DSL-like context.
> > >
> > > - Bobby
> > >
> > > On Saturday, June 18, 2016 6:56 AM, Amit Sela <[email protected]> wrote:
> > >
> > > My +1 for JSON was for the fact that it's common enough and simpler than Protobuf/Avro/Thrift, and I would guess that (almost) all languages support it, though I might be wrong here.
> > >
> > > As for KV & WindowedValue, I'm not sure what the issue with Kryo is, but the "hardest" thing I had to do to get it working with Spark was to register 3rd-party implementations for Guava Immutable collections. And I honestly don't know if there is one framework that covers everything in all (common) languages.
> > >
> > > Finally, if I understand correctly, the suggestion is to transmit the data as bytes with the appropriate coders, correct? The new Spark, for example, uses Encoders <https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html> that have an internal schema and allow the engine to avoid deserializations (and other optimizations) using this schema. So while the current version of the Spark runner actually transforms objects into bytes prior to shuffle, that might not be the best implementation for the next generation of the runner...
> > >
> > > This is how I see things from my pretty modest experience with serialization frameworks. Please correct me if/where I might be wrong.
> > >
> > > Thanks,
> > > Amit
> > >
> > > On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <[email protected]> wrote:
> > >
> > > > In the Runner API proposal doc, there are 10+ different types with several fields each. Is it important to have a code generator for the schema? It would:
> > > > * simplify the SDK development process
> > > > * reduce errors due to differences in custom implementation
> > > >
> > > > I'm not familiar with tool(s) which can take a JSON schema (e.g. http://json-schema.org/) and generate code in multiple languages. Anyone?
> > > >
> > > > For the Data Plane API, a Runner and SDK must be able to encode elements such as WindowedValue and KVs in such a way that both sides can interpret them. For example, a Runner will be required to implement GBK, so it must be able to read the windowing information from the "bytes" transmitted; additionally, it will need to be able to split KV<K, V> records apart and recreate KV<K, Iterable<V>> for the SDK.
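A simplified illustration of the splitting requirement just described: length-prefix the key so a runner can pull the key bytes out for grouping without understanding the value. Beam's real KvCoder/WindowedValue encodings differ in their details; this only shows the idea:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    /** Simplified sketch of a KV encoding a runner can split apart for GBK. */
    public class KvBytes {
      /** SDK side: write key and value, each with a length prefix. */
      static byte[] encode(byte[] key, byte[] value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(key.length);
        out.write(key);
        out.writeInt(value.length);
        out.write(value);
        return bytes.toByteArray();
      }

      /** Runner side: recover just the key bytes for grouping, value stays opaque. */
      static byte[] extractKey(byte[] record) {
        ByteBuffer in = ByteBuffer.wrap(record);
        byte[] key = new byte[in.getInt()];
        in.get(key);
        return key;
      }
    }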
> > > > Since Coders are the dominant way of encoding things, the Data Plane API will transmit "bytes" with the element boundaries encoded in some way. Aljoscha, I agree with you that a good choice for transmitting bytes between VMs/languages is very important. Even though we are still transmitting mostly "bytes", error handling & connection handling are still important. For example, if we were to use gRPC and proto3 with a bidirectional-stream-based API, we would get:
> > > > - the Runner and SDK can both push data both ways (stream from/to GBK, stream from/to state)
> > > > - error handling
> > > > - code generation of client libraries
> > > > - HTTP/2
> > > >
> > > > As for the encoding, any SDK can choose any serialization it wants, such as Kryo, but to get interoperability with other languages that would require others to implement parts of the Kryo serialization spec to be able to interpret the "bytes". Thus certain types like KV & WindowedValue should be encoded in a way which allows for this interoperability.
> > > >
> > > > On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <[email protected]> wrote:
> > > >
> > > > > +1 on Aljoscha's comment; I'm not sure where the benefit is in having a "schematic" serialization.
> > > > >
> > > > > I know that Spark, and I think Flink as well, use Kryo <https://github.com/EsotericSoftware/kryo> for serialization (to be accurate, it's Chill <https://github.com/twitter/chill> for Spark), and I found it very impressive even compared to "manual" serializations, i.e., it seems to outperform Spark's "native" Encoders (1.6+) for primitives. In addition, it clearly supports Java and Scala, and there are 3rd-party libraries for Clojure and Objective-C.
> > > > >
> > > > > I guess my bottom line here agrees with Kenneth - performance and interoperability - but I'm just not sure if schema-based serializers are *always* the fastest.
> > > > >
> > > > > As for pipeline serialization, since performance is not the main issue and I think usability would be very important, I say +1 for JSON.
> > > > >
> > > > > For anyone who has spent some time benchmarking serialization libraries, now is the time to speak up ;)
> > > > >
> > > > > Thanks,
> > > > > Amit
> > > > >
> > > > > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <[email protected]> wrote:
> > > > >
> > > > > > Hi,
> > > > > > am I correct in assuming that the transmitted envelopes would mostly contain coder-serialized values? If so, wouldn't the header of an envelope just be the number of contained bytes and the number of values? I'm probably missing something, but with these assumptions I don't see the benefit of using something like Avro/Thrift/Protobuf for serializing the main-input value envelopes. We would just need a system that can send byte data really fast between languages/VMs.
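A sketch of the trivial envelope described above: a header carrying only the value count and byte count, followed by coder-encoded bytes. This is illustrative framing, not a proposed wire format; element boundaries inside the payload are left to the coder, as the question assumes:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;

    /** Illustrative envelope: header is just counts, payload is coder-encoded. */
    public class Envelope {
      static byte[] frame(List<byte[]> encodedElements) throws IOException {
        int totalBytes = 0;
        for (byte[] e : encodedElements) totalBytes += e.length;

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(encodedElements.size());          // header: number of values
        out.writeInt(totalBytes);                      // header: number of bytes
        for (byte[] e : encodedElements) out.write(e); // coder-encoded payload
        return bytes.toByteArray();
      }
    }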
> > > > > > By the way, another interesting question (at least for me) is how other data, such as side inputs, is going to arrive at the DoFn if we want to support a general interface for different languages.
> > > > > >
> > > > > > Cheers,
> > > > > > Aljoscha
> > > > > >
> > > > > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles <[email protected]> wrote:
> > > > > >
> > > > > > > (Apologies for the formatting)
> > > > > > >
> > > > > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hello everyone!
> > > > > > > >
> > > > > > > > We are busily working on a Runner API (for building and transmitting pipelines) and a Fn API (for invoking user-defined functions found within pipelines) as outlined in the Beam technical vision [1]. Both of these require a language-independent serialization technology for interoperability between SDKs and runners.
> > > > > > > >
> > > > > > > > The Fn API includes a high-bandwidth data plane where bundles are transmitted via some serialization/RPC envelope (inside the envelope, the stream of elements is encoded with a coder) to transfer bundles between the runner and the SDK, so performance is extremely important. There are many choices for high-performance serialization, and we would like to start the conversation about what serialization technology is best for Beam.
> > > > > > > >
> > > > > > > > The goal of this discussion is to arrive at consensus on the question: what serialization technology should we use for the data plane envelope of the Fn API?
> > > > > > > >
> > > > > > > > To facilitate community discussion, we looked at the available technologies and tried to narrow the choices based on three criteria:
> > > > > > > >
> > > > > > > > - Performance: What is the size of serialized data? How do we expect the technology to affect pipeline speed and cost? etc.
> > > > > > > >
> > > > > > > > - Language support: Does the technology support the most widespread languages for data processing? Does it have a vibrant ecosystem of contributed language bindings? etc.
> > > > > > > >
> > > > > > > > - Community: What is the adoption of the technology? How mature is it? How active is development? How is the documentation? etc.
> > > > > > > >
> > > > > > > > Given these criteria, we came up with four technologies that are good contenders. All have similar & adequate schema capabilities.
> > > > > > > > - Apache Avro: Does not require code gen, but embedding the schema in the data could be an issue. Very popular.
> > > > > > > >
> > > > > > > > - Apache Thrift: Probably a bit faster and more compact than Avro. A huge number of supported languages.
> > > > > > > >
> > > > > > > > - Protocol Buffers 3: Incorporates the lessons that Google has learned through long-term use of Protocol Buffers.
> > > > > > > >
> > > > > > > > - FlatBuffers: Some benchmarks imply great performance from the zero-copy mmap idea. We would need to run representative experiments.
> > > > > > > >
> > > > > > > > I want to emphasize that this is a community decision, and this thread is just the conversation starter for us all to weigh in. We just wanted to do some legwork to focus the discussion if we could.
> > > > > > > >
> > > > > > > > And there's a minor follow-up question: once we settle here, is that technology also suitable for the low-bandwidth Runner API for defining pipelines, or does anyone think we need to consider a second technology (like JSON) for usability reasons?
> > > > > > > >
> > > > > > > > [1] https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
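To illustrate the usability argument for JSON on the Runner API side: a toy pipeline-description POJO serialized with the Jackson library (which must be on the classpath). The field names here are invented for the example and are not the Runner API schema:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.List;

    /** Toy illustration of JSON-for-usability on the Runner API side. */
    public class JsonPipelineExample {
      public static class TransformDesc {
        public String name;
        public String kind;
        public List<String> inputs;
      }

      public static void main(String[] args) throws Exception {
        TransformDesc read = new TransformDesc();
        read.name = "ReadLines";
        read.kind = "Read";
        read.inputs = List.of();

        // Human-readable on the wire, diffable, and inspectable with any editor.
        String json = new ObjectMapper()
            .writerWithDefaultPrettyPrinter()
            .writeValueAsString(read);
        System.out.println(json);
      }
    }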
