I still have outstanding questions (above) about 1) Why we need arbitrary precision for backlog, instead of just using a (much simpler) double. 2) Whether it's worth passing backlog back to split requests, rather than (again) a double representing "portion of current remaining" which may change over time. (The most common split request is into even portions, and specifically half, which can't accurately be requested from a stale backlog.)
There are also some questions about returning multiple remainders, and how that relates to/overlaps with the initial splitting, but those can probably be deferred. On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik <lc...@google.com> wrote: > > I updated the PR addressing the last of Scott's comments and also migrated to > use an integral fraction as Robert had recommended by using approach A for > the proto representation and BigDecimal within the Java SDK: > A: > // Represents a non-negative decimal number: unscaled_value * 10^(-scale) > message Decimal { > // Represents the unscaled value as a big endian unlimited precision > non-negative integer. > bytes unscaled_value = 1; > // Represents the scale > uint32 scale = 2; > } > > Ismael, I would like to defer the changes to improve the ByteBuddy > DoFnInvoker since that is parallelizable work and have filed BEAM-6142. > > I don't believe there are any other outstanding changes and would like to get > the PR merged so that people can start working on implementing support for > backlog reporting and splitting within the Java SDK harness, improving the > ByteBuddy DoFnInvoker, exposing the shared runner library parts, and > integrating this into ULR, Flink, Dataflow, ... > > On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik <lc...@google.com> wrote: >> >> >> >> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía <ieme...@gmail.com> wrote: >>> >>> > Bundle finalization is unrelated to backlogs but is needed since there is >>> > a class of data stores which need acknowledgement that says I have >>> > successfully received your data and am now responsible for it such as >>> > acking a message from a message queue. >>> >>> Currently ack is done by IOs as part of checkpointing. How will this >>> be different? Can you please clarify how it should be done in this case, >>> or is this totally independent?
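To make the approach-A mapping concrete, here is a minimal sketch (my own illustration, not code from the PR; the class and method names are hypothetical) of round-tripping a java.math.BigDecimal through the Decimal proto's unscaled_value/scale pair. It assumes a non-negative value and a non-negative scale, matching the proto comment.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical helper showing value == unscaled_value * 10^(-scale).
public class DecimalCodec {
  // Encode: BigDecimal -> unscaled bytes. toByteArray() is big-endian;
  // for non-negative values it matches the proto's "unlimited precision
  // non-negative integer" encoding (possibly with a leading zero byte).
  public static byte[] unscaledBytes(BigDecimal value) {
    return value.unscaledValue().toByteArray();
  }

  public static int scale(BigDecimal value) {
    return value.scale(); // assumed non-negative, as the proto's uint32 requires
  }

  // Decode: (unscaled bytes, scale) -> BigDecimal. The signum=1 constructor
  // interprets the bytes as an unsigned big-endian integer.
  public static BigDecimal decode(byte[] unscaled, int scale) {
    return new BigDecimal(new BigInteger(1, unscaled), scale);
  }

  public static void main(String[] args) {
    BigDecimal v = new BigDecimal("123.45"); // unscaled_value=12345, scale=2
    if (!decode(unscaledBytes(v), scale(v)).equals(v)) {
      throw new AssertionError("round trip failed");
    }
  }
}
```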
>> >> >> The flow for finalization and checkpointing is similar: >> Checkpointing: >> 1) Process a bundle >> 2) Checkpoint bundle containing acks that need to be done >> 3) When checkpoint resumes, acknowledge messages >> >> Finalization: >> 1) Process a bundle >> 2) Request bundle finalization when bundle completes >> 3) SDK is asked to finalize bundle >> >> The difference between the two is that bundle finalization always goes back >> to the same machine instance that processed the bundle while checkpointing >> can be scheduled on another machine. Many message-queue-like systems expose >> clients which store in-memory state and can't ack from another machine. You >> could solve the problem with checkpointing, but it would require each machine to >> be able to tell another machine that it got a checkpoint with acks that it >> is responsible for; this won't work everywhere and isn't as clean. >> >>> >>> > UnboundedPerElement/BoundedPerElement tells us during pipeline >>> > construction time what type of PCollection we will be creating since we >>> > may have a bounded PCollection go to an UnboundedPerElement DoFn and that >>> > will produce an unbounded PCollection and similarly we could have an >>> > unbounded PCollection go to a BoundedPerElement DoFn and that will produce >>> > an unbounded PCollection. Restrictions.IsBounded is used during pipeline >>> > execution to inform the runner whether a restriction being returned is >>> > bounded or not since unbounded restrictions can return bounded >>> > restrictions during splitting. So in the above example using the message >>> > queue, the first 7 restrictions that only read 1250 messages would be >>> > marked with the Restrictions.IsBounded interface while the last one would >>> > not be. This could also be a method on restrictions such as "IsBounded >>> > isBounded()" on PCollections.
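A toy sketch of the finalization flow described above (purely illustrative; these are not the proposed Beam interfaces): the SDK registers callbacks while processing a bundle, and the runner invokes them on the same worker once the bundle's output is durably committed, which is what lets an in-memory queue client ack.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "register a callback, runner invokes it on
// the same machine after the bundle's results are committed".
public class FinalizationSketch {
  interface Callback { void onBundleFinalized(); }

  static class BundleFinalizer {
    private final List<Callback> callbacks = new ArrayList<>();
    // Called by user code during bundle processing.
    void register(Callback cb) { callbacks.add(cb); }
    // Called by the runner once the bundle's output is durably committed.
    void finalizeBundle() { callbacks.forEach(Callback::onBundleFinalized); }
  }

  public static void main(String[] args) {
    BundleFinalizer finalizer = new BundleFinalizer();
    List<String> acked = new ArrayList<>();
    // During processing: remember which messages need acking on this worker.
    finalizer.register(() -> acked.add("msg-1"));
    // After the runner commits the bundle: the ack happens here, not earlier.
    finalizer.finalizeBundle();
    if (!acked.contains("msg-1")) throw new AssertionError();
  }
}
```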
>>> >>> Thanks for the explanation about Restrictions.IsBounded, since this is >>> information for the runner, what is the runner expected to do >>> differently when IsUnbounded? (I assume that IsBounded is the default >>> behavior and nothing changes.) >> >> >> Knowing whether a restriction is bounded or unbounded is important; one >> example use case would be for the limited depth splitting proposal >> (https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv) >> since you want to keep the unbounded restrictions at level 0 and only pass >> the bounded restrictions to the other levels. The reasoning behind this is >> that you don't want to end up in a state where all your unbounded >> restrictions are at the highest level, preventing you from splitting any >> further. >> >>> >>> > Note that this does bring up the question of whether SDKs should expose >>> > coders for backlogs since ByteKeyCoder and BigEndianLongCoder exist which >>> > would cover a good number of scenarios described above. This coder >>> > doesn't have to be understood by the runner nor does it have to be part >>> > of the portability APIs (either Runner or Fn API). WDYT? >>> >>> Yes, we may need a Coder effectively for both sides; the only thing I don’t >>> like is the external impact on the API. I mean it is not too complex, but >>> adds some extras to support things that are ‘rarely’ changed. >> >> >> Based upon Robert's suggestion above to swap to using an integral floating >> point number, and even without Robert's suggestion, this won't work. The idea >> was that a coder would help convert the byte[] backlog representation >> to/from a type the user wants, but the issue is that the Runner may give any >> arbitrary byte[] backlog to the SDK during splitting and this coder would >> need to be able to handle it.
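One way to see why the coder idea breaks down: the runner may do arithmetic on the backlog and hand back a byte[] of a different length than the SDK produced, so SDK-side decoding has to accept arbitrary byte arrays anyway. A hypothetical sketch (not the Beam API) for a long-valued backlog treated as an unsigned big-endian integer:

```java
import java.math.BigInteger;

// Hypothetical fixed-width big-endian encoding of a long-valued backlog.
// decode() must tolerate arbitrary lengths, since the runner may return a
// shorter or longer byte[] after doing arithmetic on its side.
public class BacklogBytes {
  public static byte[] encode(long backlog) {
    byte[] out = new byte[8];
    for (int i = 7; i >= 0; i--) { out[i] = (byte) backlog; backlog >>>= 8; }
    return out;
  }

  public static long decode(byte[] bytes) {
    // BigInteger(1, bytes) reads any-length bytes as an unsigned big-endian value.
    return new BigInteger(1, bytes).longValueExact();
  }

  public static void main(String[] args) {
    if (decode(encode(10000L)) != 10000L) throw new AssertionError();
    // A 2-byte array (0x2710) decodes to the same 10000 messages.
    if (decode(new byte[] {0x27, 0x10}) != 10000L) throw new AssertionError();
  }
}
```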
>> >>> >>> > Ismael, I looked at the API around ByteKeyRangeTracker and >>> > OffsetRangeTracker and figured out that the classes are named as such because >>> > they are trackers for the OffsetRange and ByteKeyRange classes. Some >>> > options are to: >>> > 1) Copy the ByteKeyRange and call it ByteKeyRestriction and similarly >>> > copy OffsetRange and call it OffsetRestriction. This would allow us to >>> > name the trackers ByteKeyRestrictionTracker and OffsetRestrictionTracker. >>> > Note that we can't rename because that would be a backwards incompatible >>> > change for existing users of ByteKeyRange/OffsetRange. This would allow >>> > us to add methods relevant to SDF and remove methods that aren't needed. >>> > 2) Rename ByteKeyRangeTracker to ByteKeyRangeRestrictionTracker and >>> > OffsetRangeTracker to OffsetRangeRestrictionTracker. Not really liking >>> > this option. >>> > 3) Leave things as they are. >>> >>> For the RangeTracker vs RestrictionTracker discussion I will probably >>> lean to (3) (leave things as they are) save if there are important >>> things to change/fix as in (1), which I am not aware of. >> >> >> Sounds good to me. >> >>> >>> On Tue, Nov 20, 2018 at 12:07 AM Lukasz Cwik <lc...@google.com> wrote: >>> > >>> > Sorry for the late reply. >>> > >>> > On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <ieme...@gmail.com> wrote: >>> >> >>> >> Some late comments, and my excuses in advance if some questions look silly, >>> >> but the last documents were a lot of info that I have not yet fully >>> >> digested. >>> >> >>> >> I have some questions about the ‘new’ Backlog concept following a >>> >> quick look at the PR >>> >> https://github.com/apache/beam/pull/6969/files >>> >> >>> >> 1. Is the Backlog a specific concept for each IO? Or in other words: >>> >> ByteKeyRestrictionTracker can be used by HBase and Bigtable, but I am >>> >> assuming from what I could understand that the Backlog implementation >>> >> will be data store specific, is this the case?
or can it be in some >>> >> cases generalized (for example for Filesystems)? >>> > >>> > >>> > The backlog is tied heavily to the restriction tracker implementation; >>> > any data store using the same restriction tracker will provide the same >>> > backlog computation. For example, if HBase/Bigtable use the >>> > ByteKeyRestrictionTracker then they will use the same backlog >>> > calculation. Note that an implementation could subclass a restriction >>> > tracker if the data store could provide additional information. For >>> > example, the default backlog for a ByteKeyRestrictionTracker over >>> > [startKey, endKey) is distance(currentKey, lastKey) where distance is >>> > represented as byte array subtraction (which can be wildly inaccurate as >>> > the density of data is not well reflected) but if HBase/Bigtable could >>> > provide the number of bytes from current key to last key, a better >>> > representation could be provided. >>> > >>> > Other common examples of backlogs would be: >>> > * files: backlog = length of file - current byte offset >>> > * message queues: backlog = number of outstanding messages >>> > >>> >> >>> >> >>> >> 2. Since the backlog is a byte[] this means that it is up to the user >>> >> to give it a meaning depending on the situation, is this correct? Also, >>> >> since splitRestriction now has the Backlog as an argument, what do we >>> >> expect the person that implements this method in a DoFn to do ideally >>> >> with it? Maybe a more concrete example of how things fit for >>> >> File/Offset or HBase/Bigtable/ByteKey would be helpful (maybe also for >>> >> the BundleFinalizer concept too). >>> > >>> > >>> > Yes, the restriction tracker/restriction/SplittableDoFn must give the >>> > byte[] a meaning. This can have any meaning but we would like the >>> > backlog byte[] representation to be lexicographically comparable (when >>> > viewing the byte[] in big endian format and prefixes are smaller (e.g.
>>> > 001 is smaller than 0010)) and preferably a linear representation. Note >>> > that all restriction trackers of the same type should use the same >>> > "space" so that backlogs are comparable across multiple restriction >>> > tracker instances. >>> > >>> > The backlog when provided to splitRestriction should be used to subdivide >>> > the restriction into smaller restrictions where each would have the >>> > backlog if processed (except for potentially the last). >>> > >>> > A concrete example would be to represent the remaining bytes to process >>> > in a file as a 64 bit big endian integer, let's say that is 500 MiB >>> > (524288000 bytes) or 00000000 00000000 00000000 00000000 00011111 >>> > 01000000 (note that the trailing zeros are optional and don't impact >>> > the calculation). The runner could notice that processing the restriction >>> > will take 10 hrs, so it asks the SDF to split into 1/16 segments by >>> > shifting the bits over by 4 and asks to split using backlog 00000000 >>> > 00000000 00000000 00000000 00000001 11110100. The SDK is able to convert >>> > this request back into 32768000 bytes and returns 16 restrictions. >>> > Another example would be for a message queue where we have 10000 messages >>> > remaining on the queue, so the backlog would be 00000000 00000000 00000000 >>> > 00000000 00000000 00000000 00100111 00010000 when represented as a 64 bit >>> > unsigned big endian integer. The runner could ask the SDK to split using >>> > a 1/8th backlog of 00000000 00000000 00000000 00000000 00000000 00000000 >>> > 00000100 11100010, which the SDK would break out into 8 restrictions, the >>> > first 7 responsible for reading 1250 messages and stopping, while the last >>> > restriction would read 1250 messages and then continue to read anything >>> > else that has been enqueued.
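The arithmetic in the two examples above can be sketched as follows (a toy illustration, not the SDK API; it treats the backlog as a plain unsigned big-endian integer and ignores the optional-trailing-zeros refinement from the file example):

```java
import java.math.BigInteger;

// Toy sketch of deriving a 1/2^shiftBits split backlog from a backlog byte[].
public class BacklogMath {
  public static byte[] split(byte[] backlog, int shiftBits) {
    // BigInteger(1, bytes) reads the array as an unsigned big-endian value.
    return new BigInteger(1, backlog).shiftRight(shiftBits).toByteArray();
  }

  public static void main(String[] args) {
    // File example: 500 MiB remaining, split into 1/16 segments (shift by 4).
    byte[] fileBacklog = BigInteger.valueOf(524288000L).toByteArray();
    long per16th = new BigInteger(1, split(fileBacklog, 4)).longValueExact();
    if (per16th != 32768000L) throw new AssertionError(); // 524288000 / 16

    // Queue example: 10000 messages remaining, a 1/8th backlog (shift by 3).
    byte[] queueBacklog = BigInteger.valueOf(10000L).toByteArray();
    long per8th = new BigInteger(1, split(queueBacklog, 3)).longValueExact();
    if (per8th != 1250L) throw new AssertionError(); // 10000 / 8
  }
}
```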
>>> > >>> > Bundle finalization is unrelated to backlogs but is needed since there is >>> > a class of data stores which need acknowledgement that says I have >>> > successfully received your data and am now responsible for it such as >>> > acking a message from a message queue. >>> > >>> >> >>> >> >>> >> 3. By default all Restrictions are assumed to be unbounded but there >>> >> is this new Restrictions.IsBounded method, can’t this behavior be >>> >> inferred (adapted) from the DoFn UnboundedPerElement/Bounded >>> >> annotation or are these independent concepts? >>> > >>> > >>> > UnboundedPerElement/BoundedPerElement tells us during pipeline >>> > construction time what type of PCollection we will be creating since we >>> > may have a bounded PCollection go to an UnboundedPerElement DoFn and that >>> > will produce an unbounded PCollection and similarly we could have an >>> > unbounded PCollection go to a BoundedPerElement DoFn and that will produce >>> > an unbounded PCollection. Restrictions.IsBounded is used during pipeline >>> > execution to inform the runner whether a restriction being returned is >>> > bounded or not since unbounded restrictions can return bounded >>> > restrictions during splitting. So in the above example using the message >>> > queue, the first 7 restrictions that only read 1250 messages would be >>> > marked with the Restrictions.IsBounded interface while the last one would >>> > not be. This could also be a method on restrictions such as "IsBounded >>> > isBounded()" on PCollections. >>> > >>> >> Extra unrelated comment: >>> >> Since SDF is still @Experimental we should probably rename >>> >> OffsetRangeTracker and ByteKeyRangeTracker to use the RestrictionTracker >>> >> suffix (I don’t know why they share the RangeTracker suffix for the >>> >> new trackers, WDYT?) >>> > >>> > >>> > Agree, will perform in a follow-up PR.
>>> > >>> >> >>> >> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <lc...@google.com> wrote: >>> >> > >>> >> > >>> >> > >>> >> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <rober...@google.com> >>> >> > wrote: >>> >> >> >>> >> >> I think that not returning the user's specific subclass should be fine. >>> >> >> Does the removal of markDone imply that the consumer always knows a >>> >> >> "final" key to claim on any given restriction? >>> >> > >>> >> > >>> >> > Yes, each restriction needs to support claiming a "final" key that >>> >> > would make the restriction "done". In the BigTable/HBase case it is >>> >> > the empty key "", for files it can be a file offset beyond the end of >>> >> > the file. Generally, restriction trackers written by SDF authors could >>> >> > also take an instance of an object that they can compare instance >>> >> > equality against for a final key. Alternatively we could allow >>> >> > restriction trackers to implement markDone(), but we would need the SDK >>> >> > to have knowledge of the method by having the RestrictionTracker >>> >> > implement an interface, extend an abstract base class, or be reflectively found >>> >> > so that we would be able to wrap it to provide synchronization >>> >> > guarantees. I had toyed with the idea of using something like the >>> >> > ProxyInvocationHandler that backs PipelineOptions to be able to >>> >> > provide a modified version of the user's instance that had the >>> >> > appropriate synchronization guarantees but couldn't get it to work. >>> >> > >>> >> >> >>> >> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <lc...@google.com> wrote: >>> >> >> > >>> >> >> > I have started to work on how to change the user-facing API within >>> >> >> > the Java SDK to support splitting/checkpointing[1], backlog >>> >> >> > reporting[2] and bundle finalization[3].
>>> >> >> > >>> >> >> > I have this PR[4] which contains minimal interface/type definitions >>> >> >> > to convey how the API surface would change with these 4 changes: >>> >> >> > 1) Exposes the ability for @SplitRestriction to take a backlog >>> >> >> > suggestion on how to perform splitting and for how many >>> >> >> > restrictions should be returned. >>> >> >> > 2) Adds the ability for RestrictionTrackers to report backlog. >>> >> >> > 3) Updates @ProcessElement to be required to take a generic >>> >> >> > RestrictionTracker instead of the user's own restriction tracker >>> >> >> > type. >>> >> >> > 4) Adds the ability for @StartBundle/@ProcessElement/@FinishBundle >>> >> >> > to register a callback that is invoked after bundle finalization. >>> >> >> > >>> >> >> > The details are in the javadoc comments as to how I would expect >>> >> >> > the contract to play out. >>> >> >> > Feel free to comment on the ML/PR around the contract, and after the >>> >> >> > feedback is received/digested/implemented, I would like to get the >>> >> >> > changes submitted so that work can start towards providing an >>> >> >> > implementation in the Java SDK, Python SDK, and Go SDK and the >>> >> >> > shared runner portability library. >>> >> >> > >>> >> >> > I would like to call out special attention to 3, since this >>> >> >> > change will enable us to remove the synchronization requirement >>> >> >> > for users as we will wrap the underlying restriction tracker, >>> >> >> > allowing us to add appropriate synchronization as needed and also >>> >> >> > to watch any calls that pass through the object such as the claim >>> >> >> > calls. I also believe this prevents people from writing >>> >> >> > RestrictionTrackers where the contract of tryClaim is subverted >>> >> >> > since markDone is outside the purview of tryClaim as in >>> >> >> > ByteKeyRangeTracker[5].
>>> >> >> > >>> >> >> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles >>> >> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting >>> >> >> > 3: https://s.apache.org/beam-finalizing-bundles >>> >> >> > 4: https://github.com/apache/beam/pull/6969 >>> >> >> > 5: https://github.com/apache/beam/pull/6949