One question - does this change the actual byte encoding of elements? We've
tried hard not to do that so far for reasons of compatibility.


On Sun, Feb 4, 2018 at 6:44 AM, Romain Manni-Bucau <>

> Hi guys,
> I submitted a PR on coders to enhance (1) the user experience and (2) the
> determinism and handling of coders.
> 1. The user experience point is linked to what I sent a few days ago:
> close() handling of the streams from within coder code. Long story short, I
> added a SkipCloseCoder which can decorate a coder and just wraps the stream
> (input or output) in a flavor that skips close() calls. This avoids doing it
> by default (which was my preference if you read the related thread, but not
> everybody's) and also makes it easy to use a coder with this issue, since
> the coder's of() simply wraps itself in this delegating coder.
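The close-skipping wrapper described above can be sketched roughly as follows (illustrative names, not the PR's actual API): a stream decorator whose close() flushes but leaves the underlying stream open, so a delegate coder may call close() safely.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of the close-skipping idea: close() flushes buffered
// bytes but does NOT close the shared underlying stream.
class NonClosingOutputStream extends FilterOutputStream {
    NonClosingOutputStream(OutputStream delegate) {
        super(delegate);
    }

    @Override
    public void close() throws IOException {
        flush(); // skip the close() call on the wrapped stream
    }
}
```

A SkipCloseCoder-style decorator could wrap both the input and the output streams this way before delegating to the wrapped coder's encode/decode.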
> 2. This one is nastier and mainly concerns IterableLikeCoders. These
> coders use this kind of algorithm (keep in mind they work on a list):
> writeSize()
> for each element e {
>     elementCoder.write(e)
> }
> writeMagicNumber() // this one depends on the size
> The decoding is symmetric, so I omit it here.
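To make the hazard concrete, here is a hedged sketch (hypothetical names, not Beam code) of why sharing one stream is fragile: an element coder that, as the coder contract permits, reads until end-of-stream will swallow the bytes of every element written after its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// A codec that streams until EOF (-1), as many codecs do when no size is
// written alongside the payload.
class GreedyByteCoder {
    static byte[] decode(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            buf.write(b);
        }
        return buf.toByteArray();
    }
}
```

Decoding two 3-byte elements written back-to-back returns a single 6-byte blob: the first decode consumes everything, and no error is ever raised.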
> Indeed, all these writes (reads) are done on the same stream. Therefore it
> assumes you read exactly as many bytes as you write... which is a huge
> assumption for a coder, which by contract should be allowed to read the
> stream until exhaustion (-1).
> The idea of the fix is to change this encoding to this kind of algorithm:
> writeSize()
> for each element e {
>     writeElementByteCount(e)
>     elementCoder.write(e)
> }
> writeMagicNumber() // still optional
> This way, on the decode side, you can wrap the stream per element to
> enforce the byte-count limit.
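The length-prefixed format above could look roughly like this (a minimal sketch with hypothetical names, using an int prefix; the PR's actual wire format may differ):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed format: a size header, then each element preceded
// by its byte count, so the decoder can bound what the element coder sees.
class LengthPrefixedListCoder {
    static void encode(List<byte[]> elements, DataOutputStream out) throws IOException {
        out.writeInt(elements.size());  // writeSize()
        for (byte[] e : elements) {
            out.writeInt(e.length);     // writeElementByteCount(e)
            out.write(e);               // elementCoder.write(e)
        }
    }

    static List<byte[]> decode(DataInputStream in) throws IOException {
        int size = in.readInt();
        List<byte[]> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            int count = in.readInt();
            byte[] chunk = new byte[count];
            in.readFully(chunk);
            // A real implementation would hand the element coder a stream
            // bounded to `count` bytes (e.g. new ByteArrayInputStream(chunk)),
            // so even a read-until-EOF coder cannot overrun its element.
            result.add(chunk);
        }
        return result;
    }
}
```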
> Side note: this does introduce a limitation due to Java's byte-count
> limits, but if you check the coder code that limit already exists at a
> higher level, so it is not a big deal for now.
> In terms of implementation it uses a LengthAwareCoder which delegates the
> encoding to another coder and just writes the byte count before the actual
> serialization. Not perfect, but it should be more than enough in terms of
> support and performance for Beam if you think of real pipelines (we try to
> avoid serialization, or it happens at some well-known points where this
> algorithm should be enough... worst case it is not a huge overhead, mainly
> just some memory overhead).
> The PR is available at If you
> check it you will see I marked it "WIP". The main reason is that it changes
> the encoding format for containers (lists, iterables, ...) and therefore
> breaks the python/go/... tests and the standard_coders.yml definition. Some
> help on that would be very welcome.
> Technical side note, if you wonder: UnownedInputStream doesn't even allow
> marking the stream, so there is no really fast way to read the stream with
> standard buffering strategies while also supporting this implicit,
> automatic IterableCoder wrapping. In other words, if Beam wants to support
> any coder, including ones that don't write the size of their output - most
> codecs - then we need to change the way it works to something like this,
> which does it on behalf of the user, who doesn't know their coder got
> wrapped.
> Hope it makes sense; if not, don't hesitate to ask questions.
> Happy end of weekend.
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
