On Tue, Nov 20, 2018 at 7:10 PM Lukasz Cwik <lc...@google.com> wrote:

> I'll perform the swap for a fraction because as I try to map more of the
> spaces to an arbitrary byte[] I naturally first map the space onto natural
> numbers before mapping to a byte[].
>
> Any preference between these options:
> A:
> // Represents a non-negative decimal number: unscaled_value * 10^(-scale)
> message Decimal {
>   // Represents the unscaled value as a big endian unlimited precision
> non-negative integer.
>   bytes unscaled_value = 1;
>   // Represents the scale
>   uint32 scale = 2;
> }
>
> B:
> // Textual representation of a decimal (i.e. "123.00")
> string decimal = 1;
>
> C:
> // Represents a non-negative decimal number: "integer"."fraction"
> message Decimal {
>   // Represents the integral part of the decimal in big endian as a big
> endian unlimited precision non-negative integer.
>   bytes integer = 1;
>   // Represents the fractional part of the decimal represented as a big
> endian unlimited precision non-negative integer.
>   bytes fraction = 2;
> }
>
> A is the most common and seems to be supported by Java (BigDecimal),
> Python (decimal module) and Go (via shopspring/decimal). B is a close
> second since many languages can convert it.
>

Any reason to not just use double? (Do we need arbitrary/fixed precision
for anything?)


> On Tue, Nov 20, 2018 at 3:09 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> I'm still trying to wrap my head around what is meant by backlog here, as
>> it's different than what I've seen in previous discussions.
>>
>> Generally, the backlog represented a measure of the known but undone part
>> of a restriction. This is useful for a runner to understand in some manner
>> what progress is being made and where remaining work lies, and this is
>> difficult to do if expressed as an opaque byte array, and more so if
>> backlog is local to a restriction rather than an arbitrary quantity that
>> can be compared (and aggregated) across restrictions. Even better if a
>> similar measure can be applied to arbitrary (e.g. completed) restrictions
>> for estimation of a mapping to the time domain. Does using a byte[] here
>> have advantage over using a(n often integral) floating point number?
>>
>
> I like the idea of using an arbitrary precision floating point number
> (like SQL decimal, Java BigDecimal, python decimal) since it solves several
> questions such as to how to aggregate values and most languages have a
> native representation for a decimal type. The issue is about providing a
> mapping for key range based sources such as Bigtable/HBase. Imagine your at
> key 000 and you advance to key 0000 for the restriction [0, 1), what
> fraction of work have you advanced?
>
> The only solution I can provide for the backlog is if I choose a maximum
> precision and clamp the length of the byte[] and then provide each possible
> byte string a number. For example I clamp the length to 3 and give each
> byte string a position:
> index: byte string
> 1: 0
> 2: 00
> 3: 000
> 4: 001
> 5: 01
> 6: 010
> 7: 011
> 8: 1
> 9: 10
> 10: 100
> 11: 101
> 12: 11
> 13: 110
> 14: 111
>
> Since each key is given a value including the "largest key" I can compute
> the distance between two keys.
>

Some other options I was able to come up with are:

(1) Say you represent your key in a keyspace of N characters. Map it to a
kyespace of N+1 characters by copying the string and then terminating each
with the "new" character. Map this to the reals, and every single key is
separated.

(2) Even easier, simply append a fixed non-zero character to the end of
every key before mapping to a fraction. All keys are now separable.

The larger the alphabet, the less skew this introduces.


> I have thought about increasing the precision as I find significantly
> larger keys but don't know how this will impact scaling decisions in
> runners.
>
>
>> I'm also a bit unclear on why it's desirable to pass this backlog back to
>> the SDF when trying to split restrictions. Here it seems much more natural
>> (for both the runner and SDK) to simply pass a floating point value in [0,
>> 1) for the proportion of work that should be split, rather than manipulate
>> the given backlog in to try to approximate this. (There's some ambiguity
>> here of whether multiple splits should be returned if less than half should
>> be retained.)
>>
>
> Returning the backlog using the same space as the SDF will prevent skew in
> what is returned since the SDF may make progress in the meantime. For
> example you have 100mb to process and you ask for 40% of the work and the
> SDK has processed 10mb in the meantime which means you'll get 40% of 90mb =
> 36mb back instead of 40mb.
>

