Hi guys,

I submitted a PR on coders to enhance 1. the user experience and 2. the
determinism and handling of coders.

1. The user experience point is linked to what I sent a few days ago: close()
handling of the streams passed to a coder. Long story short, I added a
SkipCloseCoder which can decorate a coder and wraps the stream (input or
output) in a flavor that skips close() calls. This avoids doing it by default
(which was my preference if you read the related thread, but not everybody's)
while still making a coder with this issue easy to use, since the coder's of()
just wraps itself in this delegating coder.
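
To make this concrete, here is a minimal sketch of the decorating idea. The
real SkipCloseCoder lives in the PR, so the base class wiring, the
flush-on-close choice and the method bodies below are only assumptions for
illustration:

import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;

import org.apache.beam.sdk.coders.Coder;

// Illustrative sketch only: decorate a delegate coder so it never closes
// the runner-owned stream, whatever it does internally.
public class SkipCloseCoder<T> extends Coder<T> {
  private final Coder<T> delegate;

  public SkipCloseCoder(Coder<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void encode(T value, OutputStream outStream) throws IOException {
    delegate.encode(value, new FilterOutputStream(outStream) {
      @Override
      public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // forward bulk writes directly
      }

      @Override
      public void close() throws IOException {
        flush(); // keep the data, just don't close the underlying stream
      }
    });
  }

  @Override
  public T decode(InputStream inStream) throws IOException {
    return delegate.decode(new FilterInputStream(inStream) {
      @Override
      public void close() {
        // no-op: the caller owns the underlying stream
      }
    });
  }

  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Collections.singletonList(delegate);
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    delegate.verifyDeterministic();
  }
}

The point is only that the delegate can keep calling close() on what it thinks
is its stream without ever closing the runner-owned one.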

2. This one is nastier and mainly concerns the IterableLikeCoders. These
coders use this kind of algorithm (keep in mind they work on a list):

writeSize()
for all element e {
    elementCoder.write(e)
}
writeMagicNumber() // this one depends on the size

The decoding is symmetric, so I skip it here.

Indeed all these writes (and reads) are done on the same stream. It therefore
assumes each element coder reads exactly as many bytes as it wrote...which is
a huge assumption for a coder which should, by contract, be able to read the
stream...as a stream (until -1).
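
As an illustration (not code from the PR), a perfectly legal standalone coder
like this hypothetical one breaks as soon as it is nested in an
IterableLikeCoder, because its decode() eats the remaining elements and the
magic number:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.beam.sdk.coders.CustomCoder;

// Hypothetical element coder: fine on its own, broken once nested, because
// decode() consumes everything up to -1.
public class ReadUntilEofStringCoder extends CustomCoder<String> {
  @Override
  public void encode(String value, OutputStream outStream) throws IOException {
    outStream.write(value.getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public String decode(InputStream inStream) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    int b;
    while ((b = inStream.read()) != -1) { // reads the stream "as a stream"
      buffer.write(b);
    }
    return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
  }
}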

The idea of the fix is to change this encoding to this kind of algorithm:

writeSize()
for all element e {
    writeElementByteCount(e)
    elementCoder.write(e)
}
writeMagicNumber() // still optional

This way, on the decode side, you can wrap the stream per element to enforce
the byte-count limit.
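
As a sketch of what the decode side could look like (this is not the PR code;
the helper name, the plain readInt() length encoding and the per-element
byte[] buffering are assumptions of mine):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.coders.Coder;

public class LengthAwareDecoding {
  // Each element is decoded from a view capped at the byte count written
  // during encoding, so an element coder reading "until -1" stops at the
  // element boundary instead of eating the rest of the list.
  static <T> List<T> decodeWithLengths(Coder<T> elementCoder, InputStream in)
      throws IOException {
    DataInputStream data = new DataInputStream(in);
    int size = data.readInt();               // readSize()
    List<T> result = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      int elementByteCount = data.readInt(); // readElementByteCount()
      byte[] payload = new byte[elementByteCount];
      data.readFully(payload);
      result.add(elementCoder.decode(new ByteArrayInputStream(payload)));
    }
    // the optional magic number would be read (and checked) here
    return result;
  }
}

Buffering each element in a byte[] is the simplest way to cap the read; a
limiting stream wrapper would avoid the copy.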

Side note: this indeed enforces a size limitation due to Java's byte count
limits, but if you check the coder code that limit is already there at a
higher level, so it is not a big deal for now.

In terms of implementation it uses a LengthAwareCoder which delegates the
encoding to another coder and just adds the byte count before the actual
serialization. Not perfect, but it should be more than enough in terms of
support and performance for Beam if you think of real pipelines (we try to
avoid serialization, or it happens at some well-known points where this
algorithm should be enough...worst case it is not a huge overhead, mainly
just some memory overhead).
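
For reference, the encode side of that idea can be sketched like this (again
an assumption-laden sketch, not the PR's LengthAwareCoder; the helper name
and the writeInt() length prefix are made up here):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.beam.sdk.coders.Coder;

public class LengthAwareEncoding {
  // Serialize the element into a temporary buffer to learn its byte count,
  // then write the count followed by the payload.
  static <T> void encodeWithLength(Coder<T> elementCoder, T element, OutputStream out)
      throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    elementCoder.encode(element, buffer);  // delegate does the real work
    byte[] payload = buffer.toByteArray();
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(payload.length);         // writeElementByteCount(e)
    data.write(payload);
    data.flush();
  }
}

The temporary buffer is exactly the memory overhead mentioned above.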


The PR is available at https://github.com/apache/beam/pull/4594. If you check
it you will see I marked it "WIP". The main reason is that it changes the
encoding format for containers (lists, iterables, ...) and therefore breaks
the python/go/... tests and the standard_coders.yml definition. Some help on
that would be very welcome.

Technical side note in case you wonder: UnownedInputStream doesn't even allow
marking the stream, so there is no real way to read the stream as fast as
possible with standard buffering strategies while also supporting this
implicit, automatic IterableCoder wrapping. In other words, if Beam wants to
support any coder, including the ones that don't write the size of their
output - most codecs - then we need to change the way it works to something
like this, which does it for the user whose coder got wrapped without them
knowing.

Hope it makes sense, if not, don't hesitate to ask questions.

Happy end of weekend.

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>
