Sorry for the late reply.

On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <ieme...@gmail.com> wrote:

> Some late comments, and my pre excuses if some questions look silly,
> but the last documents were a lot of info that I have not yet fully
> digested.
>
> I have some questions about the ‘new’ Backlog concept following a
> quick look at the PR
> https://github.com/apache/beam/pull/6969/files
>
> 1. Is the Backlog a specific concept for each IO? Or in other words:
> ByteKeyRestrictionTracker can be used by HBase and Bigtable, but I am
> assuming from what I could understand that the Backlog implementation
> will be data store specific, is this the case? or it can be in some
> case generalized (for example for Filesystems)?
>

The backlog is tied heavily to the restriction tracker implementation: any
data store using the same restriction tracker will provide the same backlog
computation. For example, if HBase/Bigtable use the
ByteKeyRestrictionTracker then they will use the same backlog calculation.
Note that an implementation could subclass a restriction tracker if the
data store could provide additional information. For example, the default
backlog for a ByteKeyRestrictionTracker over [startKey, endKey) is
distance(currentKey, lastKey), where distance is represented as byte array
subtraction (which can be wildly inaccurate since the density of data is not
well reflected). If HBase/Bigtable could provide the number of bytes from
the current key to the last key, a better representation could be provided.
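
To illustrate what "byte array subtraction" could look like, here is a rough
sketch. It is not the code in the PR: the class and method names are made up
for this mail, and it assumes that keys are padded with trailing zero bytes to
a common length and then read as unsigned big endian integers. It also ignores
the convention that an empty end key means "end of table".

```java
import java.math.BigInteger;
import java.util.Arrays;

// Hypothetical helper, not part of Beam: estimates the backlog between two
// byte keys as the numeric distance between them.
class ByteKeyBacklogSketch {
  // Pads the key with trailing zero bytes up to `length` and interprets the
  // result as an unsigned big endian integer.
  static BigInteger toPaddedInteger(byte[] key, int length) {
    byte[] padded = Arrays.copyOf(key, length);
    return new BigInteger(1, padded);
  }

  // distance(currentKey, lastKey) expressed as byte array subtraction.
  static BigInteger distance(byte[] currentKey, byte[] lastKey) {
    int length = Math.max(currentKey.length, lastKey.length);
    return toPaddedInteger(lastKey, length)
        .subtract(toPaddedInteger(currentKey, length));
  }

  public static void main(String[] args) {
    byte[] currentKey = {0x10};
    byte[] lastKey = {0x20, 0x00};
    // 0x2000 - 0x1000 = 4096
    System.out.println(distance(currentKey, lastKey));
  }
}
```

The result says nothing about how many rows or bytes actually live between the
two keys, which is why a data store specific subclass could do better.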

Other common examples of backlogs would be:
* files: backlog = length of file - current byte offset
* message queues: backlog = number of outstanding messages


>
> 2. Since the backlog is a byte[] this means that it is up to the user
> to give it a meaning depending on the situation, is this correct? Also
> since splitRestriction has now the Backlog as an argument, what do we
> expect the person that implements this method in a DoFn to do ideally
> with it? Maybe a more concrete example of how things fit for
> File/Offset or HBase/Bigtable/ByteKey will be helpful (maybe also for
> the BundleFinalizer concept too).
>

Yes, the restriction tracker/restriction/SplittableDoFn must give the
byte[] a meaning. It can have any meaning, but we would like the backlog
byte[] representation to be lexicographically comparable (when viewing the
byte[] in big endian format, with prefixes being smaller, e.g. 001 is
smaller than 0010) and preferably a linear representation. Note that all
restriction trackers of the same type should use the same "space" so that
backlogs are comparable across multiple restriction tracker instances.
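
As a sketch of the comparison I have in mind (the class name is made up for
this mail and the runner may well implement it differently), bytes are
compared as unsigned values from the most significant byte onwards, and a
proper prefix sorts before the longer array:

```java
// Hypothetical helper, not part of Beam: lexicographic comparison of two
// backlog byte[] values.
class BacklogCompareSketch {
  static int compare(byte[] a, byte[] b) {
    int common = Math.min(a.length, b.length);
    for (int i = 0; i < common; i++) {
      // Mask to compare bytes as unsigned values (0..255).
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    // All shared bytes are equal, so the shorter array (the prefix) is smaller.
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // A prefix is smaller than the longer array: prints a negative number.
    System.out.println(compare(new byte[] {0x00, 0x01}, new byte[] {0x00, 0x01, 0x00}));
    // 0x80 is larger than 0x7F when bytes are unsigned: prints a positive number.
    System.out.println(compare(new byte[] {(byte) 0x80}, new byte[] {0x7F}));
  }
}
```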

The backlog, when provided to splitRestriction, should be used to subdivide
the restriction into smaller restrictions, each of which would have that
backlog if processed (except potentially the last).

A concrete example would be to represent the remaining bytes to process in
a file as a 64 bit big endian integer. Let's say that is 500MiB (524288000
bytes) or 00000000 00000000 00000000 00000000 00011111 01000000 (note that
the trailing zeros are optional and don't impact the calculation). The
runner could notice that processing the restriction will take 10 hrs, so it
computes a 1/16 backlog by shifting the bits over by 4 and asks the SDF to
split using backlog 00000000 00000000 00000000 00000000 00000001 11110100.
The SDK is able to convert this request back into 32768000 bytes and
returns 16 restrictions.

Another example would be for a message queue where we have 10000 messages
remaining on the queue, so the backlog would be 00000000 00000000 00000000
00000000 00000000 00000000 00100111 00010000 when represented as a 64 bit
unsigned big endian integer. The runner could ask the SDK to split using a
1/8th backlog of 00000000 00000000 00000000 00000000 00000000 00000000
00000100 11100010, which the SDK would break out into 8 restrictions, the
first 7 responsible for reading 1250 messages and stopping while the last
restriction would read 1250 messages and then continue to read anything
else that has been enqueued.
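
The file arithmetic can be sketched as follows. This is only an illustration
and not the API in the PR: the class, the method names and the use of long[]
pairs for offset ranges are all made up for this mail, and it assumes the
backlog is at most 8 bytes with any missing trailing bytes treated as zeros.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Beam: splits a file offset range using a
// backlog hint encoded as a big endian integer.
class FileBacklogSplitSketch {
  // Encodes the backlog as a 64 bit big endian integer (ByteBuffer's default order).
  static byte[] encode(long backlog) {
    return ByteBuffer.allocate(Long.BYTES).putLong(backlog).array();
  }

  // Decodes a backlog, padding omitted trailing zero bytes back to 8 bytes.
  static long decode(byte[] backlog) {
    if (backlog.length > Long.BYTES) {
      throw new IllegalArgumentException("sketch only supports up to 8 bytes");
    }
    return ByteBuffer.wrap(Arrays.copyOf(backlog, Long.BYTES)).getLong();
  }

  // Splits [start, end) into ranges of at most the hinted size;
  // each element is {from, to} with `to` exclusive.
  static List<long[]> split(long start, long end, byte[] backlogHint) {
    long size = decode(backlogHint);
    if (size <= 0) {
      throw new IllegalArgumentException("backlog hint must be positive");
    }
    List<long[]> ranges = new ArrayList<>();
    for (long from = start; from < end; from += size) {
      ranges.add(new long[] {from, Math.min(from + size, end)});
    }
    return ranges;
  }

  public static void main(String[] args) {
    long remaining = 500L * 1024 * 1024;       // 524288000 bytes
    byte[] hint = encode(remaining >>> 4);     // runner asks for 1/16 of the backlog
    System.out.println(decode(hint));          // 32768000
    System.out.println(split(0, remaining, hint).size()); // 16
  }
}
```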

Bundle finalization is unrelated to backlogs, but it is needed since there
is a class of data stores that need an acknowledgement saying "I have
successfully received your data and am now responsible for it", such as
acking a message from a message queue.
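
A toy model of the idea, with no Beam types involved (the Bundle class and its
methods are invented for this mail and are not the interfaces in the PR):
callbacks registered while processing are held back and only run once the
runner has durably committed the bundle's output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not part of Beam: acks are deferred until the bundle
// has been finalized.
class BundleFinalizationSketch {
  static final class Bundle {
    private final List<Runnable> callbacks = new ArrayList<>();

    // Called while processing elements to defer work until finalization.
    void registerCallback(Runnable callback) {
      callbacks.add(callback);
    }

    // Called by the (imaginary) runner after the output has been committed.
    void finalizeBundle() {
      for (Runnable callback : callbacks) {
        callback.run();
      }
    }
  }

  // Processes two messages and returns the ids acked after finalization.
  static List<String> run() {
    List<String> acked = new ArrayList<>();
    Bundle bundle = new Bundle();
    for (String messageId : new String[] {"m1", "m2"}) {
      // The message is processed here; the ack is only registered, not sent.
      bundle.registerCallback(() -> acked.add(messageId));
    }
    if (!acked.isEmpty()) {
      throw new IllegalStateException("nothing may be acked before finalization");
    }
    bundle.finalizeBundle();
    return acked;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [m1, m2]
  }
}
```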


>
> 3. By default all Restrictions are assumed to be unbounded but there
> is this new Restrictions.IsBounded method, can’t this behavior be
> inferred (adapted) from the DoFn UnboundedPerElement/Bounded
> annotation or are these independent concepts?
>

UnboundedPerElement/BoundedPerElement tells us at pipeline construction
time what type of PCollection we will be creating: a bounded PCollection
can go to an UnboundedPerElement DoFn, which will produce an unbounded
PCollection, and similarly an unbounded PCollection can go to a
BoundedPerElement DoFn, which will also produce an unbounded PCollection.
Restrictions.IsBounded is used during pipeline execution to inform the
runner whether a restriction being returned is bounded or not, since
unbounded restrictions can return bounded restrictions during splitting. So
in the message queue example above, the first 7 restrictions that only read
1250 messages would be marked with the Restrictions.IsBounded interface
while the last one would not be. This could also be a method on
restrictions, such as "IsBounded isBounded()", like the one on
PCollections.
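
To make the message queue case concrete, here is a sketch of such a split. The
QueueRestriction type, its fields and the boolean flag are invented for this
mail (the PR models boundedness differently), and it assumes messages are
addressed by a simple increasing offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not part of Beam: splitting an unbounded queue
// restriction into bounded pieces plus one unbounded remainder.
class QueueSplitSketch {
  static final class QueueRestriction {
    public final long from;        // first message offset, inclusive
    public final long to;          // exclusive; Long.MAX_VALUE when unbounded
    public final boolean bounded;

    QueueRestriction(long from, long to, boolean bounded) {
      this.from = from;
      this.to = to;
      this.bounded = bounded;
    }
  }

  // Splits `outstanding` messages starting at `start` into bounded pieces of
  // `backlogPerSplit` messages; the final piece stays unbounded so it keeps
  // reading whatever is enqueued later.
  static List<QueueRestriction> split(long start, long outstanding, long backlogPerSplit) {
    if (backlogPerSplit <= 0) {
      throw new IllegalArgumentException("backlogPerSplit must be positive");
    }
    List<QueueRestriction> result = new ArrayList<>();
    long end = start + outstanding;
    long from = start;
    while (from + backlogPerSplit < end) {
      result.add(new QueueRestriction(from, from + backlogPerSplit, true));
      from += backlogPerSplit;
    }
    result.add(new QueueRestriction(from, Long.MAX_VALUE, false));
    return result;
  }

  public static void main(String[] args) {
    List<QueueRestriction> parts = split(0, 10000, 1250);
    System.out.println(parts.size());                   // 8
    System.out.println(parts.get(0).bounded);           // true
    System.out.println(parts.get(7).bounded);           // false
  }
}
```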

> Extra unrelated comment:
> Since SDF is still @Experimental we should probably rename
> OffsetRangeTracker and ByteKeyRangeTracker into the RestrictionTracker
> suffix (I don’t know why they share the RangeTracker suffix for the
> new trackers, WDYT?
>

Agreed, I will do this in a follow-up PR.


> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <lc...@google.com> wrote:
> >
> >
> >
> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >>
> >> I think that not returning the users specific subclass should be fine.
> >> Does the removal of markDone imply that the consumer always knows a
> >> "final" key to claim on any given restriction?
> >
> >
> > Yes, each restriction needs to support claiming a "final" key that would
> make the restriction "done". In the BigTable/HBase case it is the empty key
> "", for files it can be a file offset beyond the end of the file.
> Generally, restriction trackers written by SDF authors could also take an
> instance of an object that they can compare instance equality against for a
> final key. Alternatively we could allow restriction trackers to implement
> markDone() but would need the SDK have knowledge of the method by having
> the RestrictionTracker implement interface, extend abstract base class, or
> reflectively found so that we would be able to wrap it to provide
> synchronization guarantees. I had toyed with the idea of using something
> like the ProxyInvocationHandler that backs PipelineOptions to be able to
> provide a modified version of the users instance that had the appropriate
> synchronization guarantees but couldn't get it to work.
> >
> >>
> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <lc...@google.com> wrote:
> >> >
> >> > I have started to work on how to change the user facing API within
> the Java SDK to support splitting/checkpointing[1], backlog reporting[2]
> and bundle finalization[3].
> >> >
> >> > I have this PR[4] which contains minimal interface/type definitions
> to convey how the API surface would change with these 4 changes:
> >> > 1) Exposes the ability for @SplitRestriction to take a backlog
> suggestion on how to perform splitting and for how many restrictions should
> be returned.
> >> > 2) Adds the ability for RestrictionTrackers to report backlog
> >> > 3) Updates @ProcessElement to be required to take a generic
> RestrictionTracker instead of the users own restriction tracker type.
> >> > 4) Adds the ability for @StartBundle/@ProcessElement/@FinishBundle to
> register a callback that is invoked after bundle finalization.
> >> >
> >> > The details are in the javadoc comments as to how I would expect the
> contract to play out.
> >> > Feel free to comment on the ML/PR around the contract and after the
> feedback is received/digested/implemented, I would like to get the changes
> submitted so that work can start  towards providing an implementation in
> the Java SDK, Python SDK, and Go SDK and the shared runner portability
> library.
> >> >
> >> > I would like to call out special attention to 3 since with this
> change it will enable us to remove the synchronization requirement for
> users as we will wrap the underlying restriction tracker allowing us to add
> appropriate synchronization as needed and also to watch any calls that pass
> through the object such as the claim calls. I also believe this prevents
> people from writing RestrictionTrackers where the contract of tryClaim is
> subverted since markDone is outside the purview of tryClaim as in
> ByteKeyRangeTracker[5].
> >> >
> >> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting
> >> > 3: https://s.apache.org/beam-finalizing-bundles
> >> > 4: https://github.com/apache/beam/pull/6969
> >> > 5: https://github.com/apache/beam/pull/6949
>
