I think doing a change that would break pipeline update for every single
user of Flink and Dataflow needs to be postponed until a next major
version. Pipeline update is a very frequently used feature, especially by
the largest users. We've had those users get significantly upset even when
we accidentally broke update compatibility for some special cases of
individual transforms; breaking it intentionally and project-wide is too
extreme to be justified by the benefits of the current change.

That said, I think concerns about coder APIs are reasonable, and it is
unfortunate that we effectively can't make changes to them right now. It
would be great if in the next major version we were better prepared for
evolution of coders, e.g. by having coders support a version marker or
something like that, with an API for detecting the version of data on wire
and reading or writing data of an old version. Such a change (introducing
versioning) would also, of course, be incompatible and would need to be
postponed until a major version - but, at least, subsequent changes
wouldn't.

...And as I was typing this email, seems that this is what the thread
already came to!

On Sun, Feb 4, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> I like this idea of migration support at coder level. It would require to
> add a metadata in all outputs which would represent the version then coders
> can handle the logic properly depending the version - we can assume a coder
> dev upgrade the version when he breaks the representation I hope ;).
> With this: no runner impact at all :).
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-02-04 18:09 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> It would already break quite a number of users at this point.
>>
>> I think what we should be doing is moving forward on the snapshot/update
>> proposal. That proposal actually provides a way forward when coders change
>> (it proposes a way to map an old snapshot to one using the new coder, so
>> changes to coders in the future will be much easier to make. However much
>> of the implementation for this will likely be at the runner level, not the
>> SDK level.
>>
>> Reuven
>>
>> On Sun, Feb 4, 2018 at 9:04 AM, Romain Manni-Bucau <rmannibu...@gmail.com
>> > wrote:
>>
>>> I fully understand that, and this is one of the reason managing to solve
>>> these issues is very important and ASAP. My conclusion is that we must
>>> break it now to avoid to do it later when usage will be way more developped
>>> - I would be very happy to be wrong on that point - so I started this PR
>>> and this thread. We can postpone it but it would break later so for
>>> probably more users.
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>> <http://rmannibucau.wordpress.com> | Github
>>> <https://github.com/rmannibucau> | LinkedIn
>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>
>>> 2018-02-04 17:49 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> Unfortunately several runners (at least Flink and Dataflow) support
>>>> in-place update of streaming pipelines as a key feature, and changing coder
>>>> format breaks this. This is a very important feature of both runners, and
>>>> we should endeavor not to break them.
>>>>
>>>> In-place snapshot and update is also a top-level Beam proposal that was
>>>> received positively, though neither of those runners yet implement the
>>>> proposed interface.
>>>>
>>>> Reuven
>>>>
>>>> On Sun, Feb 4, 2018 at 8:44 AM, Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>
>>>>> Sadly yes, and why the PR is actually WIP. As mentionned it modifies
>>>>> it and requires some updates in other languages and the 
>>>>> standard_coders.yml
>>>>> file (I didn't find how this file was generated).
>>>>> Since coders must be about volatile data I don't think it is a big
>>>>> deal to change it though.
>>>>>
>>>>>
>>>>> Romain Manni-Bucau
>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>
>>>>> 2018-02-04 17:34 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>> One question - does this change the actual byte encoding of elements?
>>>>>> We've tried hard not to do that so far for reasons of compatibility.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Sun, Feb 4, 2018 at 6:44 AM, Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>>
>>>>>>> I submitted a PR on coders to enhance 1. the user experience 2. the
>>>>>>> determinism and handling of coders.
>>>>>>>
>>>>>>> 1. the user experience is linked to what i sent some days ago: close
>>>>>>> handling of the streams from a coder code. Long story short I add a
>>>>>>> SkipCloseCoder which can decorate a coder and just wraps the stream 
>>>>>>> (input
>>>>>>> or output) in flavors skipping close() calls. This avoids to do it by
>>>>>>> default (which had my preference if you read the related thread but not 
>>>>>>> the
>>>>>>> one of everybody) but also makes the usage of a coder with this issue 
>>>>>>> easy
>>>>>>> since the of() of the coder just wraps itself in this delagating coder.
>>>>>>>
>>>>>>> 2. this one is more nasty and mainly concerns IterableLikeCoders.
>>>>>>> These ones use this kind of algorithm (keep in mind they work on a 
>>>>>>> list):
>>>>>>>
>>>>>>> writeSize()
>>>>>>> for all element e {
>>>>>>>     elementCoder.write(e)
>>>>>>> }
>>>>>>> writeMagicNumber() // this one depends the size
>>>>>>>
>>>>>>> The decoding is symmetric so I bypass it here.
>>>>>>>
>>>>>>> Indeed all these writes (reads) are done on the same stream.
>>>>>>> Therefore it assumes you read as much bytes than you write...which is a
>>>>>>> huge assumption for a coder which should by contract assume it can read 
>>>>>>> the
>>>>>>> stream...as a stream (until -1).
>>>>>>>
>>>>>>> The idea of the fix is to change this encoding to this kind of
>>>>>>> algorithm:
>>>>>>>
>>>>>>> writeSize()
>>>>>>> for all element e {
>>>>>>>     writeElementByteCount(e)
>>>>>>>     elementCoder.write(e)
>>>>>>> }
>>>>>>> writeMagicNumber() // still optionally
>>>>>>>
>>>>>>> This way on the decode size you can wrap the stream by element to
>>>>>>> enforce the limitation of the byte count.
>>>>>>>
>>>>>>> Side note: this indeed enforce a limitation due to java byte
>>>>>>> limitation but if you check coder code it is already here at the higher
>>>>>>> level so it is not a big deal for now.
>>>>>>>
>>>>>>> In terms of implementation it uses a LengthAwareCoder which
>>>>>>> delegates to another coder the encoding and just adds the byte count 
>>>>>>> before
>>>>>>> the actual serialization. Not perfect but should be more than enough in
>>>>>>> terms of support and perf for beam if you think real pipelines (we try 
>>>>>>> to
>>>>>>> avoid serializations or it is done on some well known points where this
>>>>>>> algo should be enough...worse case it is not a huge overhead, mainly 
>>>>>>> just
>>>>>>> some memory overhead).
>>>>>>>
>>>>>>>
>>>>>>> The PR is available at https://github.com/apache/beam/pull/4594. If
>>>>>>> you check you will see I put it "WIP". The main reason is that it 
>>>>>>> changes
>>>>>>> the encoding format for containers (lists, iterable, ...) and therefore
>>>>>>> breaks python/go/... tests and the standard_coders.yml definition. Some
>>>>>>> help on that would be very welcomed.
>>>>>>>
>>>>>>> Technical side note if you wonder: UnownedInputStream doesn't even
>>>>>>> allow to mark the stream so there is no real fast way to read the 
>>>>>>> stream as
>>>>>>> fast as possible with standard buffering strategies and to support this
>>>>>>> automatic IterableCoder wrapping which is implicit. In other words, if 
>>>>>>> beam
>>>>>>> wants to support any coder, including the ones not requiring to write 
>>>>>>> the
>>>>>>> size of the output - most of the codecs - then we need to change the 
>>>>>>> way it
>>>>>>> works to something like that which does it for the user which doesn't 
>>>>>>> know
>>>>>>> its coder got wrapped.
>>>>>>>
>>>>>>> Hope it makes sense, if not, don't hesitate to ask questions.
>>>>>>>
>>>>>>> Happy end of week-end.
>>>>>>>
>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to