I actually think this is an advantage. If I ask for 50% of your backlog, I
get 50% of the remaining work, regardless of what has been processed so
far, not some uneven distribution (or, worse, end up in a degenerate state
where the worker has processed too much to satisfy the request at all).

I also believe that the backlog should subdivide the space so that a
> request for 20mb from a backlog of 100mb should subdivide the space into 5
> segments.
>

Would a request for 40mb result in two 40mb chunks, keeping 20mb? I also
don't know what it means to get back multiple splits from a message-queue
like source like pubsub.

If we want to produce multiple splits, I would take a list of split points
rather than one. OTOH, we already have a method to split an restriction
into N restrictions (used for initial splitting) which probably suffices.


>
>
>> Having a polymorphically-interpreted bytes[] backlog seems to add a lot
>> of complexity that would need to be justified.
>>
>
> Each source needs to polymorphically interpret every generic
> representation such as an integral fraction onto their space. There is a
> natural mapping for remaining bytes in a file and also for number of
> messages on a queue but not as clean for key range based sources as shown
> above.
>
>
>> It seems there's consensus for the finalization protocol; perhaps that
>> can be checked in as a separate PR? (Also, the idea of having to have a
>> final key is not ideal, e.g. it means that for lexicographic range sources
>> the empty key has different meanings depending on whether it's a start or
>> end key, but I think we can provide a mark-as-done method in a future
>> compatible way if this becomes too burdensome.)
>>
>
> I'll pull out the finalization to another PR.
>
> I spoke with the Bigtable folks and they said that using "" as the start
> and end key is unambiguous since they don't allow the "" key to exist and
> also that all their requests are based upon a specific key or key ranges
> ["", "") is unambiguous. So being at "" and advancing to "" means that you
> advanced to the end.
>

True, not allowing "" to exist is a wart we should not assume, but does
allow one to better disambiguate.


> The gotchas about having a markAsDone() method is that:
> * The SDK harness code powering an SDF needs to be aware that this has
> happened
> * The SDK harness code may want to know the "position" that was last used
> and mark as done obfuscates that
>
> Neither are insurmountable but we don't have many SDF implementations to
> provide guidance as to what is terrible with the API so would rather be
> more restrictive now and fix these issues as we go on.
>

Yep, totally with you on this.


