Re-adding +datapls-portability-t...@google.com
<datapls-portability-t...@google.com> +datapls-unified-wor...@google.com
<datapls-unified-wor...@google.com>

On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <rober...@google.com> wrote:

> Thanks for bringing this to the list. More below.
>
> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> FWIW I deliberately limited the thread to not mix public and private
>> lists, so people intending private replies do not accidentally send to
>> dev@beam.
>>
>> I've left them on this time, to avoid contradicting your action, but I
>> recommend removing them.
>>
>> Kenn
>>
>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Re-adding +datapls-portability-t...@google.com
>>> <datapls-portability-t...@google.com> +datapls-unified-wor...@google.com
>>> <datapls-unified-wor...@google.com>
>>>
>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> That is correct Kenn. An important point would be that SomeOtherCoder
>>>> would be given a seekable stream (instead of the forward only stream it
>>>> gets right now) so it can either decode all the data or lazily decode parts
>>>> as it needs to as in the case of an iterable coder when used to support
>>>> large iterables coming out of a GroupByKey.
>>>>
>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Interesting! Having large iterables within rows would be great for the
>>>>> interactions between SQL and the core SDK's schema/Row support, and we
>>>>> weren't sure how that could work, exactly.
>>>>>
>>>>> My (very basic) understanding would be that
>>>>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
>>>>> followed by the encoding of SomeOtherCoder.
>>>>>
>>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder)
>>>>> has an encoding where it has a length followed by some number of bytes and
>>>>> if it ends with a special token (ignoring escaping issues) then you have
>>>>> to gather bytes from more messages in order to assemble a stream to send
>>>>> to SomeOtherCoder? Have I got what you mean? So this is a different, yet
>>>>> compatible, approach to sending over a special token that has to be looked
>>>>> up separately via the state read API?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> There is a discussion happening on PR 7127[1], where Robert is
>>>>>> working on providing the first implementation for supporting large
>>>>>> iterables resulting from a GroupByKey. This is in line with the original
>>>>>> proposal for remote references over the Fn Data & State API[2].
>>>>>>
>>>>>> I have thought about this issue more since the original write-up was
>>>>>> done over a year ago and believe that we can simplify the implementation
>>>>>> by migrating the length prefix coder to be able to embed a remote
>>>>>> reference token at the end of the stream if the data is too large. This
>>>>>> allows any coder which supports lazy decoding to return a view over a
>>>>>> seekable stream instead of decoding all the data (regardless of whether
>>>>>> all the data was sent or there is a state token representing the remote
>>>>>> reference).
>>>>>>
>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve the
>>>>>> large iterable use case but also opens up the ability for types which
>>>>>> don't need to be fully decoded to provide lazy views. Imagine our Beam
>>>>>> rows using a format where only rows that are read are decoded while
>>>>>> everything else is left in its encoded form.
>>>>>>
>>>>>> I originally thought that this could also help solve an issue where
>>>>>> large values[3] need to be chunked across multiple protobuf messages
>>>>>> over the Data API, which complicates the reading side decoding
>>>>>> implementation since each SDK needs to provide an implementation that
>>>>>> blocks and waits for the next chunk to come across for the same logical
>>>>>> stream[4]. But there are issues with this because the runner may make a
>>>>>> bad coder choice such as iterable<length_prefix<blob>> (instead of
>>>>>> length_prefix<iterable<blob>>), which can lead to > 2 GB of state keys
>>>>>> if there are very many values.
>>>>>>
>>>>>
> Yes. I think this would need to be a separate coder from the length prefix
> coder.
>

>>>>>> Robert, would implementing the length prefix coder being backed by state +
>>>>>> adding a lazy decoding method to the iterable coder be significantly more
>>>>>> complicated than what you are proposing right now?
>>>>>>
>>>>>
> Yes, chopping things up at arbitrary byte boundaries (rather than element
> boundaries) tends to be significantly more subtle and complex (based on my
> experience with the data plane API). It would also require new public APIs
> for Coders.
>

After some further thought, I don't think we need to have a different API
for coders; it's just that they get a different implementation of the
input stream when decoding. So the logic would be:
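public T decode(InputStream is) {
  // A seekable stream lets the coder return a lazy view over the encoded bytes.
  if (is instanceof SeekableInputStream) {
    return view((SeekableInputStream) is);
  }
  // A forward-only stream is decoded in full, as coders do today.
  return decodeInternal(is);
}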
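
To make the idea concrete, here is a rough, self-contained sketch of that dispatch for a toy "list of strings" encoding. Everything in it is an assumption for illustration only: SeekableInputStream here is a hypothetical in-memory class (not an existing Beam type), and StringsView, encode, view and decodeInternal are made-up names, not a proposed API. The lazy view also leaves stream positioning for enclosing coders out of scope.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LazyDecodeSketch {
  /** Hypothetical seekable stream over an in-memory buffer; not a Beam class. */
  static final class SeekableInputStream extends ByteArrayInputStream {
    SeekableInputStream(byte[] data) { super(data); }

    /** Moves the read position to an absolute offset in the buffer. */
    void seek(int position) {
      if (position < 0 || position > count) {
        throw new IndexOutOfBoundsException("position " + position);
      }
      pos = position;
    }

    int position() { return pos; }
  }

  /** Read-only view of the decoded value; elements may be decoded on demand. */
  interface StringsView {
    int size();
    String get(int index) throws IOException;
  }

  /** Toy encoding: element count, then each element as (length, UTF-8 bytes). */
  static byte[] encode(List<String> values) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(values.size());
    for (String value : values) {
      byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
      out.writeInt(utf8.length);
      out.write(utf8);
    }
    out.flush();
    return bytes.toByteArray();
  }

  /** Same public signature either way; only the stream implementation differs. */
  static StringsView decode(InputStream is) throws IOException {
    if (is instanceof SeekableInputStream) {
      return view((SeekableInputStream) is);
    }
    return decodeInternal(is);
  }

  /** Eager path: reads every element from a forward-only stream. */
  static StringsView decodeInternal(InputStream is) throws IOException {
    DataInputStream data = new DataInputStream(is);
    int n = data.readInt();
    List<String> values = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      byte[] utf8 = new byte[data.readInt()];
      data.readFully(utf8);
      values.add(new String(utf8, StandardCharsets.UTF_8));
    }
    return new StringsView() {
      public int size() { return values.size(); }
      public String get(int index) { return values.get(index); }
    };
  }

  /** Lazy path: reads only the count now and decodes an element when asked. */
  static StringsView view(SeekableInputStream is) throws IOException {
    int start = is.position();
    int n = new DataInputStream(is).readInt();
    return new StringsView() {
      public int size() { return n; }
      public String get(int index) throws IOException {
        if (index < 0 || index >= n) {
          throw new IndexOutOfBoundsException("index " + index);
        }
        is.seek(start + 4); // jump past the 4-byte element count
        DataInputStream data = new DataInputStream(is);
        for (int i = 0; i < index; i++) {
          int length = data.readInt();
          is.seek(is.position() + length); // skip bytes without decoding them
        }
        byte[] utf8 = new byte[data.readInt()];
        data.readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
      }
    };
  }
}
```

Calling decode with a plain ByteArrayInputStream takes the eager path, while wrapping the same bytes in the hypothetical SeekableInputStream returns a view that only touches the bytes of the element requested. A state-backed stream could presumably sit behind the same seek/read contract, but that part is not sketched here.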

> This is why I went with the more restricted (but still by far most common,
> and quite straightforward) case of supporting arbitrarily large iterables
> (which can still occur at any level of nesting, e.g. inside rows), leaving
> the general case as future work.
>

>>>>>> What do others think about coders supporting a "lazy" decode mode?
>>>>>>
>>>>>> 1: https://github.com/apache/beam/pull/7127
>>>>>> 2:
>>>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>>>>> 3:
>>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>>>>> 4:
>>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>>>>>>
>>>>>
