On Sun, Feb 4, 2018 at 6:44 AM, Romain Manni-Bucau
<rmannibu...@gmail.com> wrote:
> Hi guys,
>
> I submitted a PR on coders to enhance (1) the user experience and
> (2) the determinism and handling of coders.
>
> 1. The user experience part is linked to what I sent some days ago:
> close() handling of the streams from coder code. Long story short, I
> add a SkipCloseCoder which can decorate a coder and wraps the stream
> (input or output) in a flavor that skips close() calls. This avoids
> doing it by default (which had my preference if you read the related
> thread, but not everybody's) and also makes it easy to use a coder
> with this issue, since the coder's of() just wraps itself in this
> delegating coder.
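> Roughly, the decorator looks like this (an illustrative sketch with
> made-up details, not the exact PR code):
>
> import java.io.FilterInputStream;
> import java.io.FilterOutputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.coders.CustomCoder;
>
> public class SkipCloseCoder<T> extends CustomCoder<T> {
>   private final Coder<T> delegate;
>
>   public static <T> SkipCloseCoder<T> of(Coder<T> delegate) {
>     return new SkipCloseCoder<>(delegate);
>   }
>
>   private SkipCloseCoder(Coder<T> delegate) {
>     this.delegate = delegate;
>   }
>
>   @Override
>   public void encode(T value, OutputStream out) throws IOException {
>     // the wrapper swallows close() so the delegate cannot close the
>     // runner-owned stream (a real version would also forward the
>     // bulk write(byte[], int, int) for performance)
>     delegate.encode(value, new FilterOutputStream(out) {
>       @Override
>       public void close() throws IOException {
>         flush(); // keep the flush, skip the close
>       }
>     });
>   }
>
>   @Override
>   public T decode(InputStream in) throws IOException {
>     return delegate.decode(new FilterInputStream(in) {
>       @Override
>       public void close() {
>         // no-op: the stream belongs to the runner
>       }
>     });
>   }
> }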
>
> 2. This one is nastier and mainly concerns IterableLikeCoders. These
> coders use this kind of algorithm (keep in mind they work on a list):
>
> writeSize()
> for each element e {
>     elementCoder.write(e)
> }
> writeMagicNumber() // this one depends on the size
>
> The decoding is symmetric, so I skip it here.
>
> Indeed, all these writes (and reads) are done on the same stream.
> Therefore it assumes you read exactly as many bytes as you write...
> which is a huge assumption for a coder, which should by contract be
> able to read the stream... as a stream (until -1).
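> For instance (a made-up coder, just to show the clash), this is a
> perfectly valid standalone coder that breaks as an element coder:
>
> import com.google.common.io.ByteStreams;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import org.apache.beam.sdk.coders.CustomCoder;
>
> public class WholeStreamCoder extends CustomCoder<byte[]> {
>   @Override
>   public void encode(byte[] value, OutputStream out) throws IOException {
>     out.write(value); // no length prefix, the payload *is* the stream
>   }
>
>   @Override
>   public byte[] decode(InputStream in) throws IOException {
>     return ByteStreams.toByteArray(in); // reads until -1
>   }
> }
>
> Alone it round-trips fine; inside an IterableLikeCoder its decode()
> eats the bytes of every following element because nothing tells it
> where its own element stops.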
>
> The idea of the fix is to change this encoding to this kind of algorithm:
>
> writeSize()
> for each element e {
>     writeElementByteCount(e)
>     elementCoder.write(e)
> }
> writeMagicNumber() // still optional

Regardless of the backwards incompatibility issues, I'm unconvinced
that prefixing every element with its length is a good idea. It can
lead to a blow-up in size (e.g. a list of ints; note that containers
with lots of elements bias towards having small elements), and
writeElementByteCount(e) could be very inefficient for many types of
e (e.g. a list of lists).
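To make that concrete (my numbers, not measured): a list of a million
small ints that VarIntCoder writes in one byte each roughly doubles
in size once every element also carries a one-byte count. And for a
list of lists, writeElementByteCount(e) has to know the size of the
inner encoding before writing it, which in general means encoding the
inner list to a buffer first (or encoding it twice), recursively at
every level of nesting.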

> This way, on the decode side, you can wrap the stream per element to
> enforce the byte-count limit.
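> The decode-side wrapper can be as simple as this (illustrative
> sketch, not the PR code; Guava's ByteStreams.limit() does roughly
> the same):
>
> import java.io.FilterInputStream;
> import java.io.IOException;
> import java.io.InputStream;
>
> class ElementBoundedInputStream extends FilterInputStream {
>   private long remaining;
>
>   ElementBoundedInputStream(InputStream in, long byteCount) {
>     super(in);
>     this.remaining = byteCount;
>   }
>
>   @Override
>   public int read() throws IOException {
>     if (remaining <= 0) {
>       return -1; // the element boundary looks like EOF to the coder
>     }
>     int b = super.read();
>     if (b >= 0) {
>       remaining--;
>     }
>     return b;
>   }
>
>   @Override
>   public int read(byte[] buf, int off, int len) throws IOException {
>     if (remaining <= 0) {
>       return -1;
>     }
>     int n = super.read(buf, off, (int) Math.min(len, remaining));
>     if (n > 0) {
>       remaining -= n;
>     }
>     return n;
>   }
> }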
>
> Side note: this indeed enforces a size limit due to Java's int-based
> byte counts, but if you check the coder code that limit is already
> there at a higher level, so it is not a big deal for now.
>
> In terms of implementation it uses a LengthAwareCoder which delegates
> the encoding to another coder and just adds the byte count before the
> actual serialization. Not perfect, but it should be more than enough
> in terms of support and perf for Beam if you think of real pipelines
> (we try to avoid serializations, or they happen at some well-known
> points where this algorithm should be enough... worst case it is not
> a huge overhead, mainly just some memory overhead).
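> Sketched, the coder looks like this (again illustrative, not the
> PR's exact code, reusing the ElementBoundedInputStream above; the
> memory overhead mentioned is the per-element buffer):
>
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.coders.CustomCoder;
> import org.apache.beam.sdk.coders.VarIntCoder;
>
> public class LengthAwareCoder<T> extends CustomCoder<T> {
>   private final Coder<T> delegate;
>   private final VarIntCoder sizeCoder = VarIntCoder.of();
>
>   public LengthAwareCoder(Coder<T> delegate) {
>     this.delegate = delegate;
>   }
>
>   @Override
>   public void encode(T value, OutputStream out) throws IOException {
>     // buffer the element to learn its size, then write size + bytes
>     ByteArrayOutputStream buffer = new ByteArrayOutputStream();
>     delegate.encode(value, buffer);
>     sizeCoder.encode(buffer.size(), out);
>     buffer.writeTo(out);
>   }
>
>   @Override
>   public T decode(InputStream in) throws IOException {
>     int size = sizeCoder.decode(in);
>     // the delegate sees -1 at the element boundary, so "read until
>     // EOF" coders work unchanged
>     return delegate.decode(new ElementBoundedInputStream(in, size));
>   }
> }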
>
>
> The PR is available at https://github.com/apache/beam/pull/4594. If
> you check it you will see I marked it "WIP". The main reason is that
> it changes the encoding format for containers (lists, iterables, ...)
> and therefore breaks the python/go/... tests and the
> standard_coders.yml definition. Some help on that would be very
> welcome.
>
> Technical side note if you wonder: UnownedInputStream doesn't even
> allow marking the stream, so there is no fast way to read the stream
> with standard buffering strategies while supporting this implicit,
> automatic IterableCoder wrapping. In other words, if Beam wants to
> support any coder, including the ones that don't need to write the
> size of their output - most codecs - then we need to change the way
> it works to something like this, which does it for the user, whose
> coder got wrapped without them knowing.
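> To see the constraint, this is the kind of read-ahead a buffering
> decoder would want and cannot have here (sketch only):
>
> if (in.markSupported()) {
>   in.mark(8192);
>   // speculative bulk read, then rewind what this element didn't use
>   in.reset();
> } else {
>   // UnownedInputStream lands here: the only safe contracts left are
>   // "read exactly N bytes" or "read until -1"
> }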
>
> Hope it makes sense, if not, don't hesitate to ask questions.
>
> Happy end of weekend.
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