> On Tue, Nov 20, 2018 at 1:22 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I also addressed a bunch of PR comments which clarified the
>>> contract/expectations as described in my previous e-mail and the
>>> splitting/backlog reporting/bundle finalization docs.
>>>
>>> On Mon, Nov 19, 2018 at 3:19 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Mon, Nov 19, 2018 at 3:06 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Sorry for the late reply.
>>>>>
>>>>> On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <ieme...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Some late comments, and my pre excuses if some questions look silly,
>>>>>> but the last documents were a lot of info that I have not yet fully
>>>>>> digested.
>>>>>>
>>>>>> I have some questions about the ‘new’ Backlog concept following a
>>>>>> quick look at the PR
>>>>>> https://github.com/apache/beam/pull/6969/files
>>>>>>
>>>>>> 1. Is the Backlog a specific concept for each IO? Or in other words:
>>>>>> ByteKeyRestrictionTracker can be used by HBase and Bigtable, but I am
>>>>>> assuming from what I could understand that the Backlog implementation
>>>>>> will be data store specific, is this the case? or it can be in some
>>>>>> case generalized (for example for Filesystems)?
>>>>>>
>>>>>
>>>>> The backlog is tied heavily to the restriction tracker implementation,
>>>>> any data store using the same restriction tracker will provide the same
>>>>> backlog computation. For example, if HBase/Bigtable use the
>>>>> ByteKeyRestrictionTracker then they will use the same backlog calculation.
>>>>> Note that an implementation could subclass a restriction tracker if the
>>>>> data store could provide additional information. For example, the default
>>>>> backlog for a ByteKeyRestrictionTracker over [startKey, endKey) is
>>>>> distance(currentKey, lastKey) where distance is represented as byte array
>>>>> subtraction (which can be wildly inaccurrate as the density of data is not
>>>>> well reflected) but if HBase/Bigtable could provide the number of bytes
>>>>> from current key to last key, a better representation could be provided.
>>>>>
>>>>> Other common examples of backlogs would be:
>>>>> * files: backlog = length of file - current byte offset
>>>>> * message queues: backlog = number of outstanding messages
>>>>>
>>>>>
>>>>>>
>>>>>> 2. Since the backlog is a byte[] this means that it is up to the user
>>>>>> to give it a meaning depending on the situation, is this correct? Also
>>>>>> since splitRestriction has now the Backlog as an argument, what do we
>>>>>> expect the person that implements this method in a DoFn to do ideally
>>>>>> with it? Maybe a more concrete example of how things fit for
>>>>>> File/Offset or HBase/Bigtable/ByteKey will be helpful (maybe also for
>>>>>> the BundleFinalizer concept too).
>>>>>>
>>>>>
>>>>> Yes, the restriction tracker/restriction/SplittableDoFn must give the
>>>>> byte[] a meaning. This can have any meaning but we would like that the
>>>>> backlog byte[] representation to be lexicograhically comparable (when
>>>>> viewing the byte[] in big endian format and prefixes are smaller (e.g. 001
>>>>> is smaller then 0010) and preferably a linear representation. Note that 
>>>>> all
>>>>> restriction trackers of the same type should use the same "space" so that
>>>>> backlogs are comparable across multiple restriction tracker instances.
>>>>>
>>>>> The backlog when provided to splitRestriction should be used to
>>>>> subdivide the restriction into smaller restrictions where each would have
>>>>> the backlog if processed (except for potentially the last).
>>>>>
>>>>> A concrete example would be to represent the remaining bytes to
>>>>> process in a file as a 64 bit big endian integer, lets say that is 500MiB
>>>>> (524288000 bytes) or 00000000 00000000 00000000 00000000 00011111 01000000
>>>>> (note that the trailing zeros are optional and doesn't impact the
>>>>> calculation). The runner could notice that processing the restriction will
>>>>> take 10 hrs, so it asks the SDF to split at 1/16 segments by shifting the
>>>>> bits over by 4 and asks to split using backlog 00000000 00000000 00000000
>>>>> 00000000 00000001 11110100. The SDK is able to convert this request back
>>>>> into 32768000 bytes and returns 16 restrictions. Another example would be
>>>>> for a message queue where we have 10000 messages on the queue remaining so
>>>>> the backlog would be 00000000 00000000 00000000 00000000 00000000 00000000
>>>>> 00100111 00010000 when represented as a 64 bit unsigned big endian 
>>>>> integer.
>>>>> The runner could ask the SDK to split using a 1/8th backlog of 00000000
>>>>> 00000000 00000000 00000000 00000000 00000000 00000100 11100010 which the
>>>>> SDK would break out into 8 restrictions, the first 7 responsible for
>>>>> reading 1250 messages and stopping while the last restriction would read
>>>>> 1250 messages and then continue to read anything else that has been
>>>>> enqueued.
>>>>>
>>>>> Bundle finalization is unrelated to backlogs but is needed since there
>>>>> is a class of data stores which need acknowledgement that says I have
>>>>> successfully received your data and am now responsible for it such as
>>>>> acking a message from a message queue.
>>>>>
>>>>
>>>> Note that this does bring up the question of whether SDKs should expose
>>>> coders for backlogs since ByteKeyCoder and BigEndianLongCoder exist which
>>>> would cover a good number of scenarios described above. This coder doesn't
>>>> have to be understood by the runner nor does it have to be part of the
>>>> portability APIs (either Runner of Fn API). WDYT?
>>>>
>>>>
>>>>>
>>>>>> 3. By default all Restrictions are assumed to be unbounded but there
>>>>>> is this new Restrictions.IsBounded method, can’t this behavior be
>>>>>> inferred (adapted) from the DoFn UnboundedPerElement/Bounded
>>>>>> annotation or are these independent concepts?
>>>>>>
>>>>>
>>>>> UnboundedPerElement/BoundedPerElement tells us during pipeline
>>>>> construction time what type of PCollection we will be creating since we 
>>>>> may
>>>>> have a bounded PCollection goto an UnboundedPerElement DoFn and that will
>>>>> produce an unbounded PCollection and similarly we could have an unbounded
>>>>> PCollection goto a BoundedPerElement DoFn and that will produce an
>>>>> unbounded PCollection. Restrictions.IsBounded is used during pipeline
>>>>> execution to inform the runner whether a restriction being returned is
>>>>> bounded or not since unbounded restrictions can return bounded 
>>>>> restrictions
>>>>> during splitting. So in the above example using the message queue, the
>>>>> first 7 restrictions that only read 1250 messages would be marked with the
>>>>> Restrictions.IsBounded interface while the last one would not be. This
>>>>> could also be a method on restrictions such as "IsBounded isBounded()" on
>>>>> PCollections.
>>>>>
>>>>> Extra unrelated comment:
>>>>>> Since SDF is still @Experimental we should probably rename
>>>>>> OffsetRangeTracker and ByteKeyRangeTracker into the RestrictionTracker
>>>>>> suffix (I don’t know why they share the RangeTracker suffix for the
>>>>>> new trackers, WDYT?
>>>>>>
>>>>>
>>>>> Agree, will perform in a follow-up PR.
>>>>>
>>>>>
>>>>>> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> I think that not returning the users specific subclass should be
>>>>>> fine.
>>>>>> >> Does the removal of markDone imply that the consumer always knows a
>>>>>> >> "final" key to claim on any given restriction?
>>>>>> >
>>>>>> >
>>>>>> > Yes, each restriction needs to support claiming a "final" key that
>>>>>> would make the restriction "done". In the BigTable/HBase case it is the
>>>>>> empty key "", for files it can be a file offset beyond the end of the 
>>>>>> file.
>>>>>> Generally, restriction trackers written by SDF authors could also take an
>>>>>> instance of an object that they can compare instance equality against 
>>>>>> for a
>>>>>> final key. Alternatively we could allow restriction trackers to implement
>>>>>> markDone() but would need the SDK have knowledge of the method by having
>>>>>> the RestrictionTracker implement interface, extend abstract base class, 
>>>>>> or
>>>>>> reflectively found so that we would be able to wrap it to provide
>>>>>> synchronization guarantees. I had toyed with the idea of using something
>>>>>> like the ProxyInvocationHandler that backs PipelineOptions to be able to
>>>>>> provide a modified version of the users instance that had the appropriate
>>>>>> synchronization guarantees but couldn't get it to work.
>>>>>> >
>>>>>> >>
>>>>>> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>> >> >
>>>>>> >> > I have started to work on how to change the user facing API
>>>>>> within the Java SDK to support splitting/checkpointing[1], backlog
>>>>>> reporting[2] and bundle finalization[3].
>>>>>> >> >
>>>>>> >> > I have this PR[4] which contains minimal interface/type
>>>>>> definitions to convey how the API surface would change with these 4 
>>>>>> changes:
>>>>>> >> > 1) Exposes the ability for @SplitRestriction to take a backlog
>>>>>> suggestion on how to perform splitting and for how many restrictions 
>>>>>> should
>>>>>> be returned.
>>>>>> >> > 2) Adds the ability for RestrictionTrackers to report backlog
>>>>>> >> > 3) Updates @ProcessElement to be required to take a generic
>>>>>> RestrictionTracker instead of the users own restriction tracker type.
>>>>>> >> > 4) Adds the ability for
>>>>>> @StartBundle/@ProcessElement/@FinishBundle to register a callback that is
>>>>>> invoked after bundle finalization.
>>>>>> >> >
>>>>>> >> > The details are in the javadoc comments as to how I would expect
>>>>>> the contract to play out.
>>>>>> >> > Feel free to comment on the ML/PR around the contract and after
>>>>>> the feedback is received/digested/implemented, I would like to get the
>>>>>> changes submitted so that work can start  towards providing an
>>>>>> implementation in the Java SDK, Python SDK, and Go SDK and the shared
>>>>>> runner portability library.
>>>>>> >> >
>>>>>> >> > I would like to call out special attention to 3 since with this
>>>>>> change it will enable us to remove the synchronization requirement for
>>>>>> users as we will wrap the underlying restriction tracker allowing us to 
>>>>>> add
>>>>>> appropriate synchronization as needed and also to watch any calls that 
>>>>>> pass
>>>>>> through the object such as the claim calls. I also believe this prevents
>>>>>> people from writing RestrictionTrackers where the contract of tryClaim is
>>>>>> subverted since markDone is outside the purview of tryClaim as in
>>>>>> ByteKeyRangeTracker[5].
>>>>>> >> >
>>>>>> >> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
>>>>>> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting
>>>>>> >> > 3: https://s.apache.org/beam-finalizing-bundles
>>>>>> >> > 4: https://github.com/apache/beam/pull/6969
>>>>>> >> > 5: https://github.com/apache/beam/pull/6949
>>>>>>
>>>>>

Reply via email to