There is a discussion happening on PR 7127[1], where Robert is working on
providing the first implementation for supporting large iterables resulting
from a GroupByKey. This is in line with the original proposal for remote
references over the Fn Data & State API[2].

I have thought about this issue more since the original write-up was done
over a year ago and believe that we can simplify the implementation by
migrating the length prefix coder to be able to embed a remote reference
token at the end of the stream if the data is too large. This allows any
coder which supports lazy decoding to return a view over a seekable stream
instead of decoding all the data (regardless of whether all the data was
sent or there is a state token representing the remote reference).
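
To make the idea concrete, here is a rough sketch of what such framing
could look like. The wire layout, the function names (encode_frame,
lazy_chunks, put_state) and the in-memory dict standing in for the State
API are all made up for illustration; none of this is Beam's actual
length prefix encoding.

```python
import struct
from typing import Callable, Dict, Iterator

# Hypothetical frame layout (an assumption, not Beam's wire format):
#   [4-byte inline length][inline bytes][2-byte token length][token bytes]
# An empty token means all of the data was sent inline.


def encode_frame(data: bytes, inline_limit: int,
                 put_state: Callable[[bytes], bytes]) -> bytes:
    """Send up to inline_limit bytes inline; park the rest behind a token."""
    inline, rest = data[:inline_limit], data[inline_limit:]
    token = put_state(rest) if rest else b""
    return (struct.pack(">I", len(inline)) + inline +
            struct.pack(">H", len(token)) + token)


def lazy_chunks(frame: bytes,
                get_state: Callable[[bytes], bytes]) -> Iterator[bytes]:
    """Yield the inline bytes first; fetch the remainder only if asked."""
    (inline_len,) = struct.unpack_from(">I", frame, 0)
    yield frame[4:4 + inline_len]
    (token_len,) = struct.unpack_from(">H", frame, 4 + inline_len)
    if token_len:
        start = 6 + inline_len
        yield get_state(frame[start:start + token_len])


# In-memory stand-in for state; a real runner would serve this remotely.
store: Dict[bytes, bytes] = {}


def put_state(rest: bytes) -> bytes:
    token = b"tok-%d" % len(store)
    store[token] = rest
    return token


frame = encode_frame(b"abcdefghij", 4, put_state)
chunks = lazy_chunks(frame, store.__getitem__)
first = next(chunks)  # only the inline part has been touched so far
```

The consumer sees the same iterator whether or not a token is present,
which is the property that would let a coder hand back a lazy view.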

Allowing any arbitrary coder to support lazy decoding helps solve the large
iterable use case but also opens up the ability for types which don't need
to be fully decoded to provide lazy views. Imagine our Beam rows using a
format where only rows that are read are decoded while everything else is
left in its encoded form.
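
As a toy illustration of such a lazy view, the sketch below keeps
elements in their encoded form and decodes one only when it is read. The
length-prefixed UTF-8 layout and the names LazyView and encode_elements
are assumptions for this example, not an existing Beam row format.

```python
import struct
from typing import Dict, Iterable, List, Tuple


def encode_elements(elements: Iterable[str]) -> bytes:
    """Concatenate elements, each with a 4-byte big-endian length prefix."""
    out = bytearray()
    for element in elements:
        raw = element.encode("utf-8")
        out += struct.pack(">I", len(raw)) + raw
    return bytes(out)


class LazyView:
    """Indexable view that decodes an element only on first access."""

    def __init__(self, encoded: bytes) -> None:
        self._encoded = encoded
        self._spans: List[Tuple[int, int]] = []
        # Walk the length prefixes only; payload bytes are not decoded here.
        pos = 0
        while pos < len(encoded):
            (size,) = struct.unpack_from(">I", encoded, pos)
            self._spans.append((pos + 4, size))
            pos += 4 + size
        self._cache: Dict[int, str] = {}

    def __len__(self) -> int:
        return len(self._spans)

    def __getitem__(self, index: int) -> str:
        if index not in self._cache:
            start, size = self._spans[index]
            raw = self._encoded[start:start + size]
            self._cache[index] = raw.decode("utf-8")
        return self._cache[index]

    @property
    def decoded_count(self) -> int:
        """Number of elements that have actually been decoded so far."""
        return len(self._cache)


view = LazyView(encode_elements(["alpha", "beta", "gamma"]))
second = view[1]  # decodes only this element; the others stay encoded
```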

I originally thought that this could also help solve an issue where
large values[3] need to be chunked across multiple protobuf messages over
the Data API, which complicates the reading-side decoding implementation
since each SDK needs to provide an implementation that blocks and waits for
the next chunk to come across for the same logical stream[4]. But there are
issues with this because the runner may make a bad coder choice such
as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>),
which can lead to more than 2 GB of state keys if there are very many values.
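
A back-of-the-envelope way to see the difference between the two
nestings is to count how many state tokens each one would need. The
function and its threshold parameter are hypothetical and assume a
prefix spills to state once its payload exceeds a fixed inline limit.

```python
from typing import Sequence


def tokens_needed(blob_sizes: Sequence[int], inline_limit: int,
                  prefix_per_element: bool) -> int:
    """Count state tokens under the assumed spill-over-limit rule."""
    if prefix_per_element:
        # iterable<length_prefix<blob>>: each oversized blob gets a token.
        return sum(1 for size in blob_sizes if size > inline_limit)
    # length_prefix<iterable<blob>>: at most one token for the whole thing.
    return 1 if sum(blob_sizes) > inline_limit else 0


sizes = [10] * 1000  # one thousand blobs of 10 bytes each
per_element = tokens_needed(sizes, inline_limit=4, prefix_per_element=True)
whole = tokens_needed(sizes, inline_limit=4, prefix_per_element=False)
```

With the prefix on the inside the token count grows with the number of
values, while with the prefix on the outside it stays at one.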

Robert, would implementing a length prefix coder backed by state +
adding a lazy decoding method to the iterable coder be significantly more
complicated than what you are proposing right now?

What do others think about coders supporting a "lazy" decode mode?

