I wanted to say a bit more to clarify and enliven this discussion. My use of the term "data plane" may have been confusing. I didn't mean to focus it quite so much on the encoded elements. What I meant to discuss was the entirety of performance-sensitive interactions between the runner and user-defined functions. So let's drop the implied control/data distinction and just talk about the whole interface.
At the risk of writing at length about something everyone knows... the motivation for the Fn API is this: we have a few types of user-defined functions (UDFs) that occur in pipelines, and we need to invoke them in a language-independent manner. These are DoFn, CombineFn, WindowFn, BoundedSource, UnboundedSource, ViewFn/PCollectionView, and Coder.

I will show a bad idea: take the interfaces of the above functions (minus Coder, which is special) and just turn them into RPC interfaces, so the SDK's job is a trivial or near-trivial bridge from RPC to language-specific method calls. This is a bad proposal, but hopefully it helps to show issues such as:

 - How and when do we deserialize user code / launch a container? (my bad idea above doesn't answer; probably too often!)
 - How and when do we encode/decode elements? (my bad idea above would require it between every UDF)
 - How do we manage calls that are more than simply a stream of elements in a bundle? (example: side inputs)

Any Fn API is required to have the same semantics as this simple proposal, but should achieve it with superior performance. I'll leave off the details since I am not authoring them personally. But let's assume as a baseline the approach of executing a fused stage of same-language UDFs in a row without any encoding/decoding or RPC, and making a single RPC call per bundle (ignoring amortized round trips for streaming bytes).

I gather from this thread these questions (which I may be interpreting wrong; apologies if so) and I would like to answer them relative to this design sketch:

Q: Since we have one RPC per bundle and it goes through the whole fused stage, and we have a whole stream of elements per call, doesn't the data dominate the envelope?
A: In streaming executions, bundles can be very small, so the data will not necessarily dominate.

Q: Do we really need structured messages? Perhaps byte streams with fairly trivial metadata suffice and we can just hand-roll it?
A: I think that schematized tech is well proven for adaptability, and it is also handy for code gen, regardless of performance. So to me the question is whether we need structured messages at all, or whether we can model every high-throughput communication as coder-encoded streams. I think that things like commits to state, acknowledgements of timer firings, and pull-based requests like side inputs are probably best expressed via a schema. But maybe I am overlooking some design ideas.

Q: How will side inputs arrive?
A: This API is really designed to be pull-based, so it sort of implies a great many small RPCs (and caching).

I'm sure I've left off some discussion points, and maybe oversimplified some things, but does this answer the questions somewhat? Does this clarify the suggested choices of tech? Do you still think we don't need them?

To make some of this a bit more concrete, a few rough sketches follow below.
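First, here is roughly what the "bad idea" looks like if you write it down. This is a minimal sketch with invented names (it is not the proposed Fn API), in Java only for illustration. Every element crosses an RPC boundary and gets coder-encoded and decoded at every hop, even between adjacent same-language UDFs:

import java.util.List;

// Hypothetical "UDF interfaces turned directly into RPC interfaces".
// One RPC per element, per UDF: every hop pays an encode, a decode, and a round trip.
interface DoFnService {
  // Inputs and outputs are coder-encoded bytes because the runner may live in another language.
  List<byte[]> processElement(String doFnId, byte[] encodedWindowedValue);
}

interface CombineFnService {
  byte[] createAccumulator(String combineFnId);
  byte[] addInput(String combineFnId, byte[] encodedAccumulator, byte[] encodedInput);
  byte[] mergeAccumulators(String combineFnId, List<byte[]> encodedAccumulators);
  byte[] extractOutput(String combineFnId, byte[] encodedAccumulator);
}

Fusing a stage of same-language UDFs and making one call per bundle is exactly what removes those per-element round trips and redundant encode/decode steps.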
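Second, a sketch of the baseline framing: one RPC per bundle, with the elements traveling as coder-encoded bytes and only trivial framing around them (here, a 4-byte length prefix per element). This is illustrative only and assumes the whole chunk fits in memory; the real thing would stream bytes and amortize round trips:

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

final class BundleFraming {
  // Write each already-coder-encoded element as a 4-byte length followed by its bytes.
  static byte[] encodeBundle(List<byte[]> encodedElements) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    for (byte[] element : encodedElements) {
      out.writeInt(element.length);
      out.write(element);
    }
    out.flush();
    return bos.toByteArray();
  }

  // Read the elements back; the receiver hands each byte[] to the appropriate Coder.
  static List<byte[]> decodeBundle(InputStream bundleBytes) throws IOException {
    DataInputStream in = new DataInputStream(bundleBytes);
    List<byte[]> elements = new ArrayList<>();
    while (in.available() > 0) {
      byte[] element = new byte[in.readInt()];
      in.readFully(element);
      elements.add(element);
    }
    return elements;
  }
}

The element stream itself stays opaque to the envelope; the structured parts of the envelope (ids, state commits, timer acknowledgements) are where a schema earns its keep.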
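Third, a sketch of what pull-based side input access with caching might look like from the SDK side. The RPC interface, names, and cache policy are all hypothetical; the point is many small requests served mostly out of a cache keyed by side input and window:

import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CachingSideInputClient {
  // Hypothetical stand-in for the runner-side RPC that serves side input state.
  interface SideInputRpc {
    byte[] fetch(String sideInputId, byte[] encodedWindow);
  }

  private final SideInputRpc rpc;
  private final Map<String, byte[]> cache = new ConcurrentHashMap<>();

  CachingSideInputClient(SideInputRpc rpc) {
    this.rpc = rpc;
  }

  // Many small lookups; the cache (keyed by side input id + encoded window) does the heavy lifting.
  byte[] get(String sideInputId, byte[] encodedWindow) {
    String key = sideInputId + "/" + Base64.getEncoder().encodeToString(encodedWindow);
    return cache.computeIfAbsent(key, k -> rpc.fetch(sideInputId, encodedWindow));
  }
}

A request/response like this is the kind of call I would rather describe with a schema than with hand-rolled framing.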
Kenn

On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans <[email protected]> wrote:

> In Storm we use JSON as the default communication between shell bolts and shell spouts, which allows for APIs in non-JVM languages. It works rather well. That being said, it is also slow, and we made it a plugin so others could make their own, faster, implementations. For Storm both the data and the control are serialized to JSON, so I am not sure how much of that is control and how much of it is the data that makes it slow. I personally would like to see a simple benchmark that implements the basic protocol between the two so we can actually have a more numeric comparison, as well as any pain that someone experienced trying to implement even a proof of concept.
>
> I agree with Amit too that long term we may want to think about supporting structured data, and rely less on POJOs. It allows for a lot of optimizations in addition to having out-of-the-box support for serializing/de-serializing them in another language. But perhaps that is more of a layer that sits on top of Beam instead, because a lot of the optimizations really make the most sense in a declarative DSL-like context.
>
> - Bobby
>
> On Saturday, June 18, 2016 6:56 AM, Amit Sela <[email protected]> wrote:
>
> My +1 for JSON was for the fact that it's common enough and simpler than Protobuf/Avro/Thrift, and I would guess that (almost) all languages acknowledge it, though I might be wrong here.
>
> As for KV & WindowedValue, I'm not sure what's the issue with Kryo, but the "hardest" thing I had to do to get it working with Spark was to register 3rd-party implementations for Guava Immutable collections. And I honestly don't know if there is one framework that covers everything in all (common) languages.
>
> Finally, if I understand correctly, the suggestion is to transmit the data as bytes with the appropriate coders, correct? For the new Spark, for example, they use Encoders <https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html> that have an internal schema and allow the engine to avoid deserializations (and other optimizations) using this schema. So while the current version of the Spark runner actually transforms objects into bytes prior to shuffle, that might not be the best implementation for the next generation of the runner...
>
> This is how I see things from my pretty modest experience with serialization frameworks. Please correct me if/where I might be wrong.
>
> Thanks,
> Amit
>
> On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <[email protected]> wrote:
>
> > In the Runner API proposal doc, there are 10+ different types with several fields each. Is it important to have a code generator for the schema?
> > * simplify the SDK development process
> > * reduce errors due to differences in custom implementation
> >
> > I'm not familiar with tool(s) which can take a JSON schema (e.g. http://json-schema.org/) and generate code in multiple languages. Anyone?
> >
> > For the Data Plane API, a Runner and SDK must be able to encode elements such as WindowedValue and KVs in such a way that both sides can interpret them. For example, a Runner will be required to implement GBK, so it must be able to read the windowing information from the "bytes" transmitted; additionally, it will need to be able to split KV<K, V> records apart and recreate KV<K, Iterable<V>> for the SDK. Since Coders are the dominant way of encoding things, the Data Plane API will transmit "bytes" with the element boundaries encoded in some way. Aljoscha, I agree with you that a good choice for transmitting bytes between VMs/languages is very important. Even though we are still transmitting mostly "bytes", error handling & connection handling are still important.
> > For example, if we were to use gRPC and proto3 with a bidirectional stream-based API, we would get:
> > * the Runner and SDK can both push data both ways (stream from/to GBK, stream from/to state)
> > * error handling
> > * code generation of client libraries
> > * HTTP/2
> >
> > As for the encoding, any SDK can choose any serialization it wants, such as Kryo, but to get interoperability with other languages that would require others to implement parts of the Kryo serialization spec to be able to interpret the "bytes". Thus certain types like KV & WindowedValue should be encoded in a way which allows for this interoperability.
> >
> > On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <[email protected]> wrote:
> >
> > > +1 on Aljoscha's comment, not sure where's the benefit in having a "schematic" serialization.
> > >
> > > I know that Spark, and I think Flink as well, use Kryo <https://github.com/EsotericSoftware/kryo> for serialization (to be accurate, it's Chill <https://github.com/twitter/chill> for Spark) and I found it very impressive even comparing to "manual" serializations, i.e., it seems to outperform Spark's "native" Encoders (1.6+) for primitives. In addition it clearly supports Java and Scala, and there are 3rd-party libraries for Clojure and Objective-C.
> > >
> > > I guess my bottom line here agrees with Kenneth - performance and interoperability - but I'm just not sure if schema-based serializers are *always* the fastest.
> > >
> > > As for pipeline serialization, since performance is not the main issue, and I think usability would be very important, I say +1 for JSON.
> > >
> > > For anyone who spent some time on benchmarking serialization libraries, now is the time to speak up ;)
> > >
> > > Thanks,
> > > Amit
> > >
> > > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <[email protected]> wrote:
> > >
> > > > Hi,
> > > > am I correct in assuming that the transmitted envelopes would mostly contain coder-serialized values? If so, wouldn't the header of an envelope just be the number of contained bytes and number of values? I'm probably missing something, but with these assumptions I don't see the benefit of using something like Avro/Thrift/Protobuf for serializing the main-input value envelopes. We would just need a system that can send byte data really fast between languages/VMs.
> > > >
> > > > By the way, another interesting question (at least for me) is how other data, such as side inputs, is going to arrive at the DoFn if we want to support a general interface for different languages.
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles <[email protected]> wrote:
> > > >
> > > > > (Apologies for the formatting)
> > > > >
> > > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <[email protected]> wrote:
> > > > >
> > > > > > Hello everyone!
> > > > > >
> > > > > > We are busily working on a Runner API (for building and transmitting pipelines) and a Fn API (for invoking user-defined functions found within pipelines) as outlined in the Beam technical vision [1]. Both of these require a language-independent serialization technology for interoperability between SDKs and runners.
> > > > > > The Fn API includes a high-bandwidth data plane where bundles are transmitted via some serialization/RPC envelope (inside the envelope, the stream of elements is encoded with a coder) to transfer bundles between the runner and the SDK, so performance is extremely important. There are many choices for high-performance serialization, and we would like to start the conversation about what serialization technology is best for Beam.
> > > > > >
> > > > > > The goal of this discussion is to arrive at consensus on the question: What serialization technology should we use for the data plane envelope of the Fn API?
> > > > > >
> > > > > > To facilitate community discussion, we looked at the available technologies and tried to narrow the choices based on three criteria:
> > > > > >
> > > > > > - Performance: What is the size of serialized data? How do we expect the technology to affect pipeline speed and cost? etc.
> > > > > >
> > > > > > - Language support: Does the technology support the most widespread languages for data processing? Does it have a vibrant ecosystem of contributed language bindings? etc.
> > > > > >
> > > > > > - Community: What is the adoption of the technology? How mature is it? How active is development? How is the documentation? etc.
> > > > > >
> > > > > > Given these criteria, we came up with four technologies that are good contenders. All have similar & adequate schema capabilities.
> > > > > >
> > > > > > - Apache Avro: Does not require code gen, but embedding the schema in the data could be an issue. Very popular.
> > > > > >
> > > > > > - Apache Thrift: Probably a bit faster and more compact than Avro. A huge number of languages supported.
> > > > > >
> > > > > > - Protocol Buffers 3: Incorporates the lessons that Google has learned through long-term use of Protocol Buffers.
> > > > > >
> > > > > > - FlatBuffers: Some benchmarks imply great performance from the zero-copy mmap idea. We would need to run representative experiments.
> > > > > >
> > > > > > I want to emphasize that this is a community decision, and this thread is just the conversation starter for us all to weigh in. We just wanted to do some legwork to focus the discussion if we could.
> > > > > >
> > > > > > And there's a minor follow-up question: Once we settle here, is that technology also suitable for the low-bandwidth Runner API for defining pipelines, or does anyone think we need to consider a second technology (like JSON) for usability reasons?
> > > > > >
> > > > > > [1] https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
