On Tue, Nov 20, 2018 at 7:10 PM Lukasz Cwik <lc...@google.com> wrote:
> I'll perform the swap for a fraction because, as I try to map more of the spaces to an arbitrary byte[], I naturally first map the space onto the natural numbers before mapping to a byte[].
>
> Any preference between these options:
>
> A:
> // Represents a non-negative decimal number: unscaled_value * 10^(-scale)
> message Decimal {
>   // Represents the unscaled value as a big endian unlimited precision non-negative integer.
>   bytes unscaled_value = 1;
>   // Represents the scale.
>   uint32 scale = 2;
> }
>
> B:
> // Textual representation of a decimal (e.g. "123.00")
> string decimal = 1;
>
> C:
> // Represents a non-negative decimal number: "integer"."fraction"
> message Decimal {
>   // Represents the integral part of the decimal as a big endian unlimited precision non-negative integer.
>   bytes integer = 1;
>   // Represents the fractional part of the decimal as a big endian unlimited precision non-negative integer.
>   bytes fraction = 2;
> }
>
> A is the most common and seems to be supported by Java (BigDecimal), Python (the decimal module) and Go (via shopspring/decimal). B is a close second since many languages can convert it.

Any reason to not just use double? (Do we need arbitrary/fixed precision for anything?)

> On Tue, Nov 20, 2018 at 3:09 AM Robert Bradshaw <rober...@google.com> wrote:
>
>> I'm still trying to wrap my head around what is meant by backlog here, as it's different from what I've seen in previous discussions.
>>
>> Generally, the backlog represented a measure of the known but undone part of a restriction. This is useful for a runner to understand in some manner what progress is being made and where the remaining work lies, and this is difficult to do if the backlog is expressed as an opaque byte array, and more so if the backlog is local to a restriction rather than an arbitrary quantity that can be compared (and aggregated) across restrictions. Even better if a similar measure can be applied to arbitrary (e.g. completed) restrictions for estimation of a mapping to the time domain. Does using a byte[] here have an advantage over using a(n often integral) floating point number?

> I like the idea of using an arbitrary precision floating point number (like SQL's decimal, Java's BigDecimal, Python's decimal) since it answers several questions, such as how to aggregate values, and most languages have a native representation for a decimal type. The issue is about providing a mapping for key range based sources such as Bigtable/HBase. Imagine you're at key 000 and you advance to key 0000 within the restriction [0, 1): what fraction of the work have you advanced?
>
> The only solution I can provide for the backlog is to choose a maximum precision, clamp the length of the byte[], and then give each possible byte string a number. For example, clamping the length to 3 and giving each byte string over the alphabet {0, 1} a position:
>
> index: byte string
> 1: 0
> 2: 00
> 3: 000
> 4: 001
> 5: 01
> 6: 010
> 7: 011
> 8: 1
> 9: 10
> 10: 100
> 11: 101
> 12: 11
> 13: 110
> 14: 111
>
> Since each key is given a value, including the "largest key", I can compute the distance between two keys.

Some other options I was able to come up with are:

(1) Say you represent your key in a keyspace of N characters. Map it to a keyspace of N+1 characters by copying the string and then terminating each with the "new" character. Map this to the reals, and every single key is separated.

(2) Even easier, simply append a fixed non-zero character to the end of every key before mapping to a fraction. All keys are now separable. The larger the alphabet, the less skew this introduces.
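To make (2) concrete, here's a rough Java sketch (the name keyToFraction and the choice of 0x01 as the terminator are mine, purely for illustration): append the terminator so no key's digits are all-trailing-zero ambiguous, then read the bytes as base-256 digits after the radix point.

  // Sketch of option (2): append a fixed non-zero terminator byte so that
  // distinct keys map to distinct fractions, then interpret the result as
  // base-256 digits after the radix point. Names and the terminator choice
  // are illustrative, not from the PR.
  static double keyToFraction(byte[] key) {
    byte[] terminated = java.util.Arrays.copyOf(key, key.length + 1);
    terminated[key.length] = 0x01; // fixed non-zero terminator; the source of the skew
    double fraction = 0.0;
    double scale = 1.0;
    for (byte b : terminated) {
      scale /= 256.0;
      fraction += (b & 0xFF) * scale; // unsigned byte as a digit in [0, 256)
    }
    return fraction;
  }

With this, being at key 000 and advancing to 0000 within [start, end) advances (keyToFraction(current) - keyToFraction(start)) / (keyToFraction(end) - keyToFraction(start)) by a small positive amount. The obvious caveat is that a double only distinguishes keys up to roughly the first 6 bytes of shared prefix, which is where your arbitrary precision point comes back in.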
> I have thought about increasing the precision as I find significantly larger keys, but I don't know how this will impact scaling decisions in runners.

>> I'm also a bit unclear on why it's desirable to pass this backlog back to the SDF when trying to split restrictions. Here it seems much more natural (for both the runner and the SDK) to simply pass a floating point value in [0, 1) for the proportion of work that should be split, rather than manipulate the given backlog to try to approximate this. (There's some ambiguity here as to whether multiple splits should be returned if less than half should be retained.)

> Returning the backlog using the same space as the SDF will prevent skew in what is returned, since the SDF may make progress in the meantime. For example, you have 100mb to process and you ask for 40% of the work, but the SDK has processed 10mb in the meantime, which means you'll get 40% of 90mb = 36mb back instead of 40mb.

I actually think this is an advantage. If I ask for 50% of your backlog, I get 50% of the remaining work, regardless of what has been processed so far, not some uneven distribution (or, worse, we end up in a degenerate state where the worker has processed too much to satisfy the request at all).

> I also believe that the backlog should subdivide the space, so that a request for 20mb from a backlog of 100mb should subdivide the space into 5 segments.

Would a request for 40mb result in two 40mb chunks, keeping 20mb? I also don't know what it means to get back multiple splits from a message-queue-like source such as Pubsub. If we want to produce multiple splits, I would take a list of split points rather than one. OTOH, we already have a method to split a restriction into N restrictions (used for initial splitting), which probably suffices.
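For what it's worth, my mental model of "subdivide by backlog" is the trivial loop below (illustrative only, not from the PR), which would answer my 40mb question with chunks of 40, 40 and 20:

  // Illustrative model of subdividing a remaining range by a requested
  // backlog: a request of 20 from a backlog of 100 yields [20, 20, 20, 20, 20],
  // and a request of 40 yields [40, 40, 20].
  static java.util.List<Long> subdivide(long remainingBytes, long requestedBacklog) {
    java.util.List<Long> segments = new java.util.ArrayList<>();
    while (remainingBytes > 0) {
      long segment = Math.min(requestedBacklog, remainingBytes);
      segments.add(segment);
      remainingBytes -= segment;
    }
    return segments;
  }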
>> Having a polymorphically-interpreted byte[] backlog seems to add a lot of complexity that would need to be justified.

> Each source needs to polymorphically interpret every generic representation, such as an integral fraction, onto its space. There is a natural mapping for remaining bytes in a file and also for the number of messages on a queue, but it is not as clean for key range based sources, as shown above.

>> It seems there's consensus for the finalization protocol; perhaps that can be checked in as a separate PR? (Also, the idea of having to have a final key is not ideal, e.g. it means that for lexicographic range sources the empty key has different meanings depending on whether it's a start or end key, but I think we can provide a mark-as-done method in a future-compatible way if this becomes too burdensome.)

> I'll pull out the finalization to another PR.
>
> I spoke with the Bigtable folks and they said that using "" as the start and end key is unambiguous, since they don't allow the "" key to exist and all their requests are based upon a specific key or key ranges, so ["", "") is unambiguous. So being at "" and advancing to "" means that you advanced to the end.

True, not allowing "" to exist is a wart we should not assume, but it does allow one to better disambiguate.

> The gotchas about having a markAsDone() method are that:
> * The SDK harness code powering an SDF needs to be aware that this has happened.
> * The SDK harness code may want to know the "position" that was last used, and markAsDone() obfuscates that.
>
> Neither is insurmountable, but we don't have many SDF implementations to provide guidance as to what is terrible about the API, so I would rather be more restrictive now and fix these issues as we go.

Yep, totally with you on this.

> On Tue, Nov 20, 2018 at 1:22 AM Lukasz Cwik <lc...@google.com> wrote:
>
>>> I also addressed a bunch of PR comments which clarified the contract/expectations as described in my previous e-mail and the splitting/backlog reporting/bundle finalization docs.
>>>
>>> On Mon, Nov 19, 2018 at 3:19 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> On Mon, Nov 19, 2018 at 3:06 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Sorry for the late reply.
>>>>>
>>>>> On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>
>>>>>> Some late comments, and my apologies in advance if some questions look silly, but the last documents were a lot of info that I have not yet fully digested.
>>>>>>
>>>>>> I have some questions about the 'new' Backlog concept following a quick look at the PR https://github.com/apache/beam/pull/6969/files
>>>>>>
>>>>>> 1. Is the Backlog a specific concept for each IO? Or in other words: ByteKeyRestrictionTracker can be used by HBase and Bigtable, but I am assuming from what I could understand that the Backlog implementation will be data store specific. Is this the case, or can it in some cases be generalized (for example for Filesystems)?
>>>>>
>>>>> The backlog is tied heavily to the restriction tracker implementation; any data store using the same restriction tracker will get the same backlog computation. For example, if HBase/Bigtable use the ByteKeyRestrictionTracker then they will use the same backlog calculation. Note that an implementation could subclass a restriction tracker if the data store can provide additional information. For example, the default backlog for a ByteKeyRestrictionTracker over [startKey, endKey) is distance(currentKey, lastKey), where distance is computed as byte array subtraction (which can be wildly inaccurate since the density of the data is not well reflected), but if HBase/Bigtable could provide the number of bytes from the current key to the last key, a better representation could be provided.
>>>>>
>>>>> Other common examples of backlogs would be:
>>>>> * files: backlog = length of file - current byte offset
>>>>> * message queues: backlog = number of outstanding messages
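In code, the two easy cases above could look roughly like this (a sketch with made-up helper names, using the unsigned 64-bit big endian encoding discussed below):

  // Sketch of the two "easy" backlog cases, encoded as unsigned 64-bit
  // big endian integers. Method names are illustrative, not the actual
  // restriction tracker API.
  static byte[] fileBacklog(long fileLength, long currentByteOffset) {
    long remaining = Math.max(0, fileLength - currentByteOffset);
    return java.nio.ByteBuffer.allocate(8).putLong(remaining).array();
  }

  static byte[] queueBacklog(long outstandingMessages) {
    return java.nio.ByteBuffer.allocate(8).putLong(outstandingMessages).array();
  }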
>>>>>> 2. Since the backlog is a byte[], this means that it is up to the user to give it a meaning depending on the situation, is this correct? Also, since splitRestriction now has the Backlog as an argument, what do we expect the person that implements this method in a DoFn to do with it, ideally? Maybe a more concrete example of how things fit for File/Offset or HBase/Bigtable/ByteKey would be helpful (maybe also for the BundleFinalizer concept too).
>>>>>
>>>>> Yes, the restriction tracker/restriction/SplittableDoFn must give the byte[] a meaning. It can have any meaning, but we would like the backlog byte[] representation to be lexicographically comparable (when viewing the byte[] in big endian format, with prefixes being smaller, e.g. 001 is smaller than 0010) and preferably a linear representation. Note that all restriction trackers of the same type should use the same "space" so that backlogs are comparable across multiple restriction tracker instances.
>>>>>
>>>>> The backlog, when provided to splitRestriction, should be used to subdivide the restriction into smaller restrictions where each would have that backlog if processed (except for potentially the last).
>>>>>
>>>>> A concrete example would be to represent the remaining bytes to process in a file as a 64 bit big endian integer. Let's say that is 500MiB (524288000 bytes), or 00000000 00000000 00000000 00000000 00011111 01000000 (note that the trailing zeros are optional and don't impact the calculation). The runner could notice that processing the restriction will take 10 hrs, so it asks the SDF to split into 1/16 segments by shifting the bits over by 4 and asking to split using backlog 00000000 00000000 00000000 00000000 00000001 11110100. The SDK is able to convert this request back into 32768000 bytes and returns 16 restrictions. Another example would be a message queue where we have 10000 messages remaining on the queue, so the backlog would be 00000000 00000000 00000000 00000000 00000000 00000000 00100111 00010000 when represented as a 64 bit unsigned big endian integer. The runner could ask the SDK to split using a 1/8th backlog of 00000000 00000000 00000000 00000000 00000000 00000000 00000100 11100010, which the SDK would break out into 8 restrictions, the first 7 responsible for reading 1250 messages and stopping, while the last restriction would read 1250 messages and then continue to read anything else that has been enqueued.
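The arithmetic in both examples is just decode/divide/re-encode; a sketch (assuming the backlog fits in 8 bytes, with the optional trailing zeros handled by right-padding):

  // Sketch of the runner-side arithmetic from the examples above:
  // 524288000 / 16 = 32768000 and 10000 / 8 = 1250, matching the bit
  // patterns shown. Assumes the backlog fits in 8 bytes.
  static byte[] divideBacklog(byte[] backlog, int denominator) {
    byte[] padded = java.util.Arrays.copyOf(backlog, 8); // restore optional trailing zeros
    long value = java.nio.ByteBuffer.wrap(padded).getLong();
    return java.nio.ByteBuffer.allocate(8).putLong(value / denominator).array();
  }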
>>>>> Bundle finalization is unrelated to backlogs, but is needed since there is a class of data stores which need an acknowledgement that says "I have successfully received your data and am now responsible for it", such as acking a message from a message queue.

>>>> Note that this does bring up the question of whether SDKs should expose coders for backlogs, since ByteKeyCoder and BigEndianLongCoder exist, which would cover a good number of the scenarios described above. This coder doesn't have to be understood by the runner, nor does it have to be part of the portability APIs (either the Runner or Fn API). WDYT?

>>>>>> 3. By default all Restrictions are assumed to be unbounded, but there is this new Restrictions.IsBounded method; can't this behavior be inferred (adapted) from the DoFn UnboundedPerElement/Bounded annotation, or are these independent concepts?
>>>>>
>>>>> UnboundedPerElement/BoundedPerElement tells us at pipeline construction time what type of PCollection we will be creating, since we may have a bounded PCollection go to an UnboundedPerElement DoFn and that will produce an unbounded PCollection, and similarly we could have an unbounded PCollection go to a BoundedPerElement DoFn and that will produce an unbounded PCollection. Restrictions.IsBounded is used during pipeline execution to inform the runner whether a restriction being returned is bounded or not, since unbounded restrictions can return bounded restrictions during splitting. So in the above example using the message queue, the first 7 restrictions that only read 1250 messages would be marked with the Restrictions.IsBounded interface, while the last one would not be. This could also be a method on restrictions, such as "IsBounded isBounded()", as on PCollections.
>>>>>
>>>>>> Extra unrelated comment: since SDF is still @Experimental we should probably rename OffsetRangeTracker and ByteKeyRangeTracker to use the RestrictionTracker suffix (I don't know why the new trackers keep the RangeTracker suffix). WDYT?
>>>>>
>>>>> Agree, will perform in a follow-up PR.
>>>>>
>>>>>> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>> >
>>>>>> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>> >>
>>>>>> >> I think that not returning the user's specific subclass should be fine. Does the removal of markDone imply that the consumer always knows a "final" key to claim on any given restriction?
>>>>>> >
>>>>>> > Yes, each restriction needs to support claiming a "final" key that would make the restriction "done". In the Bigtable/HBase case it is the empty key ""; for files it can be a file offset beyond the end of the file. Generally, restriction trackers written by SDF authors could also take an instance of an object that they can compare instance equality against for a final key. Alternatively, we could allow restriction trackers to implement markDone(), but we would need the SDK to have knowledge of the method by having the RestrictionTracker implement an interface, extend an abstract base class, or be found reflectively, so that we would be able to wrap it to provide synchronization guarantees. I had toyed with the idea of using something like the ProxyInvocationHandler that backs PipelineOptions to provide a modified version of the user's instance with the appropriate synchronization guarantees, but I couldn't get it to work.
>>>>>> >
>>>>>> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>> >> >
>>>>>> >> > I have started to work on how to change the user facing API within the Java SDK to support splitting/checkpointing[1], backlog reporting[2] and bundle finalization[3].
>>>>>> >> >
>>>>>> >> > I have this PR[4] which contains minimal interface/type definitions to convey how the API surface would change with these 4 changes:
>>>>>> >> > 1) Exposes the ability for @SplitRestriction to take a backlog suggestion on how to perform splitting and how many restrictions should be returned.
>>>>>> >> > 2) Adds the ability for RestrictionTrackers to report backlog.
>>>>>> >> > 3) Updates @ProcessElement to be required to take a generic RestrictionTracker instead of the user's own restriction tracker type.
>>>>>> >> > 4) Adds the ability for @StartBundle/@ProcessElement/@FinishBundle to register a callback that is invoked after bundle finalization.
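If I'm reading change 3 correctly, the user-visible shape would be roughly the following (my guess at the shape; the javadoc in [4] has the real contract):

  import org.apache.beam.sdk.io.range.OffsetRange;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

  // Approximate reading of change 3: @ProcessElement declares the generic
  // RestrictionTracker rather than the user's own tracker subclass, letting
  // the SDK wrap the tracker. Other required SDF methods
  // (@GetInitialRestriction etc.) are elided.
  class ReadFn extends DoFn<String, String> {
    @ProcessElement
    public void process(ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
        context.output(readBlockAt(i));
      }
    }

    // Placeholder for the user's actual read logic.
    private String readBlockAt(long offset) {
      return "record@" + offset;
    }
  }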
>>>>>> >> >
>>>>>> >> > The details are in the javadoc comments as to how I would expect the contract to play out. Feel free to comment on the ML/PR around the contract, and after the feedback is received/digested/implemented I would like to get the changes submitted so that work can start towards providing an implementation in the Java SDK, Python SDK, and Go SDK and the shared runner portability library.
>>>>>> >> >
>>>>>> >> > I would like to call out special attention to 3, since it will enable us to remove the synchronization requirement for users: we will wrap the underlying restriction tracker, allowing us to add appropriate synchronization as needed and also to watch any calls that pass through the object, such as the claim calls. I also believe this prevents people from writing RestrictionTrackers where the contract of tryClaim is subverted, since markDone is outside the purview of tryClaim, as in ByteKeyRangeTracker[5].
>>>>>> >> >
>>>>>> >> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
>>>>>> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting
>>>>>> >> > 3: https://s.apache.org/beam-finalizing-bundles
>>>>>> >> > 4: https://github.com/apache/beam/pull/6969
>>>>>> >> > 5: https://github.com/apache/beam/pull/6949
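The wrapping described in 3 I picture as a simple delegating tracker along these lines (a self-contained sketch with a stand-in Tracker interface, not Beam's actual RestrictionTracker class):

  // Sketch of the wrapping in change 3: the SDK harness wraps the user's
  // tracker so claims are synchronized and observable without the user
  // writing any synchronization. Tracker is a stand-in interface here.
  interface Tracker<RestrictionT, PositionT> {
    boolean tryClaim(PositionT position);
    RestrictionT currentRestriction();
  }

  class SynchronizedTracker<RestrictionT, PositionT> implements Tracker<RestrictionT, PositionT> {
    private final Tracker<RestrictionT, PositionT> delegate;
    private PositionT lastClaimAttempt;

    SynchronizedTracker(Tracker<RestrictionT, PositionT> delegate) {
      this.delegate = delegate;
    }

    @Override
    public synchronized boolean tryClaim(PositionT position) {
      lastClaimAttempt = position; // the harness observes every claim that passes through
      return delegate.tryClaim(position);
    }

    @Override
    public synchronized RestrictionT currentRestriction() {
      return delegate.currentRestriction();
    }

    synchronized PositionT getLastClaimAttempt() {
      return lastClaimAttempt;
    }
  }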