Re-adding +datapls-portability-t...@google.com +datapls-unified-wor...@google.com
On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <rober...@google.com> wrote:

> Thanks for bringing this to the list. More below.
>
> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> FWIW I deliberately limited the thread to not mix public and private lists, so people intending private replies do not accidentally send to dev@beam.
>>
>> I've left them on this time, to avoid contradicting your action, but I recommend removing them.
>>
>> Kenn
>>
>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Re-adding +datapls-portability-t...@google.com +datapls-unified-wor...@google.com
>>>
>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> That is correct, Kenn. An important point is that SomeOtherCoder would be given a seekable stream (instead of the forward-only stream it gets right now), so it can either decode all the data or lazily decode parts as it needs to, as in the case of an iterable coder used to support large iterables coming out of a GroupByKey.
>>>>
>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>>> Interesting! Having large iterables within rows would be great for the interactions between SQL and the core SDK's schema/Row support, and we weren't sure how that could work, exactly.
>>>>>
>>>>> My (very basic) understanding is that LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length followed by the encoding of SomeOtherCoder.
>>>>>
>>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length followed by some number of bytes, and if it ends with a special token (ignoring escaping issues), then you have to gather bytes from more messages in order to assemble a stream to send to SomeOtherCoder? Have I got what you mean? So this is a different, yet compatible, approach to sending over a special token that has to be looked up separately via the state read API?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> There is a discussion happening on PR 7127[1] where Robert is working on providing the first implementation for supporting large iterables resulting from a GroupByKey. This is in line with the original proposal for remote references over the Fn Data & State API[2].
>>>>>>
>>>>>> I have thought about this issue more since the original write-up was done over a year ago, and I believe we can simplify the implementation by migrating the length-prefix coder to be able to embed a remote reference token at the end of the stream if the data is too large. This allows any coder that supports lazy decoding to return a view over a seekable stream instead of decoding all the data (regardless of whether all the data was sent inline or a state token represents the remote reference).
>>>>>>
>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve the large-iterable use case, but it also opens up the ability for types that don't need to be fully decoded to provide lazy views. Imagine our Beam rows using a format where only the parts that are read are decoded, while everything else is left in its encoded form.
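Concretely, the length-prefix encoding change being discussed might look something like the rough sketch below. The class name, token value, and layout are hypothetical (this is not Beam's actual LengthPrefixCoder), and escaping of the token within real payload bytes, as well as framing of the state key's length, is glossed over, per the "ignoring escaping issues" caveat above.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    // Rough sketch only -- not Beam's actual LengthPrefixCoder. A real version
    // would escape payload bytes that collide with the token and frame the
    // state key's length; both are glossed over here.
    public final class RemoteRefLengthPrefixSketch {

      // Hypothetical sentinel meaning "the remaining bytes live behind the
      // State Read API".
      private static final byte[] REMOTE_REF_TOKEN = {(byte) 0xFE, (byte) 0xFF};

      // Inline the whole value when it is small; otherwise inline a prefix and
      // end the stream with a reference token, so a lazy decoder can fetch the
      // remainder over the State API on demand.
      public static byte[] encode(byte[] encodedValue, int inlineLimit, byte[] stateKey)
          throws IOException {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        if (encodedValue.length <= inlineLimit) {
          body.write(encodedValue);
        } else {
          body.write(encodedValue, 0, inlineLimit); // prefix of the data, sent inline
          body.write(stateKey);                     // where to fetch the rest
          body.write(REMOTE_REF_TOKEN);             // stream ends with the token
        }
        byte[] bytes = body.toByteArray();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(bytes.length, out); // length prefix covers the bytes actually sent
        out.write(bytes);
        return out.toByteArray();
      }

      private static void writeVarInt(int value, ByteArrayOutputStream out) {
        while ((value & ~0x7F) != 0) {
          out.write((value & 0x7F) | 0x80);
          value >>>= 7;
        }
        out.write(value);
      }
    }

On the read side, a stream whose bytes end with the token would be wrapped in a seekable view that pages the remainder in via the State Read API, matching Kenn's summary above.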
>>>>>> I also originally thought that this could help solve an issue where large values[3] need to be chunked across multiple protobuf messages over the Data API, which complicates the reading-side decoding implementation, since each SDK needs to provide an implementation that blocks and waits for the next chunk to come across for the same logical stream[4]. But there are issues with this, because the runner may make a bad coder choice such as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>), which can lead to > 2 GB of state keys if there are very many values.
>
> Yes. I think this would need to be a separate coder from the length prefix coder.
>
>>>>>> Robert, would implementing the length prefix coder backed by state, plus adding a lazy decoding method to the iterable coder, be significantly more complicated than what you are proposing right now?
>
> Yes, chopping things up at arbitrary byte boundaries (rather than element boundaries) tends to be significantly more subtle and complex (based on my experience with the data plane API). It would also require new public APIs for Coders.

After some further thought, I don't think we need to have a different API for coders; it's just that they get a different implementation of the input stream when decoding. So the logic would be:

    public T decode(InputStream is) {
      if (is instanceof SeekableInputStream) {
        return view((SeekableInputStream) is);
      }
      return decodeInternal(is);
    }

> This is why I went with the more restricted (but still by far the most common, and quite straightforward) case of supporting arbitrarily large iterables (which can still occur at any level of nesting, e.g. inside rows), leaving the general case as future work.
>
>>>>>> What do others think about coders supporting a "lazy" decode mode?
>>>>>>
>>>>>> 1: https://github.com/apache/beam/pull/7127
>>>>>> 2: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>>>>> 3: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>>>>> 4: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
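For reference, a self-contained sketch of the decode() dispatch above. SeekableInputStream, view(), and decodeInternal() are hypothetical stand-ins here, not existing Beam Coder APIs; the point is only that the stream's runtime type, not a new method on Coder, selects eager versus lazy behavior.

    import java.io.IOException;
    import java.io.InputStream;

    // Rough sketch of the decode() dispatch described above. SeekableInputStream,
    // view(), and decodeInternal() are hypothetical, not existing Beam APIs.
    abstract class LazyDecodingCoder<T> {

      // A stream that supports random access in addition to forward reads, so a
      // coder can decode parts of the value on demand (whether the bytes were all
      // sent inline or are fetched lazily via a remote reference token).
      public abstract static class SeekableInputStream extends InputStream {
        public abstract void seek(long position) throws IOException;
        public abstract long size() throws IOException;
      }

      // Same public decode() signature as today -- no new Coder API needed.
      public T decode(InputStream is) throws IOException {
        if (is instanceof SeekableInputStream) {
          return view((SeekableInputStream) is);
        }
        return decodeInternal(is);
      }

      // Eager path: fully consume the forward-only stream, as coders do today.
      protected abstract T decodeInternal(InputStream is) throws IOException;

      // Lazy path: wrap the seekable stream in a view that decodes only what is
      // actually accessed.
      protected abstract T view(SeekableInputStream is) throws IOException;
    }

An iterable coder's view() could, for instance, return an Iterable whose iterator seeks and decodes one element at a time (the large-GroupByKey-result case in PR 7127), while a row coder's view() could decode individual fields on first access.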