Hi guys,

I submitted a PR on coders to improve (1) the user experience and (2) the determinism and handling of coders.

1. The user experience part is linked to what I sent a few days ago: the handling of close() calls on the streams from coder code. Long story short, I added a SkipCloseCoder which can decorate a coder and just wraps the stream (input or output) in flavors that skip close() calls. This avoids doing it by default (which had my preference if you read the related thread, but not everybody's), yet it makes a coder with this issue easy to use, since the of() of the coder just wraps itself in this delegating coder.

2. This one is nastier and mainly concerns IterableLikeCoders. They use this kind of algorithm (keep in mind they work on a list):

    writeSize()
    for all element e {
        elementCoder.write(e)
    }
    writeMagicNumber() // this one depends on the size

The decoding is symmetric so I skip it here.

All these writes (reads) are done on the same stream. Therefore it assumes each element coder reads back exactly as many bytes as it wrote... which is a huge assumption for a coder, which should by contract be able to assume it can read the stream... as a stream (until -1).

The idea of the fix is to change this encoding to this kind of algorithm:

    writeSize()
    for all element e {
        writeElementByteCount(e)
        elementCoder.write(e)
    }
    writeMagicNumber() // still optional

This way, on the decode side, you can wrap the stream per element to enforce the byte count limit.

Side note: this does introduce a limitation due to the Java byte limitation, but if you check the coder code it is already there at a higher level, so it is not a big deal for now.

In terms of implementation it uses a LengthAwareCoder which delegates the encoding to another coder and just adds the byte count before the actual serialization. Not perfect, but it should be more than enough in terms of support and perf for Beam if you think about real pipelines (we try to avoid serializations, or they are done at some well-known points where this algorithm should be enough; worst case it is not a huge overhead, mainly just some memory overhead).

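To make point 2 concrete, here is a minimal sketch of the per-element byte count idea. It is not the code from the PR and does not use Beam's Coder API: ElementCoder, GREEDY_UTF8, encodeList and decodeList are hypothetical names made up for the illustration, and the magic number is left out. For simplicity it buffers each element in memory instead of wrapping the shared stream in a bounded view, which is one way to see the memory overhead mentioned above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LengthPrefixedListSketch {

  /** Hypothetical stand-in for an element coder; this is NOT Beam's Coder API. */
  interface ElementCoder<T> {
    void encode(T value, OutputStream out) throws IOException;

    T decode(InputStream in) throws IOException;
  }

  /** A "greedy" coder: it writes no size and decodes by reading until -1. */
  static final ElementCoder<String> GREEDY_UTF8 = new ElementCoder<String>() {
    @Override
    public void encode(String value, OutputStream out) throws IOException {
      out.write(value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String decode(InputStream in) throws IOException {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      int b;
      while ((b = in.read()) != -1) {
        buffer.write(b);
      }
      return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
  };

  /** writeSize(), then for each element: writeElementByteCount(e) + elementCoder.write(e). */
  static <T> byte[] encodeList(List<T> values, ElementCoder<T> coder) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(values.size());
      for (T value : values) {
        // Buffer the element first so its byte count can be written before it.
        ByteArrayOutputStream element = new ByteArrayOutputStream();
        coder.encode(value, element);
        out.writeInt(element.size());
        element.writeTo(out);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Symmetric decode: each element coder only ever sees its own bytes. */
  static <T> List<T> decodeList(byte[] data, ElementCoder<T> coder) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      int size = in.readInt();
      List<T> values = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        byte[] element = new byte[in.readInt()];
        in.readFully(element);
        // The per-element stream ends (-1) exactly where the element ends.
        values.add(coder.decode(new ByteArrayInputStream(element)));
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("a", "bc", "");
    byte[] data = encodeList(input, GREEDY_UTF8);
    System.out.println(decodeList(data, GREEDY_UTF8).equals(input));
  }
}
```

Without the byte count, the greedy coder in this sketch would swallow every following element (and the trailer) on the first decode call, which is the failure mode described in point 2.
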
The PR is available at https://github.com/apache/beam/pull/4594. If you check it you will see I marked it "WIP". The main reason is that it changes the encoding format for containers (lists, iterables, ...) and therefore breaks the python/go/... tests and the standard_coders.yml definition. Some help on that would be very welcome.

Technical side note, if you wonder: UnownedInputStream doesn't even allow marking the stream, so there is no really fast way to read the stream with standard buffering strategies while also supporting this automatic IterableCoder wrapping, which is implicit. In other words, if Beam wants to support any coder, including the ones that don't write the size of their output (most codecs), then we need to change the way it works to something like this, which does it for users who don't know their coder got wrapped.

Hope it makes sense; if not, don't hesitate to ask questions.

Enjoy the end of the weekend.

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>