I also addressed a bunch of PR comments which clarified the
contract/expectations as described in my previous e-mail and the
splitting/backlog reporting/bundle finalization docs.

On Mon, Nov 19, 2018 at 3:19 PM Lukasz Cwik <[email protected]> wrote:

>
>
> On Mon, Nov 19, 2018 at 3:06 PM Lukasz Cwik <[email protected]> wrote:
>
>> Sorry for the late reply.
>>
>> On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <[email protected]> wrote:
>>
>>> Some late comments, and my pre excuses if some questions look silly,
>>> but the last documents were a lot of info that I have not yet fully
>>> digested.
>>>
>>> I have some questions about the ‘new’ Backlog concept following a
>>> quick look at the PR
>>> https://github.com/apache/beam/pull/6969/files
>>>
>>> 1. Is the Backlog a specific concept for each IO? Or in other words:
>>> ByteKeyRestrictionTracker can be used by HBase and Bigtable, but I am
>>> assuming from what I could understand that the Backlog implementation
>>> will be data store specific, is this the case? or it can be in some
>>> case generalized (for example for Filesystems)?
>>>
>>
>> The backlog is tied heavily to the restriction tracker implementation,
>> any data store using the same restriction tracker will provide the same
>> backlog computation. For example, if HBase/Bigtable use the
>> ByteKeyRestrictionTracker then they will use the same backlog calculation.
>> Note that an implementation could subclass a restriction tracker if the
>> data store could provide additional information. For example, the default
>> backlog for a ByteKeyRestrictionTracker over [startKey, endKey) is
>> distance(currentKey, lastKey) where distance is represented as byte array
>> subtraction (which can be wildly inaccurrate as the density of data is not
>> well reflected) but if HBase/Bigtable could provide the number of bytes
>> from current key to last key, a better representation could be provided.
>>
>> Other common examples of backlogs would be:
>> * files: backlog = length of file - current byte offset
>> * message queues: backlog = number of outstanding messages
>>
>>
>>>
>>> 2. Since the backlog is a byte[] this means that it is up to the user
>>> to give it a meaning depending on the situation, is this correct? Also
>>> since splitRestriction has now the Backlog as an argument, what do we
>>> expect the person that implements this method in a DoFn to do ideally
>>> with it? Maybe a more concrete example of how things fit for
>>> File/Offset or HBase/Bigtable/ByteKey will be helpful (maybe also for
>>> the BundleFinalizer concept too).
>>>
>>
>> Yes, the restriction tracker/restriction/SplittableDoFn must give the
>> byte[] a meaning. This can have any meaning but we would like that the
>> backlog byte[] representation to be lexicograhically comparable (when
>> viewing the byte[] in big endian format and prefixes are smaller (e.g. 001
>> is smaller then 0010) and preferably a linear representation. Note that all
>> restriction trackers of the same type should use the same "space" so that
>> backlogs are comparable across multiple restriction tracker instances.
>>
>> The backlog when provided to splitRestriction should be used to subdivide
>> the restriction into smaller restrictions where each would have the backlog
>> if processed (except for potentially the last).
>>
>> A concrete example would be to represent the remaining bytes to process
>> in a file as a 64 bit big endian integer, lets say that is 500MiB
>> (524288000 bytes) or 00000000 00000000 00000000 00000000 00011111 01000000
>> (note that the trailing zeros are optional and doesn't impact the
>> calculation). The runner could notice that processing the restriction will
>> take 10 hrs, so it asks the SDF to split at 1/16 segments by shifting the
>> bits over by 4 and asks to split using backlog 00000000 00000000 00000000
>> 00000000 00000001 11110100. The SDK is able to convert this request back
>> into 32768000 bytes and returns 16 restrictions. Another example would be
>> for a message queue where we have 10000 messages on the queue remaining so
>> the backlog would be 00000000 00000000 00000000 00000000 00000000 00000000
>> 00100111 00010000 when represented as a 64 bit unsigned big endian integer.
>> The runner could ask the SDK to split using a 1/8th backlog of 00000000
>> 00000000 00000000 00000000 00000000 00000000 00000100 11100010 which the
>> SDK would break out into 8 restrictions, the first 7 responsible for
>> reading 1250 messages and stopping while the last restriction would read
>> 1250 messages and then continue to read anything else that has been
>> enqueued.
>>
>> Bundle finalization is unrelated to backlogs but is needed since there is
>> a class of data stores which need acknowledgement that says I have
>> successfully received your data and am now responsible for it such as
>> acking a message from a message queue.
>>
>
> Note that this does bring up the question of whether SDKs should expose
> coders for backlogs since ByteKeyCoder and BigEndianLongCoder exist which
> would cover a good number of scenarios described above. This coder doesn't
> have to be understood by the runner nor does it have to be part of the
> portability APIs (either Runner of Fn API). WDYT?
>
>
>>
>>> 3. By default all Restrictions are assumed to be unbounded but there
>>> is this new Restrictions.IsBounded method, can’t this behavior be
>>> inferred (adapted) from the DoFn UnboundedPerElement/Bounded
>>> annotation or are these independent concepts?
>>>
>>
>> UnboundedPerElement/BoundedPerElement tells us during pipeline
>> construction time what type of PCollection we will be creating since we may
>> have a bounded PCollection goto an UnboundedPerElement DoFn and that will
>> produce an unbounded PCollection and similarly we could have an unbounded
>> PCollection goto a BoundedPerElement DoFn and that will produce an
>> unbounded PCollection. Restrictions.IsBounded is used during pipeline
>> execution to inform the runner whether a restriction being returned is
>> bounded or not since unbounded restrictions can return bounded restrictions
>> during splitting. So in the above example using the message queue, the
>> first 7 restrictions that only read 1250 messages would be marked with the
>> Restrictions.IsBounded interface while the last one would not be. This
>> could also be a method on restrictions such as "IsBounded isBounded()" on
>> PCollections.
>>
>> Extra unrelated comment:
>>> Since SDF is still @Experimental we should probably rename
>>> OffsetRangeTracker and ByteKeyRangeTracker into the RestrictionTracker
>>> suffix (I don’t know why they share the RangeTracker suffix for the
>>> new trackers, WDYT?
>>>
>>
>> Agree, will perform in a follow-up PR.
>>
>>
>>> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <[email protected]> wrote:
>>> >
>>> >
>>> >
>>> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>> >>
>>> >> I think that not returning the users specific subclass should be fine.
>>> >> Does the removal of markDone imply that the consumer always knows a
>>> >> "final" key to claim on any given restriction?
>>> >
>>> >
>>> > Yes, each restriction needs to support claiming a "final" key that
>>> would make the restriction "done". In the BigTable/HBase case it is the
>>> empty key "", for files it can be a file offset beyond the end of the file.
>>> Generally, restriction trackers written by SDF authors could also take an
>>> instance of an object that they can compare instance equality against for a
>>> final key. Alternatively we could allow restriction trackers to implement
>>> markDone() but would need the SDK have knowledge of the method by having
>>> the RestrictionTracker implement interface, extend abstract base class, or
>>> reflectively found so that we would be able to wrap it to provide
>>> synchronization guarantees. I had toyed with the idea of using something
>>> like the ProxyInvocationHandler that backs PipelineOptions to be able to
>>> provide a modified version of the users instance that had the appropriate
>>> synchronization guarantees but couldn't get it to work.
>>> >
>>> >>
>>> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <[email protected]> wrote:
>>> >> >
>>> >> > I have started to work on how to change the user facing API within
>>> the Java SDK to support splitting/checkpointing[1], backlog reporting[2]
>>> and bundle finalization[3].
>>> >> >
>>> >> > I have this PR[4] which contains minimal interface/type definitions
>>> to convey how the API surface would change with these 4 changes:
>>> >> > 1) Exposes the ability for @SplitRestriction to take a backlog
>>> suggestion on how to perform splitting and for how many restrictions should
>>> be returned.
>>> >> > 2) Adds the ability for RestrictionTrackers to report backlog
>>> >> > 3) Updates @ProcessElement to be required to take a generic
>>> RestrictionTracker instead of the users own restriction tracker type.
>>> >> > 4) Adds the ability for @StartBundle/@ProcessElement/@FinishBundle
>>> to register a callback that is invoked after bundle finalization.
>>> >> >
>>> >> > The details are in the javadoc comments as to how I would expect
>>> the contract to play out.
>>> >> > Feel free to comment on the ML/PR around the contract and after the
>>> feedback is received/digested/implemented, I would like to get the changes
>>> submitted so that work can start  towards providing an implementation in
>>> the Java SDK, Python SDK, and Go SDK and the shared runner portability
>>> library.
>>> >> >
>>> >> > I would like to call out special attention to 3 since with this
>>> change it will enable us to remove the synchronization requirement for
>>> users as we will wrap the underlying restriction tracker allowing us to add
>>> appropriate synchronization as needed and also to watch any calls that pass
>>> through the object such as the claim calls. I also believe this prevents
>>> people from writing RestrictionTrackers where the contract of tryClaim is
>>> subverted since markDone is outside the purview of tryClaim as in
>>> ByteKeyRangeTracker[5].
>>> >> >
>>> >> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
>>> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting
>>> >> > 3: https://s.apache.org/beam-finalizing-bundles
>>> >> > 4: https://github.com/apache/beam/pull/6969
>>> >> > 5: https://github.com/apache/beam/pull/6949
>>>
>>

Reply via email to