Note: I have merged the PR, but I will continue to iterate based upon the feedback provided in this thread, as it has been quite useful.
On Fri, Nov 30, 2018 at 9:37 AM Lukasz Cwik <lc...@google.com> wrote:

> Sorry, for some reason I thought I had answered these.
>
> On Fri, Nov 30, 2018 at 2:20 AM Robert Bradshaw <rober...@google.com> wrote:
>
>> I still have outstanding questions (above) about
>>
>> 1) Why we need arbitrary precision for backlog, instead of just using
>> a (much simpler) double.
>
> Double lacks the precision for reporting backlogs for byte key ranges
> (HBase, Bigtable, ...). Scanning a key range such as ["a", "b") with a
> large number of keys sharing a really long common prefix, such as
> "aaaaaaaaaaaaaaaaaaaaaaaaaab" and "aaaaaaaaaaaaaaaaaaaaaaaaaac", leads
> to the backlog not changing even though we are making progress through
> the key space. This also prevents splitting within such an area, since
> a double can't provide the necessary precision (without multiple
> rounds of splitting, which adds complexity).
>
>> 2) Whether it's worth passing backlog back to split requests, rather
>> than (again) a double representing "portion of current remaining",
>> which may change over time. (The most common split request is into
>> even portions, and specifically half, which can't accurately be
>> requested from a stale backlog.)
>
> I see two scenarios here:
> * The fraction is exposed to the SDF author, who then needs to map
>   from their restriction space to backlog and also map fractions onto
>   their restriction space, meaning they are required to write mappings
>   between three different models.
> * The fraction is not exposed to the SDF author, and the framework
>   code multiplies the fraction against the backlog and provides the
>   backlog to the user (this solves the backlog skew issue but still
>   has the limited precision issue).
>
> I believe it is easier for an SDF author to write a two-way mapping
> from backlog to their position space than to write two different types
> of mappings.
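The precision problem described above can be seen with a short sketch: if a byte key is mapped onto a double fraction of the key space, keys with a long shared prefix collapse to the same value. The key-to-fraction mapping below is illustrative, not Beam code.

```java
import java.nio.charset.StandardCharsets;

public class DoublePrecisionDemo {
  // Treat the key as a base-256 fraction in [0, 1): byte i contributes
  // key[i] / 256^(i+1). A double only carries ~52 bits (about 6.5 bytes)
  // of mantissa, so bytes past a long common prefix are rounded away.
  static double keyToFraction(byte[] key) {
    double fraction = 0;
    double scale = 1.0 / 256;
    for (byte b : key) {
      fraction += (b & 0xff) * scale;
      scale /= 256;
    }
    return fraction;
  }

  public static void main(String[] args) {
    byte[] k1 = "aaaaaaaaaaaaaaaaaaaaaaaaaab".getBytes(StandardCharsets.US_ASCII);
    byte[] k2 = "aaaaaaaaaaaaaaaaaaaaaaaaaac".getBytes(StandardCharsets.US_ASCII);
    // Distinct keys, but the double fractions collapse to the same value,
    // so a double-valued backlog would never change between them:
    System.out.println(keyToFraction(k1) == keyToFraction(k2)); // true
  }
}
```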
> For example, when a person is reading a file that has 100 bytes to
> process and is asked to split at 60.3%, they have to map 60.3% onto
> 100 bytes, figuring out that they are responsible for 60.3 bytes,
> which they round down to 60 bytes. In the scenario where the runner
> provides the backlog, 60.3 would have been sent across and the SDF
> author would only need to perform rounding.
>
>> There are also some questions about returning multiple remainders,
>> and how that relates to/overlaps with the initial splitting, but
>> those can probably be deferred.
>
> Agree.
>
>> On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > I updated the PR addressing the last of Scott's comments and also
>> > migrated to use an integral fraction as Robert had recommended,
>> > using approach A for the proto representation and BigDecimal within
>> > the Java SDK:
>> > A:
>> > // Represents a non-negative decimal number: unscaled_value * 10^(-scale)
>> > message Decimal {
>> >   // Represents the unscaled value as a big endian unlimited
>> >   // precision non-negative integer.
>> >   bytes unscaled_value = 1;
>> >   // Represents the scale.
>> >   uint32 scale = 2;
>> > }
>> >
>> > Ismael, I would like to defer the changes to improve the ByteBuddy
>> > DoFnInvoker since that is parallelizable work, and have filed
>> > BEAM-6142.
>> >
>> > I don't believe there are any other outstanding changes and would
>> > like to get the PR merged so that people can start working on
>> > implementing support for backlog reporting and splitting within the
>> > Java SDK harness, improving the ByteBuddy DoFnInvoker, exposing the
>> > shared runner library parts, and integrating this into the ULR,
>> > Flink, Dataflow, ...
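A sketch of how the Decimal message above could map to and from java.math.BigDecimal. The helper names are mine, and this assumes a non-negative scale (BigDecimal itself also permits negative scales, which the uint32 field cannot carry):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalCodec {
  // Encode: value = unscaledValue * 10^(-scale), unscaledValue >= 0.
  // toByteArray() yields the big endian representation (with a possible
  // leading zero byte for the sign, which decodes harmlessly below).
  static byte[] toUnscaledBytes(BigDecimal value) {
    return value.unscaledValue().toByteArray();
  }

  static int toScale(BigDecimal value) {
    return value.scale(); // assumed non-negative for this sketch
  }

  // Decode from the wire representation; signum 1 because the proto
  // only allows non-negative values.
  static BigDecimal fromProto(byte[] unscaledValue, int scale) {
    return new BigDecimal(new BigInteger(1, unscaledValue), scale);
  }

  public static void main(String[] args) {
    BigDecimal backlog = new BigDecimal("60.3");
    BigDecimal roundTrip = fromProto(toUnscaledBytes(backlog), toScale(backlog));
    System.out.println(backlog.equals(roundTrip)); // true
  }
}
```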
>> >
>> > On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik <lc...@google.com> wrote:
>> >>
>> >> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>> >>>
>> >>> > Bundle finalization is unrelated to backlogs but is needed since
>> >>> > there is a class of data stores which need an acknowledgement
>> >>> > that says "I have successfully received your data and am now
>> >>> > responsible for it", such as acking a message from a message
>> >>> > queue.
>> >>>
>> >>> Currently the ack is done by IOs as part of checkpointing. How
>> >>> will this be different? Can you please clarify how it should be
>> >>> done in this case, or is this totally independent?
>> >>
>> >> The flow for finalization and checkpointing is similar.
>> >>
>> >> Checkpointing:
>> >> 1) Process a bundle.
>> >> 2) Checkpoint the bundle, containing acks that need to be done.
>> >> 3) When the checkpoint resumes, acknowledge the messages.
>> >>
>> >> Finalization:
>> >> 1) Process a bundle.
>> >> 2) Request bundle finalization when the bundle completes.
>> >> 3) The SDK is asked to finalize the bundle.
>> >>
>> >> The difference between the two is that bundle finalization always
>> >> goes back to the same machine instance that processed the bundle,
>> >> while checkpointing can be scheduled on another machine. Many
>> >> message-queue-like systems expose clients which store in-memory
>> >> state and can't ack from another machine. You could solve the
>> >> problem with checkpointing, but it would require each machine to be
>> >> able to tell another machine that it got a checkpoint with acks it
>> >> is responsible for; this won't work everywhere and isn't as clean.
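The finalization flow above can be modeled with a toy sketch: callbacks registered while processing a bundle are only invoked on the same worker after the runner has committed the bundle. The BundleFinalizer interface and Worker class here are hypothetical, not the Beam API:

```java
import java.util.ArrayList;
import java.util.List;

public class FinalizationDemo {
  // Hypothetical shape of a finalization hook.
  interface BundleFinalizer {
    void register(Runnable callback);
  }

  static class Worker implements BundleFinalizer {
    private final List<Runnable> callbacks = new ArrayList<>();
    private final List<String> acked = new ArrayList<>();

    @Override
    public void register(Runnable callback) {
      callbacks.add(callback);
    }

    // 1) Process a bundle: consume messages, defer the acks.
    void processBundle(List<String> messages) {
      // 2) Request finalization when the bundle completes.
      register(() -> acked.addAll(messages));
    }

    // 3) The runner calls back, on this same worker, only after the
    //    bundle's output has been durably committed.
    void onBundleCommitted() {
      callbacks.forEach(Runnable::run);
      callbacks.clear();
    }

    List<String> acked() {
      return acked;
    }
  }

  public static void main(String[] args) {
    Worker worker = new Worker();
    worker.processBundle(List.of("msg-1", "msg-2"));
    System.out.println(worker.acked()); // [] - nothing acked before commit
    worker.onBundleCommitted();
    System.out.println(worker.acked()); // [msg-1, msg-2]
  }
}
```

The contrast with checkpointing is that a checkpoint (and its pending acks) may resume on a different machine, which in-memory queue clients cannot serve.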
>> >>
>> >>>
>> >>> > UnboundedPerElement/BoundedPerElement tells us during pipeline
>> >>> > construction time what type of PCollection we will be creating,
>> >>> > since we may have a bounded PCollection go to an
>> >>> > UnboundedPerElement DoFn, which will produce an unbounded
>> >>> > PCollection, and similarly we could have an unbounded PCollection
>> >>> > go to a BoundedPerElement DoFn, which will produce an unbounded
>> >>> > PCollection. Restrictions.IsBounded is used during pipeline
>> >>> > execution to inform the runner whether a restriction being
>> >>> > returned is bounded or not, since unbounded restrictions can
>> >>> > return bounded restrictions during splitting. So in the above
>> >>> > example using the message queue, the first 7 restrictions that
>> >>> > only read 1250 messages would be marked with the
>> >>> > Restrictions.IsBounded interface while the last one would not be.
>> >>> > This could also be a method on restrictions, such as
>> >>> > "IsBounded isBounded()" on PCollections.
>> >>>
>> >>> Thanks for the explanation about Restrictions.IsBounded. Since
>> >>> this is information for the runner: what is the runner expected to
>> >>> do differently when IsUnbounded? (I assume that IsBounded is the
>> >>> default behavior and nothing changes.)
>> >>
>> >> Knowing whether a restriction is bounded or unbounded is important.
>> >> One example use case would be the limited depth splitting proposal
>> >> (https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv),
>> >> since you want to keep the unbounded restrictions at level 0 and
>> >> only pass the bounded restrictions to the other levels. The
>> >> reasoning behind this is that you don't want to end up in a state
>> >> where all your unbounded restrictions are at the highest level,
>> >> preventing you from splitting any further.
>> >>
>> >>>
>> >>> > Note that this does bring up the question of whether SDKs should
>> >>> > expose coders for backlogs, since ByteKeyCoder and
>> >>> > BigEndianLongCoder exist, which would cover a good number of the
>> >>> > scenarios described above. This coder doesn't have to be
>> >>> > understood by the runner, nor does it have to be part of the
>> >>> > portability APIs (either the Runner or Fn API). WDYT?
>> >>>
>> >>> Yes, we may need a Coder effectively for both sides. The only
>> >>> thing I don’t like is the external impact on the API. I mean, it
>> >>> is not too complex, but it adds some extras to support things that
>> >>> are ‘rarely’ changed.
>> >>
>> >> This won't work, based upon Robert's suggestion above to swap to an
>> >> integral floating point number, and even without Robert's
>> >> suggestion. The idea was that a coder would help convert the byte[]
>> >> backlog representation to/from a type the user wants, but the issue
>> >> is that the runner may give any arbitrary byte[] backlog to the SDK
>> >> during splitting, and this coder would need to be able to handle it.
>> >>
>> >>> > Ismael, I looked at the API around ByteKeyRangeTracker and
>> >>> > OffsetRangeTracker and figured out that the classes are named as
>> >>> > such because they are trackers for the OffsetRange and
>> >>> > ByteKeyRange classes. Some options are to:
>> >>> > 1) Copy ByteKeyRange and call it ByteKeyRestriction, and
>> >>> > similarly copy OffsetRange and call it OffsetRestriction. This
>> >>> > would allow us to name the trackers ByteKeyRestrictionTracker
>> >>> > and OffsetRestrictionTracker. Note that we can't rename, because
>> >>> > that would be a backwards incompatible change for existing users
>> >>> > of ByteKeyRange/OffsetRange. This would allow us to add methods
>> >>> > relevant to SDF and remove methods that aren't needed.
>> >>> > 2) Rename ByteKeyRangeTracker to ByteKeyRangeRestrictionTracker
>> >>> > and OffsetRangeTracker to OffsetRangeRestrictionTracker. Not
>> >>> > really liking this option.
>> >>> > 3) Leave things as they are.
>> >>>
>> >>> For the RangeTracker vs RestrictionTracker discussion I will
>> >>> probably lean towards (3), leave things as they are, unless there
>> >>> are important things to change/fix via (1) that I am not aware of.
>> >>
>> >> Sounds good to me.
>> >>
>> >>> On Tue, Nov 20, 2018 at 12:07 AM Lukasz Cwik <lc...@google.com> wrote:
>> >>> >
>> >>> > Sorry for the late reply.
>> >>> >
>> >>> > On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>> >>> >>
>> >>> >> Some late comments, and my pre-excuses if some questions look
>> >>> >> silly, but the last documents were a lot of info that I have
>> >>> >> not yet fully digested.
>> >>> >>
>> >>> >> I have some questions about the ‘new’ Backlog concept following
>> >>> >> a quick look at the PR:
>> >>> >> https://github.com/apache/beam/pull/6969/files
>> >>> >>
>> >>> >> 1. Is the Backlog a specific concept for each IO? Or in other
>> >>> >> words: ByteKeyRestrictionTracker can be used by HBase and
>> >>> >> Bigtable, but I am assuming from what I could understand that
>> >>> >> the Backlog implementation will be data store specific. Is this
>> >>> >> the case, or can it in some cases be generalized (for example
>> >>> >> for Filesystems)?
>> >>> >
>> >>> > The backlog is tied heavily to the restriction tracker
>> >>> > implementation; any data store using the same restriction
>> >>> > tracker will provide the same backlog computation. For example,
>> >>> > if HBase/Bigtable use the ByteKeyRestrictionTracker then they
>> >>> > will use the same backlog calculation. Note that an
>> >>> > implementation could subclass a restriction tracker if the data
>> >>> > store could provide additional information.
>> >>> > For example, the default backlog for a ByteKeyRestrictionTracker
>> >>> > over [startKey, endKey) is distance(currentKey, lastKey), where
>> >>> > distance is represented as byte array subtraction (which can be
>> >>> > wildly inaccurate, as the density of data is not well reflected),
>> >>> > but if HBase/Bigtable could provide the number of bytes from the
>> >>> > current key to the last key, a better representation could be
>> >>> > provided.
>> >>> >
>> >>> > Other common examples of backlogs would be:
>> >>> > * files: backlog = length of file - current byte offset
>> >>> > * message queues: backlog = number of outstanding messages
>> >>> >
>> >>> >> 2. Since the backlog is a byte[], this means that it is up to
>> >>> >> the user to give it a meaning depending on the situation, is
>> >>> >> this correct? Also, since splitRestriction now has the Backlog
>> >>> >> as an argument, what do we expect the person that implements
>> >>> >> this method in a DoFn to ideally do with it? Maybe a more
>> >>> >> concrete example of how things fit for File/Offset or
>> >>> >> HBase/Bigtable/ByteKey will be helpful (maybe also for the
>> >>> >> BundleFinalizer concept too).
>> >>> >
>> >>> > Yes, the restriction tracker/restriction/SplittableDoFn must
>> >>> > give the byte[] a meaning. This can be any meaning, but we would
>> >>> > like the backlog byte[] representation to be lexicographically
>> >>> > comparable (when viewing the byte[] in big endian format, where
>> >>> > prefixes are smaller, e.g. 001 is smaller than 0010) and
>> >>> > preferably a linear representation. Note that all restriction
>> >>> > trackers of the same type should use the same "space" so that
>> >>> > backlogs are comparable across multiple restriction tracker
>> >>> > instances.
>> >>> >
>> >>> > The backlog, when provided to splitRestriction, should be used
>> >>> > to subdivide the restriction into smaller restrictions where
>> >>> > each would have that backlog if processed (except for
>> >>> > potentially the last).
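The "byte array subtraction" distance mentioned for the ByteKeyRestrictionTracker might be sketched as follows: keys are treated as left-aligned, unsigned big-endian numbers, right-padded with zeros to a common width, and subtracted. This is one plausible reading of the idea, not Beam's actual implementation:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ByteKeyDistance {
  static BigInteger distance(byte[] currentKey, byte[] lastKey) {
    int width = Math.max(currentKey.length, lastKey.length);
    // Arrays.copyOf zero-pads on the right; signum 1 treats the bytes
    // as an unsigned magnitude.
    BigInteger current = new BigInteger(1, Arrays.copyOf(currentKey, width));
    BigInteger last = new BigInteger(1, Arrays.copyOf(lastKey, width));
    return last.subtract(current).max(BigInteger.ZERO);
  }

  public static void main(String[] args) {
    byte[] current = "aaab".getBytes(StandardCharsets.US_ASCII);
    byte[] last = "aaac".getBytes(StandardCharsets.US_ASCII);
    // Adjacent keys are one apart in this space, which says nothing
    // about how many rows actually sit between them; hence the
    // "wildly inaccurate" caveat above.
    System.out.println(distance(current, last)); // 1
  }
}
```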
>> >>> > A concrete example would be to represent the remaining bytes to
>> >>> > process in a file as a 64 bit big endian integer. Let's say that
>> >>> > is 500 MiB (524288000 bytes), or 00000000 00000000 00000000
>> >>> > 00000000 00011111 01000000 (note that the trailing zeros are
>> >>> > optional and don't impact the calculation). The runner could
>> >>> > notice that processing the restriction will take 10 hrs, so it
>> >>> > asks the SDF to split into 1/16 segments by shifting the bits
>> >>> > over by 4 and asks to split using backlog 00000000 00000000
>> >>> > 00000000 00000000 00000001 11110100. The SDK is able to convert
>> >>> > this request back into 32768000 bytes and returns 16
>> >>> > restrictions. Another example would be a message queue where we
>> >>> > have 10000 messages remaining on the queue, so the backlog would
>> >>> > be 00000000 00000000 00000000 00000000 00000000 00000000
>> >>> > 00100111 00010000 when represented as a 64 bit unsigned big
>> >>> > endian integer. The runner could ask the SDK to split using a
>> >>> > 1/8th backlog of 00000000 00000000 00000000 00000000 00000000
>> >>> > 00000000 00000100 11100010, which the SDK would break out into 8
>> >>> > restrictions, the first 7 responsible for reading 1250 messages
>> >>> > and stopping, while the last restriction would read 1250
>> >>> > messages and then continue to read anything else that has been
>> >>> > enqueued.
>> >>> >
>> >>> > Bundle finalization is unrelated to backlogs but is needed since
>> >>> > there is a class of data stores which need an acknowledgement
>> >>> > that says "I have successfully received your data and am now
>> >>> > responsible for it", such as acking a message from a message
>> >>> > queue.
>> >>> >
>> >>> >> 3. By default all Restrictions are assumed to be unbounded, but
>> >>> >> there is this new Restrictions.IsBounded method. Can't this
>> >>> >> behavior be inferred (adapted) from the DoFn
>> >>> >> UnboundedPerElement/Bounded annotation, or are these
>> >>> >> independent concepts?
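The bit-shifting arithmetic in the backlog examples above can be checked with a short sketch (a fixed 64 bit big endian encoding; the helper names are mine):

```java
import java.nio.ByteBuffer;

public class BacklogSplitDemo {
  // Encode a backlog as a 64 bit big endian integer (ByteBuffer's
  // default byte order is big endian).
  static byte[] encode(long backlog) {
    return ByteBuffer.allocate(8).putLong(backlog).array();
  }

  static long decode(byte[] backlog) {
    return ByteBuffer.wrap(backlog).getLong();
  }

  public static void main(String[] args) {
    long fileBacklog = 524288000L; // 500 MiB remaining in a file
    // Shifting right by 4 divides by 16, yielding the split backlog:
    long splitBacklog = decode(encode(fileBacklog)) >>> 4;
    System.out.println(splitBacklog); // 32768000 -> 16 restrictions

    long queueBacklog = 10000L; // messages remaining on a queue
    // Shifting right by 3 divides by 8:
    System.out.println(queueBacklog >>> 3); // 1250 -> 8 restrictions
  }
}
```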
>> >>> >
>> >>> > UnboundedPerElement/BoundedPerElement tells us during pipeline
>> >>> > construction time what type of PCollection we will be creating,
>> >>> > since we may have a bounded PCollection go to an
>> >>> > UnboundedPerElement DoFn, which will produce an unbounded
>> >>> > PCollection, and similarly we could have an unbounded
>> >>> > PCollection go to a BoundedPerElement DoFn, which will produce
>> >>> > an unbounded PCollection. Restrictions.IsBounded is used during
>> >>> > pipeline execution to inform the runner whether a restriction
>> >>> > being returned is bounded or not, since unbounded restrictions
>> >>> > can return bounded restrictions during splitting. So in the
>> >>> > above example using the message queue, the first 7 restrictions
>> >>> > that only read 1250 messages would be marked with the
>> >>> > Restrictions.IsBounded interface while the last one would not
>> >>> > be. This could also be a method on restrictions, such as
>> >>> > "IsBounded isBounded()" on PCollections.
>> >>> >
>> >>> >> Extra unrelated comment:
>> >>> >> Since SDF is still @Experimental we should probably rename
>> >>> >> OffsetRangeTracker and ByteKeyRangeTracker to use the
>> >>> >> RestrictionTracker suffix (I don’t know why they share the
>> >>> >> RangeTracker suffix with the new trackers). WDYT?
>> >>> >
>> >>> > Agree, will perform in a follow-up PR.
>> >>> >
>> >>> >> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <lc...@google.com> wrote:
>> >>> >> >
>> >>> >> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <rober...@google.com> wrote:
>> >>> >> >>
>> >>> >> >> I think that not returning the user's specific subclass
>> >>> >> >> should be fine. Does the removal of markDone imply that the
>> >>> >> >> consumer always knows a "final" key to claim on any given
>> >>> >> >> restriction?
>> >>> >> >
>> >>> >> > Yes, each restriction needs to support claiming a "final" key
>> >>> >> > that would make the restriction "done".
>> >>> >> > In the BigTable/HBase case it is the empty key "", and for
>> >>> >> > files it can be a file offset beyond the end of the file.
>> >>> >> > Generally, restriction trackers written by SDF authors could
>> >>> >> > also take an instance of an object that they can compare
>> >>> >> > instance equality against for a final key. Alternatively, we
>> >>> >> > could allow restriction trackers to implement markDone(), but
>> >>> >> > that would require the SDK to have knowledge of the method by
>> >>> >> > having the RestrictionTracker implement an interface, extend
>> >>> >> > an abstract base class, or be reflectively found, so that we
>> >>> >> > would be able to wrap it to provide synchronization
>> >>> >> > guarantees. I had toyed with the idea of using something like
>> >>> >> > the ProxyInvocationHandler that backs PipelineOptions to
>> >>> >> > provide a modified version of the user's instance with the
>> >>> >> > appropriate synchronization guarantees, but couldn't get it
>> >>> >> > to work.
>> >>> >> >
>> >>> >> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <lc...@google.com> wrote:
>> >>> >> >> >
>> >>> >> >> > I have started to work on how to change the user facing
>> >>> >> >> > API within the Java SDK to support splitting/checkpointing
>> >>> >> >> > [1], backlog reporting [2] and bundle finalization [3].
>> >>> >> >> >
>> >>> >> >> > I have this PR [4] which contains minimal interface/type
>> >>> >> >> > definitions to convey how the API surface would change
>> >>> >> >> > with these 4 changes:
>> >>> >> >> > 1) Exposes the ability for @SplitRestriction to take a
>> >>> >> >> > backlog suggestion on how to perform splitting and how
>> >>> >> >> > many restrictions should be returned.
>> >>> >> >> > 2) Adds the ability for RestrictionTrackers to report
>> >>> >> >> > backlog.
>> >>> >> >> > 3) Updates @ProcessElement to be required to take a
>> >>> >> >> > generic RestrictionTracker instead of the user's own
>> >>> >> >> > restriction tracker type.
>> >>> >> >> > 4) Adds the ability for
>> >>> >> >> > @StartBundle/@ProcessElement/@FinishBundle to register a
>> >>> >> >> > callback that is invoked after bundle finalization.
>> >>> >> >> >
>> >>> >> >> > The details are in the javadoc comments as to how I would
>> >>> >> >> > expect the contract to play out.
>> >>> >> >> > Feel free to comment on the ML/PR around the contract,
>> >>> >> >> > and after the feedback is received/digested/implemented, I
>> >>> >> >> > would like to get the changes submitted so that work can
>> >>> >> >> > start towards providing an implementation in the Java SDK,
>> >>> >> >> > Python SDK, Go SDK, and the shared runner portability
>> >>> >> >> > library.
>> >>> >> >> >
>> >>> >> >> > I would like to call out special attention to 3, since it
>> >>> >> >> > will enable us to remove the synchronization requirement
>> >>> >> >> > for users: we will wrap the underlying restriction
>> >>> >> >> > tracker, allowing us to add appropriate synchronization as
>> >>> >> >> > needed and also to watch any calls that pass through the
>> >>> >> >> > object, such as the claim calls. I also believe this
>> >>> >> >> > prevents people from writing RestrictionTrackers where the
>> >>> >> >> > contract of tryClaim is subverted, since markDone is
>> >>> >> >> > outside the purview of tryClaim, as in ByteKeyRangeTracker
>> >>> >> >> > [5].
>> >>> >> >> >
>> >>> >> >> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
>> >>> >> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting
>> >>> >> >> > 3: https://s.apache.org/beam-finalizing-bundles
>> >>> >> >> > 4: https://github.com/apache/beam/pull/6969
>> >>> >> >> > 5: https://github.com/apache/beam/pull/6949