One question - does this change the actual byte encoding of elements? We've tried hard not to do that so far for reasons of compatibility.
Reuven

On Sun, Feb 4, 2018 at 6:44 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> Hi guys,
>
> I submitted a PR on coders to enhance 1. the user experience and 2. the
> determinism and handling of coders.
>
> 1. The user-experience part is linked to what I sent some days ago: close
> handling of the streams from coder code. Long story short, I added a
> SkipCloseCoder which can decorate a coder and just wraps the stream (input
> or output) in flavors that skip close() calls. This avoids doing it by
> default (which had my preference if you read the related thread, but not
> everybody's) and also makes using a coder with this issue easy, since the
> coder's of() just wraps itself in this delegating coder.
>
> 2. This one is nastier and mainly concerns IterableLikeCoders. These
> coders use this kind of algorithm (keep in mind they work on a list):
>
> writeSize()
> for all elements e {
>   elementCoder.write(e)
> }
> writeMagicNumber() // this one depends on the size
>
> Decoding is symmetric, so I skip it here.
>
> All of these writes (and reads) are done on the same stream. The decoder
> therefore assumes each element coder reads exactly as many bytes as it
> wrote, which is a huge assumption for a coder that should, by contract, be
> able to consume the stream as a stream (until -1).
>
> The idea of the fix is to change this encoding to this kind of algorithm:
>
> writeSize()
> for all elements e {
>   writeElementByteCount(e)
>   elementCoder.write(e)
> }
> writeMagicNumber() // still optional
>
> This way, on the decode side, you can wrap the stream per element to
> enforce the byte-count limit.
>
> Side note: this does impose a limit due to Java's int-sized byte counts,
> but if you check the coder code, that limit is already there at a higher
> level, so it is not a big deal for now.
>
> In terms of implementation, it uses a LengthAwareCoder which delegates the
> encoding to another coder and just adds the byte count before the actual
> serialization.
> Not perfect, but it should be more than enough in terms of support and
> performance for Beam if you think about real pipelines (we try to avoid
> serializations, or they are done at some well-known points where this
> algorithm should be enough; worst case it is not a huge overhead, mainly
> just some memory overhead).
>
> The PR is available at https://github.com/apache/beam/pull/4594. If you
> check it, you will see I marked it "WIP". The main reason is that it
> changes the encoding format for containers (lists, iterables, ...) and
> therefore breaks the Python/Go/... tests and the standard_coders.yml
> definition. Some help on that would be very welcome.
>
> Technical side note, if you are wondering: UnownedInputStream doesn't even
> allow marking the stream, so there is no real way to read the stream as
> fast as possible with standard buffering strategies while also supporting
> the implicit, automatic IterableCoder wrapping. In other words, if Beam
> wants to support any coder, including the ones that don't need to write
> the size of their output - most codecs - then we need to change the way it
> works to something like this, which does it on behalf of the user, whose
> coder got wrapped without their knowledge.
>
> Hope it makes sense; if not, don't hesitate to ask questions.
>
> Happy end of weekend.
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
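For readers following along: the close-skipping decoration described in point 1 can be sketched with plain java.io types, without any Beam classes. NonClosingOutputStream and SkipCloseDemo are hypothetical names for illustration, not the actual SkipCloseCoder from the PR:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Decorator whose close() only flushes, so a coder that closes the stream it
// was handed cannot close the shared pipeline stream underneath.
class NonClosingOutputStream extends FilterOutputStream {
    NonClosingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void close() throws IOException {
        flush(); // swallow close(); the owner closes the real stream later
    }
}

public class SkipCloseDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream shared = new ByteArrayOutputStream();
        OutputStream wrapped = new NonClosingOutputStream(shared);
        wrapped.write('a');
        wrapped.close();   // no-op apart from the flush
        shared.write('b'); // the shared stream is still usable afterwards
        System.out.println(shared.toString("UTF-8")); // prints "ab"
    }
}
```

The same idea applies on the read side with a FilterInputStream whose close() is a no-op.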
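The length-prefixed container encoding proposed in point 2 can likewise be illustrated with plain java.io streams. The encode/decode methods below are an illustrative sketch of the algorithm in the email (element coders stand in as raw byte arrays), not the actual LengthAwareCoder implementation from the PR:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LengthPrefixDemo {
    // writeSize(), then for each element writeElementByteCount(e) followed by
    // the element bytes, as in the proposed algorithm.
    static void encode(List<byte[]> elements, DataOutputStream out) throws IOException {
        out.writeInt(elements.size());
        for (byte[] e : elements) {
            out.writeInt(e.length); // byte count before the serialized element
            out.write(e);
        }
    }

    // The decoder bounds each element read by the recorded byte count instead
    // of trusting the element coder to stop at the right offset.
    static List<byte[]> decode(DataInputStream in) throws IOException {
        int size = in.readInt();
        List<byte[]> elements = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            byte[] e = new byte[in.readInt()];
            in.readFully(e);
            elements.add(e);
        }
        return elements;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        encode(Arrays.asList("foo".getBytes(), "ba".getBytes()),
               new DataOutputStream(bytes));
        List<byte[]> decoded = decode(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(new String(decoded.get(0)) + "," + new String(decoded.get(1)));
        // prints "foo,ba"
    }
}
```

Note the int-sized length prefix here is exactly the Java byte-count limitation the email mentions; a real implementation could use a variable-length encoding instead.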