I think we will want an RPC server because the types of interactions
between the runner and SDK aren't that simple (a rough sketch follows the
list):
 - process some work (for some definition of work)
 - split that source
 - get progress of that source
 - SDK requests a side input
 - SDK wants to read/write state
 - ...
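To make that surface concrete, here is a purely illustrative sketch of what
those interactions could look like as Java interfaces. All names here are
hypothetical, not from any agreed design; the point is only that calls flow
in both directions.

// Hypothetical sketch only: names are invented for illustration, not from
// an agreed Fn API design. It shows that the interactions listed above are
// request/response style calls in both directions, which suggests an RPC
// server rather than a stdin/stdout pipe.
interface SdkHarness {
  // Runner asks the SDK to process a bundle of work for a fused stage;
  // elements arrive separately on the data plane, identified by bundleId.
  void processBundle(String bundleId);

  // Runner asks an active source to split itself (dynamic work rebalancing);
  // returns an encoded description of the residual work, if any.
  byte[] trySplit(String bundleId, double fractionOfRemainder);

  // Runner polls for progress of the bundle currently being processed.
  double getProgress(String bundleId);
}

interface RunnerServices {
  // SDK pulls a side input value for a given window, encoded with its coder.
  byte[] getSideInput(String sideInputId, byte[] encodedWindow);

  // SDK reads and writes user state.
  byte[] getState(byte[] stateKey);
  void setState(byte[] stateKey, byte[] encodedValue);
}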

As for security, I think that we could have a trusted auth mode (all
communication happens in a trusted environment, e.g. localhost-to-localhost
or a private network) and also potentially support an OAuth2 client
credentials grant, with the resulting access token used as a bearer token.
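If we went with gRPC, attaching such a bearer token to every call is
straightforward. A minimal sketch, assuming grpc-java and a token obtained
out of band (how the token is fetched/refreshed is out of scope here):

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;

// Attaches an OAuth2 access token (obtained via a client credentials grant)
// to every outgoing call as a standard "Authorization: Bearer ..." header.
final class BearerTokenInterceptor implements ClientInterceptor {
  private static final Metadata.Key<String> AUTHORIZATION =
      Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);

  private final String accessToken;

  BearerTokenInterceptor(String accessToken) {
    this.accessToken = accessToken;
  }

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
        next.newCall(method, callOptions)) {
      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        headers.put(AUTHORIZATION, "Bearer " + accessToken);
        super.start(responseListener, headers);
      }
    };
  }
}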

The reason I'm suggesting this is that I believe we should have a Docker
container dedicated to the user's code + SDK, which uses the Fn API to
communicate with the runner.
...

On Tue, Jun 28, 2016 at 7:17 AM, Bobby Evans <[email protected]>
wrote:

> What type of RPC do we really need?  Is this envisioned as a many-to-one
> connection situation?  What other processes exactly are going to be
> attaching to this RPC server and sending/receiving data?  If the non-Java
> process opens up a port and listens, there are all kinds of potential
> security concerns that will need to be addressed (encryption,
> authentication, authorization, etc.).  Using just stdin, stdout, and stderr
> is standard everywhere that Java runs.  It has some decent security built
> in, and all we need is a protocol to wrap the payload in, which is what I
> thought we were talking about.
>  I am fine with other better-performing RPC methods, like shared memory.
> Also, if there is a reason for us to have a full-blown RPC server I would
> have no problem with that, but I don't see a reason for it. What we are
> doing appears to just need single-threaded, point-to-point communication.
>
>  - Bobby
>
>     On Monday, June 27, 2016 9:24 AM, Aljoscha Krettek <[email protected]> wrote:
>
>
>  Thanks Kenn for expanding on your previous mail. I now have a better idea
> of why we need this.
>
> Out of the systems you suggested, Thrift and ProtoBuf3 + gRPC are probably
> best suited for the task. Both of these provide a way for generating
> serializers as well as for specifying an RPC interface. Avro and
> FlatBuffers only deal with serialization, and we would have to roll our
> own RPC system on top of them. (It seems the gRPC folks have some work
> going on to integrate support for FlatBuffers, but it's not clear when this
> will be done: https://github.com/grpc/grpc/issues/5438).
>
> From the description and benchmarks FlatBuffers looks very nice; if it
> really comes with a huge performance increase we might have to consider
> using it with our own RPC instead of Thrift/gRPC+ProtoBuf3. Rolling our own
> RPC would mean some overhead, however.
>
> I would suggest doing some proof-of-concept implementations with both
> Thrift and gRPC+ProtoBuf3 and seeing how they compare to the baseline (the
> current implementation, where we just directly call methods on the DoFn and
> the DoFn calls methods on the outside directly). This wouldn't have to
> create a full stack, just enough to see how interaction with the DoFn would
> work for the different systems.
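A minimal harness for that kind of comparison might look roughly like the
following. This is illustrative only; the encode/decode leg is a stand-in
for whatever round trip a Thrift or gRPC+ProtoBuf3 proof of concept would
pay, and a real comparison should use a proper benchmarking harness.

import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Rough micro-benchmark sketch for comparing a direct DoFn-style method
// call against variants that pay a serialization-style round trip per
// element. Purely illustrative.
final class FnCallBenchmark {
  static long run(String name, int iterations, Function<String, Integer> processElement) {
    long start = System.nanoTime();
    long checksum = 0;
    for (int i = 0; i < iterations; i++) {
      checksum += processElement.apply("element-" + i);
    }
    long micros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start);
    System.out.println(name + ": " + micros + " us (checksum " + checksum + ")");
    return checksum;
  }

  public static void main(String[] args) {
    int n = 1_000_000;
    // Baseline: direct method call, no encoding.
    run("direct", n, s -> s.length());
    // Simulated envelope: encode to bytes and decode again before processing.
    run("encode/decode", n, s -> new String(s.getBytes()).length());
  }
}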
>
> On Wed, 22 Jun 2016 at 23:00 Kenneth Knowles <[email protected]>
> wrote:
>
> > I wanted to say a bit more to clarify and enliven this discussion. My use
> > of the term "data plane" may have been confusing. I didn't mean to focus it
> > quite so much on the encoded elements. What I meant to discuss was the
> > entirety of performance-sensitive interactions between the runner and
> > user-defined functions. So let's drop the implied control/data distinction
> > and just talk about the whole interface.
> >
> > At the risk of writing at length about something everyone knows... the
> > motivation for the Fn API is this: we have a few types of user-definable
> > functions (UDFs) that occur in pipelines, and we need to invoke them in a
> > language-independent manner. These are DoFn, CombineFn, WindowFn,
> > BoundedSource, UnboundedSource, ViewFn/PCollectionView, and Coder.
> >
> > I will show a bad idea: Take the interfaces of the above functions (minus
> > Coder, which is special) and just turn them into RPC interfaces, and the
> > SDK's job is just to be a trivial or near-trivial bridge from RPC to
> > language-specific method calls. This is a bad proposal, but hopefully
> > helps to show issues such as:
> >
> >  - How and when do we deserialize user code / launch a container? (my bad
> > idea above doesn't answer; probably too often!)
> >  - How and when do we encode/decode elements? (my bad idea above would
> > require it between every UDF)
> >  - How do we manage calls that are more than simply a stream of elements
> > in a bundle? (example: side inputs)
> >
> > Any Fn API is required to have the same semantics as this simple proposal,
> > but should achieve it with superior performance. I'll leave off the details
> > since I am not authoring them personally. But let's assume as a baseline
> > the approach of executing a fused stage of same-language UDFs in a row
> > without any encoding/decoding or RPC, and making a single RPC call per
> > bundle (ignoring amortized round trips for streaming bytes).
> >
> > I gather from this thread these questions (which I may be interpreting
> > wrong; apologies if so) and I would like to answer them relative to this
> > design sketch:
> >
> > Q: Since we have one RPC per bundle and it goes through the whole fused
> > stage, and we have a whole stream of elements per call, doesn't the data
> > dominate the envelope?
> > A: In streaming executions, bundles can be very small, so the data will
> > not necessarily dominate.
> >
> > Q: Do we really need structured messages? Perhaps byte streams with fairly
> > trivial metadata suffice and we can just hand roll it?
> > A: I think that schematized tech is well-proven for adaptability and it is
> > also handy for code gen, regardless of performance. So to me the question
> > is whether or not we need structured messages at all, or if we can model
> > every high-throughput communication as coder-encoded streams. I think that
> > things like commits to state, acknowledgements of timer firings, and
> > pull-based requests like side inputs are probably best expressed via a
> > schema. But maybe I am overlooking some design ideas.
> >
> > Q: How will side inputs arrive?
> > A: This API is really designed to be pull-based, so it sort of implies a
> > great many small RPCs (and caching).
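As a rough illustration of what pull-based access plus caching could mean on
the SDK side (all names here are hypothetical, not from any design doc):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch: each (side input, window) pair is fetched from the
// runner over an RPC the first time it is needed and cached for the rest of
// the bundle, so many small element-level lookups amortize to one RPC per
// (side input, window).
final class CachingSideInputReader {
  interface RunnerClient {
    byte[] fetchSideInput(String sideInputId, String windowId);
  }

  private final RunnerClient runner;
  private final ConcurrentMap<String, byte[]> cache = new ConcurrentHashMap<>();

  CachingSideInputReader(RunnerClient runner) {
    this.runner = runner;
  }

  byte[] get(String sideInputId, String windowId) {
    return cache.computeIfAbsent(
        sideInputId + "/" + windowId,
        key -> runner.fetchSideInput(sideInputId, windowId));
  }
}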
> >
> > I'm sure I've left off some discussion points, and maybe oversimplified
> > some things, but does this answer the questions somewhat? Does this clarify
> > the suggested choices of tech? Do you still think we don't need them?
> >
> > Kenn
> >
> > On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans <[email protected]> wrote:
> >
> > > In storm we use JSON as the default communication between shell bolts and
> > > shell spouts, which allows for APIs in non-JVM languages. It works rather
> > > well.  That being said, it is also slow, and we made it a plugin so others
> > > could make their own, faster, implementations.  For storm both the data
> > > and the control are serialized to JSON, so I am not sure how much of the
> > > slowness comes from the control and how much from the data.  I personally
> > > would like to see a simple benchmark that implements the basic protocol
> > > between the two so we can actually have a more numeric comparison, as well
> > > as any pain that someone experienced trying to implement even a proof of
> > > concept.
> > >
> > > I agree with Amit too that long term we may want to think about supporting
> > > structured data, and rely less on POJOs.  It allows for a lot of
> > > optimizations in addition to having out-of-the-box support for
> > > serializing/de-serializing them in another language. But perhaps that is
> > > more of a layer that sits on top of Beam instead, because a lot of the
> > > optimizations really make the most sense in a declarative, DSL-like
> > > context.
> > >
> > >  - Bobby
> > >
> > >    On Saturday, June 18, 2016 6:56 AM, Amit Sela <[email protected]> wrote:
> > >
> > >
> > >  My +1 for JSON was for the fact that it's common enough and simpler than
> > > Protobuf/Avro/Thrift, and I would guess that (almost) all languages
> > > acknowledge it, though I might be wrong here.
> > >
> > > As for KV & WindowedValue, I'm not sure what the issue with Kryo is, but
> > > the "hardest" thing I had to do to get it working with Spark was to
> > > register 3rd-party implementations for Guava Immutable collections. And I
> > > honestly don't know if there is one framework that covers everything in
> > > all (common) languages.
> > >
> > > Finally, if I understand correctly, the suggestion is to transmit the data
> > > as bytes with the appropriate coders, correct? For the new Spark, for
> > > example, they use Encoders
> > > <https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html>
> > > that have an internal schema and allow the engine to avoid
> > > deserialization (and apply other optimizations) using this schema. So
> > > while the current version of the Spark runner actually transforms objects
> > > into bytes prior to shuffle, that might not be the best implementation for
> > > the next generation of the runner...
> > >
> > > This is how I see things from my pretty modest experience with
> > > serialization frameworks. Please correct me if/where I might be wrong.
> > >
> > > Thanks,
> > > Amit
> > >
> > > On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <[email protected]>
> > > wrote:
> > >
> > > > In the Runner API proposal doc, there are 10+ different types with
> > > > several fields each.
> > > > Is it important to have a code generator for the schema? It would:
> > > > * simplify the SDK development process
> > > > * reduce errors due to differences in custom implementations
> > > >
> > > > I'm not familiar with tool(s) which can take a JSON schema (e.g.
> > > > http://json-schema.org/) and generate code in multiple languages.
> > > > Anyone?
> > > >
> > > >
> > > > For the Data Plane API, a Runner and SDK must be able to encode elements
> > > > such as WindowedValue and KVs in such a way that both sides can interpret
> > > > them. For example, a Runner will be required to implement GBK, so it must
> > > > be able to read the windowing information from the "bytes" transmitted;
> > > > additionally, it will need to be able to split KV<K, V> records apart and
> > > > recreate KV<K, Iterable<V>> for the SDK. Since Coders are the dominant
> > > > way of encoding things, the Data Plane API will transmit "bytes" with the
> > > > element boundaries encoded in some way. Aljoscha, I agree with you that a
> > > > good choice for transmitting bytes between VMs/languages is very
> > > > important. Even though we are still transmitting mostly "bytes", error
> > > > handling & connection handling are still important.
> > > > For example, if we were to use gRPC and proto3 with a bidirectional
> > > > stream-based API, we would get (a rough sketch follows below):
> > > >  - the Runner and SDK can both push data both ways (stream from/to GBK,
> > > >    stream from/to state)
> > > >  - error handling
> > > >  - code generation of client libraries
> > > >  - HTTP/2
> > > >
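As a sketch of what such a bidirectional stream could look like from the SDK
side: the stub and element types below are hypothetical stand-ins for
whatever messages a real Fn API proto would generate, so this only shows the
shape of the interaction, not an actual API.

import io.grpc.stub.StreamObserver;

// Illustrative sketch of a bidirectional data-plane stream.
final class DataPlaneSketch {
  // Stand-in for a gRPC-generated async stub with one bidi-streaming method.
  interface DataPlaneStub {
    StreamObserver<byte[]> sendAndReceive(StreamObserver<byte[]> inbound);
  }

  static void run(DataPlaneStub stub) {
    // Inbound: coder-encoded elements pushed by the runner (e.g. GBK output).
    StreamObserver<byte[]> inbound = new StreamObserver<byte[]>() {
      @Override public void onNext(byte[] encodedElements) { /* decode + process */ }
      @Override public void onError(Throwable t) { /* surface the failure */ }
      @Override public void onCompleted() { /* bundle finished */ }
    };
    // Outbound: elements the SDK pushes back (e.g. GBK input, state updates).
    StreamObserver<byte[]> outbound = stub.sendAndReceive(inbound);
    outbound.onNext(new byte[] {/* coder-encoded elements */});
    outbound.onCompleted();
  }
}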
> > > > As for the encoding, any SDK can choose any serialization it wants, such
> > > > as Kryo, but to get interoperability with other languages that would
> > > > require others to implement parts of the Kryo serialization spec to be
> > > > able to interpret the "bytes". Thus certain types like KV & WindowedValue
> > > > should be encoded in a way which allows for this interoperability.
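To make "encoded in a way which allows for this interoperability" concrete,
here is one possible simplified layout for a windowed KV record, written
with plain length-prefixed fields so any SDK can reproduce it without a
Java-specific serializer. Illustrative only; this is not the actual Beam
coder format, and choosing such a layout is exactly what this thread is
about.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// One possible language-neutral layout for a windowed KV record: timestamp,
// then length-prefixed window, key, and value bytes, each produced by the
// corresponding coder.
final class WindowedKvEncoding {
  static byte[] encode(long timestampMillis, byte[] encodedWindow,
                       byte[] keyBytes, byte[] valueBytes) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeLong(timestampMillis);          // event-time timestamp
    out.writeInt(encodedWindow.length);      // window, encoded by its coder
    out.write(encodedWindow);
    out.writeInt(keyBytes.length);           // key, encoded by the key coder
    out.write(keyBytes);
    out.writeInt(valueBytes.length);         // value, encoded by the value coder
    out.write(valueBytes);
    return bos.toByteArray();
  }
}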
> > > >
> > > > On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <[email protected]> wrote:
> > > >
> > > > > +1 on Aljoscha's comment, not sure where the benefit is in having a
> > > > > "schematic" serialization.
> > > > >
> > > > > I know that Spark, and I think Flink as well, use Kryo
> > > > > <https://github.com/EsotericSoftware/kryo> for serialization (to be
> > > > > accurate it's Chill <https://github.com/twitter/chill> for Spark) and I
> > > > > found it very impressive even compared to "manual" serializations,
> > > > > i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
> > > > > primitives.
> > > > > In addition it clearly supports Java and Scala, and there are 3rd-party
> > > > > libraries for Clojure and Objective-C.
> > > > >
> > > > > I guess my bottom line here agrees with Kenneth - performance and
> > > > > interoperability - but I'm just not sure if schema-based serializers
> > > > > are *always* the fastest.
> > > > >
> > > > > As for pipeline serialization, since performance is not the main issue,
> > > > > and I think usability would be very important, I say +1 for JSON.
> > > > >
> > > > > For anyone who has spent some time benchmarking serialization
> > > > > libraries, now is the time to speak up ;)
> > > > >
> > > > > Thanks,
> > > > > Amit
> > > > >
> > > > > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <[email protected]> wrote:
> > > > >
> > > > > > Hi,
> > > > > > am I correct in assuming that the transmitted envelopes would mostly
> > > > > > contain coder-serialized values? If so, wouldn't the header of an
> > > > > > envelope just be the number of contained bytes and the number of
> > > > > > values? I'm probably missing something, but with these assumptions I
> > > > > > don't see the benefit of using something like Avro/Thrift/Protobuf
> > > > > > for serializing the main-input value envelopes. We would just need a
> > > > > > system that can send byte data really fast between languages/VMs.
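For what it's worth, that envelope really can be that small; a hand-rolled
sketch of the framing described above (illustrative only, assuming the
elements are already coder-encoded byte arrays):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Minimal hand-rolled envelope: a header carrying only the element count
// and total payload size, followed by length-prefixed coder-encoded
// elements.
final class MainInputEnvelope {
  static byte[] encode(List<byte[]> coderEncodedElements) throws IOException {
    int payloadSize = 0;
    for (byte[] element : coderEncodedElements) {
      payloadSize += 4 + element.length; // 4-byte length prefix per element
    }
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(coderEncodedElements.size()); // header: number of values
    out.writeInt(payloadSize);                 // header: number of payload bytes
    for (byte[] element : coderEncodedElements) {
      out.writeInt(element.length);
      out.write(element);
    }
    return bos.toByteArray();
  }
}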
> > > > > >
> > > > > > By the way, another interesting question (at least for me) is how
> > > > > > other data, such as side-inputs, is going to arrive at the DoFn if
> > > > > > we want to support a general interface for different languages.
> > > > > >
> > > > > > Cheers,
> > > > > > Aljoscha
> > > > > >
> > > > > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles <[email protected]> wrote:
> > > > > >
> > > > > > > (Apologies for the formatting)
> > > > > > >
> > > > > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hello everyone!
> > > > > > > >
> > > > > > > > We are busily working on a Runner API (for building and
> > > > > > > > transmitting pipelines) and a Fn API (for invoking user-defined
> > > > > > > > functions found within pipelines) as outlined in the Beam
> > > > > > > > technical vision [1]. Both of these require a
> > > > > > > > language-independent serialization technology for
> > > > > > > > interoperability between SDKs and runners.
> > > > > > > >
> > > > > > > > The Fn API includes a high-bandwidth data plane where bundles
> > > > > > > > are transmitted via some serialization/RPC envelope (inside the
> > > > > > > > envelope, the stream of elements is encoded with a coder) to
> > > > > > > > transfer bundles between the runner and the SDK, so performance
> > > > > > > > is extremely important. There are many choices for high
> > > > > > > > performance serialization, and we would like to start the
> > > > > > > > conversation about what serialization technology is best for
> > > > > > > > Beam.
> > > > > > > >
> > > > > > > > The goal of this discussion is to arrive at consensus on the
> > > > > > > > question: What serialization technology should we use for the
> > > > > > > > data plane envelope of the Fn API?
> > > > > > > >
> > > > > > > > To facilitate community discussion, we looked at the available
> > > > > > > > technologies and tried to narrow the choices based on three
> > > > > > > > criteria:
> > > > > > > >
> > > > > > > >  - Performance: What is the size of serialized data? How do we
> > > > > > > >    expect the technology to affect pipeline speed and cost? etc
> > > > > > > >
> > > > > > > >  - Language support: Does the technology support the most
> > > > > > > >    widespread languages for data processing? Does it have a
> > > > > > > >    vibrant ecosystem of contributed language bindings? etc
> > > > > > > >
> > > > > > > >  - Community: What is the adoption of the technology? How mature
> > > > > > > >    is it? How active is development? How is the documentation?
> > > > > > > >    etc
> > > > > > > >
> > > > > > > > Given these criteria, we came up with four technologies that are
> > > > > > > > good contenders. All have similar & adequate schema capabilities.
> > > > > > > >
> > > > > > > >  - Apache Avro: Does not require code gen, but embedding the
> > > > > > > >    schema in the data could be an issue. Very popular.
> > > > > > > >
> > > > > > > >  - Apache Thrift: Probably a bit faster and more compact than
> > > > > > > >    Avro. A huge number of languages supported.
> > > > > > > >
> > > > > > > >  - Protocol Buffers 3: Incorporates the lessons that Google has
> > > > > > > >    learned through long-term use of Protocol Buffers.
> > > > > > > >
> > > > > > > >  - FlatBuffers: Some benchmarks imply great performance from the
> > > > > > > >    zero-copy mmap idea. We would need to run representative
> > > > > > > >    experiments.
> > > > > > > >
> > > > > > > > I want to emphasize that this is a community decision, and this
> > > > > > > > thread is just the conversation starter for us all to weigh in.
> > > > > > > > We just wanted to do some legwork to focus the discussion if we
> > > > > > > > could.
> > > > > > > >
> > > > > > > > And there's a minor follow-up question: Once we settle here, is
> > > > > > > > that technology also suitable for the low-bandwidth Runner API
> > > > > > > > for defining pipelines, or does anyone think we need to consider
> > > > > > > > a second technology (like JSON) for usability reasons?
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38