On Sun, Feb 4, 2018 at 6:44 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

> Hi guys,
>
> I submitted a PR on coders to enhance 1. the user experience and 2. the
> determinism and handling of coders.
>
> 1. The user-experience point is linked to what I sent some days ago:
> close handling of the streams from coder code. Long story short, I add
> a SkipCloseCoder which can decorate a coder and just wraps the stream
> (input or output) in flavors that skip close() calls. This avoids doing
> it by default (which had my preference if you read the related thread,
> but not everybody's), and it also makes using a coder with this issue
> easy, since the of() of the coder just wraps itself in this delegating
> coder. (A minimal sketch is quoted further down.)
>
> 2. This one is nastier and mainly concerns IterableLikeCoders. These
> use this kind of algorithm (keep in mind they work on a list):
>
>     writeSize()
>     for each element e {
>         elementCoder.write(e)
>     }
>     writeMagicNumber() // this one depends on the size
>
> The decoding is symmetric, so I skip it here.
>
> Note that all these writes (reads) are done on the same stream.
> Therefore it assumes you read exactly as many bytes as were written...
> which is a huge assumption for a coder that should, by contract, be
> able to consume the stream as a stream (until -1).
>
> The idea of the fix is to change this encoding to this kind of
> algorithm (also sketched in code below):
>
>     writeSize()
>     for each element e {
>         writeElementByteCount(e)
>         elementCoder.write(e)
>     }
>     writeMagicNumber() // still optional
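> To illustrate point 1, here is a minimal sketch of the decoration
> (names and details are illustrative, not necessarily the exact PR code):
>
>     import java.io.FilterInputStream;
>     import java.io.FilterOutputStream;
>     import java.io.IOException;
>     import java.io.InputStream;
>     import java.io.OutputStream;
>     import org.apache.beam.sdk.coders.Coder;
>     import org.apache.beam.sdk.coders.CustomCoder;
>
>     // Decorates another coder so that close() calls issued by the
>     // delegate never reach the underlying stream.
>     class SkipCloseCoder<T> extends CustomCoder<T> {
>         private final Coder<T> delegate;
>
>         SkipCloseCoder(Coder<T> delegate) {
>             this.delegate = delegate;
>         }
>
>         @Override
>         public void encode(T value, OutputStream out) throws IOException {
>             delegate.encode(value, new FilterOutputStream(out) {
>                 @Override
>                 public void close() throws IOException {
>                     flush(); // swallow the close, keep the stream usable
>                 }
>             });
>         }
>
>         @Override
>         public T decode(InputStream in) throws IOException {
>             return delegate.decode(new FilterInputStream(in) {
>                 @Override
>                 public void close() {
>                     // skipped on purpose
>                 }
>             });
>         }
>     }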
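> And a sketch of the proposed container encoding itself, for point 2
> (simplified to fixed 4-byte counts; a real implementation would rather
> use var-ints, and a count-limited stream view instead of the byte[]
> copy on the decode side):
>
>     import java.io.ByteArrayInputStream;
>     import java.io.ByteArrayOutputStream;
>     import java.io.DataInputStream;
>     import java.io.DataOutputStream;
>     import java.io.IOException;
>     import java.io.InputStream;
>     import java.io.OutputStream;
>     import java.util.ArrayList;
>     import java.util.List;
>     import org.apache.beam.sdk.coders.Coder;
>
>     final class LengthPrefixedListCoding {
>
>         // Encode: serialize each element to a scratch buffer first, so
>         // its byte count can be written before its bytes.
>         static <T> void encode(List<T> values, Coder<T> elementCoder,
>                 OutputStream out) throws IOException {
>             DataOutputStream data = new DataOutputStream(out);
>             data.writeInt(values.size());           // writeSize()
>             for (T e : values) {
>                 ByteArrayOutputStream scratch = new ByteArrayOutputStream();
>                 elementCoder.encode(e, scratch);
>                 data.writeInt(scratch.size());      // writeElementByteCount(e)
>                 scratch.writeTo(data);              // elementCoder.write(e)
>             }
>         }
>
>         // Decode: hand the element coder a view that ends (returns -1)
>         // exactly at the announced byte count, whatever the coder reads.
>         static <T> List<T> decode(Coder<T> elementCoder, InputStream in)
>                 throws IOException {
>             DataInputStream data = new DataInputStream(in);
>             int size = data.readInt();
>             List<T> result = new ArrayList<>(size);
>             for (int i = 0; i < size; i++) {
>                 byte[] bytes = new byte[data.readInt()];
>                 data.readFully(bytes);
>                 result.add(elementCoder.decode(new ByteArrayInputStream(bytes)));
>             }
>             return result;
>         }
>     }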
Regardless of the backwards-incompatibility issues, I'm unconvinced that
prefixing every element with its length is a good idea. It can lead to a
blow-up in encoded size (e.g. for a list of ints, a fixed 4-byte count per
element roughly doubles the output, and note that containers with lots of
elements bias towards having small elements), and writeElementByteCount(e)
could be very inefficient for many types e (e.g. a list of lists; see the
sketch at the end of this mail).

> This way, on the decode side, you can wrap the stream per element to
> enforce the limitation of the byte count.
>
> Side note: this indeed enforces a limit due to Java's int-based byte
> counts, but if you check the coder code, that limit is already there at
> a higher level, so it is not a big deal for now.
>
> In terms of implementation, it uses a LengthAwareCoder which delegates
> the encoding to another coder and just adds the byte count before the
> actual serialization. Not perfect, but it should be more than enough in
> terms of support and performance for Beam if you think of real
> pipelines (we try to avoid serializations, or they are done at some
> well-known points where this algorithm should be enough... worst case
> it is not a huge overhead, mainly just some memory overhead).
>
> The PR is available at https://github.com/apache/beam/pull/4594. If you
> check it, you will see I marked it "WIP". The main reason is that it
> changes the encoding format for containers (lists, iterables, ...) and
> therefore breaks the python/go/... tests and the standard_coders.yml
> definition. Some help on that would be very welcome.
>
> Technical side note, if you wonder: UnownedInputStream doesn't even
> allow marking the stream, so there is no fast way to read the stream
> with standard buffering strategies while supporting this automatic
> IterableCoder wrapping, which is implicit. In other words, if Beam
> wants to support any coder, including the ones that do not write the
> size of their output (most codecs), then we need to change the way it
> works to something like this, which does it for the user, who doesn't
> know their coder got wrapped.
>
> Hope it makes sense; if not, don't hesitate to ask questions.
>
> Happy end of week-end.
>
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
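To make the inefficiency point above concrete: for a general element coder,
the only way to know the byte count up front is to fully encode the element
into a scratch buffer, either throwing that work away and encoding again, or
holding the whole encoded element in memory. For nested containers this
repeats at every level. Roughly (a hypothetical helper, not code from the
PR):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.beam.sdk.coders.Coder;

    final class ByteCounts {
        // A generic writeElementByteCount(e) reduces to an extra full
        // serialization per element; for a List<List<T>> it is paid per
        // inner list and again per inner element.
        static <T> int byteCountOf(Coder<T> coder, T value) throws IOException {
            ByteArrayOutputStream scratch = new ByteArrayOutputStream();
            coder.encode(value, scratch); // encode once just to measure
            return scratch.size();
        }
    }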