I don't believe we would need to change any other coders, since a
SeekableInputStream wouldn't change how a regular InputStream works. Coders
that don't care about the implementation would still use it as a
forward-only input stream, while coders that care about seeking would use
the new functionality.
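
A minimal sketch of this compatibility argument follows. SeekableInputStream
does not exist yet, so the class shape, the seek/position methods, and the
byte-array-backed implementation below are all assumptions made for
illustration, not a proposed Beam API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical type: an InputStream that additionally supports seeking.
abstract class SeekableInputStream extends InputStream {
  abstract void seek(long position);

  abstract long position();
}

// Hypothetical in-memory implementation, used only for this example.
final class ByteArraySeekableInputStream extends SeekableInputStream {
  private final byte[] data;
  private int pos;

  ByteArraySeekableInputStream(byte[] data) {
    this.data = data;
  }

  @Override
  public int read() {
    // Same contract as InputStream.read(): next byte as 0..255, or -1 at end.
    return pos < data.length ? (data[pos++] & 0xff) : -1;
  }

  @Override
  void seek(long position) {
    if (position < 0 || position > data.length) {
      throw new IllegalArgumentException("position out of range: " + position);
    }
    pos = (int) position;
  }

  @Override
  long position() {
    return pos;
  }
}

final class ForwardOnlyConsumer {
  // Stands in for a coder that knows nothing about seeking: it only calls
  // read(), so it behaves the same for any InputStream it is handed.
  static int sumBytes(InputStream in) {
    try {
      int sum = 0;
      for (int b = in.read(); b != -1; b = in.read()) {
        sum += b;
      }
      return sum;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Here ForwardOnlyConsumer.sumBytes never calls seek, so it reads the stream
front to back as before, while a seek-aware caller can still call seek on
the same object to jump to the part it needs.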

For the encoding portion, the state-backed length prefix coder would send
the small snippet of data that it received plus the state key, without
invoking the component coder to encode the value. The downstream receiving
party would need to look up the remote reference to get all the data. All
other coders would not be lazy and would have to encode the entire lazy
view; this could be optimized by copying the SeekableInputStream directly
to the OutputStream. Note that the length prefix coder is never used with
IOs, so those IOs could be given a type like Iterable<Foo> which is lazy,
but the encoding for that wouldn't be lazy and would output all the data
from the SeekableInputStream.
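
The two encoding paths described above could look roughly like the sketch
below. Everything in it is an assumption for illustration: the class and
method names are hypothetical, and the wire layout in
writeSnippetAndStateKey (4-byte lengths followed by the raw bytes) is made
up, since the actual framing of the snippet and state key has not been
specified in this thread.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

final class LazyEncodingSketch {
  // Eager path, for coders that are not lazy: copy every byte of the lazy
  // view to the output without decoding and re-encoding the value.
  static long copyAll(InputStream lazyView, OutputStream out) {
    byte[] buffer = new byte[8192];
    long total = 0;
    try {
      for (int n = lazyView.read(buffer); n != -1; n = lazyView.read(buffer)) {
        out.write(buffer, 0, n);
        total += n;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return total;
  }

  // Lazy path, for the state-backed length prefix coder: forward the snippet
  // that was received plus the state key, without touching the component
  // coder. The layout here is hypothetical.
  static void writeSnippetAndStateKey(byte[] snippet, byte[] stateKey, OutputStream out) {
    try {
      DataOutputStream data = new DataOutputStream(out);
      data.writeInt(snippet.length);
      data.write(snippet);
      data.writeInt(stateKey.length);
      data.write(stateKey);
      data.flush();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

With the lazy path the output size depends only on the snippet and the
state key, whereas copyAll writes as many bytes as the full value
contains, which is the cost the IO case above would pay.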


On Wed, Nov 28, 2018 at 3:08 PM Robert Bradshaw <rober...@google.com> wrote:

> On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <lc...@google.com> wrote:
> >
> > Re-adding +datapls-portability-t...@google.com +
> datapls-unified-wor...@google.com
> >
> > On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <rober...@google.com>
> wrote:
> >>
> >> Thanks for bringing this to the list. More below.
> >>
> >> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <k...@apache.org>
> wrote:
> >>>
> >>> FWIW I deliberately limited the thread to not mix public and private
> lists, so people intending private replies do not accidentally send to
> dev@beam.
> >>>
> >>> I've left them on this time, to avoid contradicting your action, but I
> recommend removing them.
> >>>
> >>> Kenn
> >>>
> >>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
> >>>>
> >>>> Re-adding +datapls-portability-t...@google.com +
> datapls-unified-wor...@google.com
> >>>>
> >>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >>>>>
> >>>>> That is correct Kenn. An important point would be that
> SomeOtherCoder would be given a seekable stream (instead of the forward
> only stream it gets right now) so it can either decode all the data or
> lazily decode parts as it needs to as in the case of an iterable coder when
> used to support large iterables coming out of a GroupByKey.
> >>>>>
> >>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <k...@apache.org>
> wrote:
> >>>>>>
> >>>>>> Interesting! Having large iterables within rows would be great for
> the interactions between SQL and the core SDK's schema/Row support, and we
> weren't sure how that could work, exactly.
> >>>>>>
> >>>>>> My (very basic) understanding would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
> followed by the encoding of SomeOtherCoder.
> >>>>>>
> >>>>>> So the new proposal would be that
> LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a length
> followed by some number of bytes and if it ends with a special token
> (ignoring escaping issues) then you have to gather bytes from more messages
> in order to assemble a stream to send to SomeOtherCoder? Have I got what
> you mean? So this is a different, yet compatible, approach to sending over
> a special token that has to be looked up separately via the state read API?
> >>>>>>
> >>>>>> Kenn
> >>>>>>
> >>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >>>>>>>
> >>>>>>> There is a discussion happening on a PR 7127[1] where Robert is
> working on providing the first implementation for supporting large
> iterables resulting from a GroupByKey. This is in line with the original
> proposal for remote references over the Fn Data & State API[2].
> >>>>>>>
> >>>>>>> I had thought about this issue more since the original write up
> was done over a year ago and believe that we can simplify the
> implementation by migrating the length prefix coder to be able to embed a
> remote reference token at the end of the stream if the data is too large.
> This allows any coder which supports lazy decoding to return a view over a
> seekable stream instead of decoding all the data (regardless of whether all
> the data was sent or there is a state token representing the remote
> reference).
> >>>>>>>
> >>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve
> the large iterable use case but also opens up the ability for types which
> don't need to be fully decoded to provide lazy views. Imagine our Beam rows
> using a format where only rows that are read are decoded while everything
> else is left in its encoded form.
> >>>>>>>
> >>>>>>> I originally thought that this could also help solve an issue
> where large values[3] need to be chunked across multiple protobuf messages
> over the Data API which complicates the reading side decoding
> implementation since each SDK needs to provide an implementation that
> blocks and waits for the next chunk to come across for the same logical
> stream[4]. But there are issues with this because the runner may make a bad
> coder choice such as iterable<length_prefix<blob>> (instead of
> length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if
> there are many many values.
> >>
> >>
> >> Yes. I think this would need to be a separate coder than the length
> prefix coder.
> >>
> >>
> >>>>>>>
> >>>>>>> Robert, would implementing the length prefix coder being backed by
> state + adding a lazy decoding method to the iterable coder be
> significantly more complicated than what you are proposing right now?
> >>
> >>
> >> Yes, chopping things up at arbitrary byte boundaries (rather than
> element boundaries) tends to be significantly more subtle and complex
> (based on my experience with the data plane API). It would also require new
> public APIs for Coders.
> >
> >
> > After some further thought, I don't think we need to have a different
> API for coders, it's just that they get a different implementation for the
> InputStream when decoding. So the logic would be:
> > public T decode(InputStream is) {
> >   if (is instanceof SeekableInputStream) {
> >     return view((SeekableInputStream) is);
> >   }
> >   return decodeInternal(is);
> > }
>
> SeekableInputStream is a new API. If we went this route of re-using
> decode, it'd be an easy bug to accidentally pass a SeekableInputStream
> to component coders which wouldn't do the right thing. (Perhaps all
> coders would have to be modified?) And encoding is less obvious (e.g.
> a subclass of OutputStream that takes a callback for the rest of the
> bytes? As chosen by the caller or the callee?).
>
> >> This is why I went with the more restricted (but still by far most
> common, and quite straightforward) case of supporting arbitrarily large
> iterables (which can still occur at any level of nesting, e.g. inside
> rows), leaving the general case as future work.
> >>
> >>
> >>>>>>>
> >>>>>>> What do others think about coders supporting a "lazy" decode mode
> in coders?
> >>>>>>>
> >>>>>>> 1: https://github.com/apache/beam/pull/7127
> >>>>>>> 2:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
> >>>>>>> 3:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
> >>>>>>> 4:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf