Based upon the current Java SDK API, I was able to implement Runner initiated checkpointing that the Java SDK honors within PR https://github.com/apache/beam/pull/7200.
This is an exciting first step to a splitting implementation; feel free to take a look and comment. I have added two basic tests: executing an SDF without splitting and executing an SDF with a runner-initiated checkpoint.

On Fri, Nov 30, 2018 at 4:52 PM Robert Bradshaw <[email protected]> wrote: > On Fri, Nov 30, 2018 at 10:14 PM Lukasz Cwik <[email protected]> wrote: > > > > On Fri, Nov 30, 2018 at 1:02 PM Robert Bradshaw <[email protected]> > wrote: > >> > >> On Fri, Nov 30, 2018 at 6:38 PM Lukasz Cwik <[email protected]> wrote: > >> > > >> > Sorry, for some reason I thought I had answered these. > >> > >> No problem, thanks for your patience :). > >> > >> > On Fri, Nov 30, 2018 at 2:20 AM Robert Bradshaw <[email protected]> > wrote: > >> >> > >> >> I still have outstanding questions (above) about > >> >> > >> >> 1) Why we need arbitrary precision for backlog, instead of just using > >> >> a (much simpler) double. > >> > > >> > > >> > Double lacks the precision for reporting backlogs for byte key ranges > (HBase, Bigtable, ...). Scanning a key range such as ["a", "b") and with a > large number of keys with a really long common prefix such as > "aaaaaaaaaaaaaaaaaaaaaaaaaab" and "aaaaaaaaaaaaaaaaaaaaaaaaaac", ... leads > to the backlog not changing even though we are making progress through the > key space. This also prevents splitting within such an area since the > double can't provide that necessary precision (without multiple rounds of > splitting which adds complexity). > >> > >> We'll have to support multiple rounds of splitting regardless. I can > >> see how this gives more information up front though. > > > > I agree that we will need to support multiple rounds of splitting from > the SDK side but this adds complexity from the runner side since it can > only increase the accuracy for a split by performing multiple rounds of > splitting at once.
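To make the precision argument above concrete, here is a minimal sketch. It is not Beam code: the class and method names are hypothetical, and reading a key as a base-256 fraction is only one assumed way of mapping keys onto a number. With a double, two neighbouring keys that share a long prefix collapse to the same position, so a backlog derived from them would not move; with an arbitrary-precision integer they stay exactly one step apart.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;

final class BacklogPrecisionSketch {
  // Illustrative keys with a long common prefix, as in the message above.
  static final String KEY_B = "aaaaaaaaaaaaaaaaaaaaaaaaaab";
  static final String KEY_C = "aaaaaaaaaaaaaaaaaaaaaaaaaac";

  /** Reads a key as a base-256 fraction in [0, 1) using double arithmetic. */
  static double asDouble(String key) {
    double value = 0;
    double scale = 1.0 / 256;
    for (byte b : key.getBytes(StandardCharsets.US_ASCII)) {
      value += (b & 0xFF) * scale;
      scale /= 256;
    }
    return value;
  }

  /** Reads a key, right-padded with zero bytes to length, as an unsigned big endian integer. */
  static BigInteger asBigInteger(String key, int length) {
    byte[] padded = new byte[length];
    byte[] bytes = key.getBytes(StandardCharsets.US_ASCII);
    System.arraycopy(bytes, 0, padded, 0, bytes.length);
    return new BigInteger(1, padded);
  }

  /** Exact distance between two keys after padding both to the same length. */
  static BigInteger exactDistance(String from, String to) {
    int length = Math.max(from.length(), to.length());
    return asBigInteger(to, length).subtract(asBigInteger(from, length));
  }

  public static void main(String[] args) {
    // The double positions are indistinguishable; the exact distance is not.
    System.out.println("same double position: " + (asDouble(KEY_B) == asDouble(KEY_C)));
    System.out.println("exact distance: " + exactDistance(KEY_B, KEY_C));
  }
}
```

A double carries 53 bits of mantissa, so only the first seven or so key bytes can influence the position; everything after that is lost, which is the "backlog not changing" effect described above.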
> > > >> (As an aside, I've been thinking about some ways of solving the dark > >> matter problem, and it might depend on knowing the actual key, using > >> the fact that character boundaries are likely cut-off points for > >> changes in density, which would get obscured by alternative > >> representations.) > > > > Every time I think about this issue, I can never get it to apply > meaningfully for unbounded sources such as a message queue like pubsub. > > Yeah, neither can I. > > > Also, having an infinitely precise backlog such as the decimal format > would still provide density information as the rate of change through the > backlog for a bounded source would change once a "cluster" was hit. > > This is getting to somewhat of a tangential topic, but the key insight > is that although it's easy to find the start of a cluster, to split > ideally one would want to know where the end of the cluster is. For > keyspaces, this is likely to be at binary fractions, and in particular > looking at the longevity of common prefixes of length n one could make > heuristic guesses as to where this density dropoff may be. (This also > requires splitting at a key, not splitting relative to a current > position, which has its issues...) > > >> >> 2) Whether it's worth passing backlog back to split requests, rather > >> >> than (again) a double representing "portion of current remaining" > >> >> which may change over time. (The most common split request is into > >> >> even portions, and specifically half, which can't accurately be > >> >> requested from a stale backlog.) > >> > > >> > I see two scenarios here: > >> > * the fraction is exposed to the SDF author and then the SDF author > needs to map from their restriction space to backlog and also map fractions > onto their restriction space meaning that they are required to write > mappings between three different models.
> >> > * the fraction is not exposed to the SDF author and the framework > code multiplies the fraction against the backlog and provides the backlog > to the user (this solves the backlog skew issue but still has the limited > precision issue). > >> > >> Limited precision is not as much of an issue here because one can > >> express very small numbers to split close to the current position, and > >> doesn't need high precision for splitting further away. > > > > Agree. Would this also mean that skew when splitting at half doesn't > really matter? > > Lots of times keyspaces have big pockets of low density. If one hits > one of these ranges between when the backlog is reported and when the > split is requested, the skew can get quite large. Basically using a > fraction means that a system does not have to be as concerned about > stale data, and can make reasonable choices without data at all (e.g. > imagine upscaling from 200 to 300 workers and asking for everyone to > just give 33% of their work back), and when it does make choices based > on actual backlog the discrepancy between what was ideal at the time > backlog was requested and what's ideal now is shared between the > primary and remainder(s) rather than one side or the other absorbing > this entire error. > > This of course gets exacerbated with multiple splits, e.g. if the > measured backlog was 100 and you wanted to split the work in 10 > pieces, asking for a split at 10 would only result in 9 splits if the > cursor advanced by 10 in the meantime, and if it advanced by 9 you'd > probably want to use fractions anyway to spread the error out and > produce (10, 9, 9, 9, 9, 9, 9, 9, 9, 9) rather than (10, 10, 10, 10, > 10, 10, 10, 10, 10, 1). > > >> I also think it's nice that the space of possible splits is always > >> (current position, restriction end) which a double always maps onto > >> despite those both being moving targets.
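The multi-split arithmetic above can be checked with a small sketch. This is illustrative only: the class and method names are hypothetical, work is modelled as a plain count of units, and the rule that leftover units go to the earliest pieces is an assumption. It compares cutting fixed-size chunks computed from a stale backlog against dividing whatever remains now into equal fractions.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitSkewSketch {
  /** Cuts the work that remains now into chunks whose size came from a stale backlog. */
  static List<Long> splitByStaleBacklog(long remaining, long chunkSize) {
    List<Long> sizes = new ArrayList<>();
    while (remaining > 0) {
      long size = Math.min(chunkSize, remaining);
      sizes.add(size);
      remaining -= size;
    }
    return sizes;
  }

  /** Divides the work that remains now into near-equal pieces, one fraction per piece. */
  static List<Long> splitByFraction(long remaining, int pieces) {
    List<Long> sizes = new ArrayList<>();
    long base = remaining / pieces;
    long extra = remaining % pieces;
    for (int i = 0; i < pieces; i++) {
      // The first `extra` pieces absorb one leftover unit each.
      sizes.add(base + (i < extra ? 1 : 0));
    }
    return sizes;
  }

  public static void main(String[] args) {
    // Backlog was measured as 100 (so chunks of 10), but the cursor advanced by 9 since then.
    System.out.println(splitByStaleBacklog(91, 10));
    System.out.println(splitByFraction(91, 10));
    // If the cursor advanced by 10 instead, the fixed-size request yields only 9 pieces.
    System.out.println(splitByStaleBacklog(90, 10).size());
  }
}
```

With fractions the error from the stale measurement is spread across all pieces, whereas the fixed-size request leaves it entirely in the last piece.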
If you phrase things in terms > >> of backlogs, you might ask for impossible things. I don't recall if > >> the passed backlog is the amount that should be retained or the amount > >> that should be returned, but if the latter, it'll be difficult to > >> accurately split near the current position. > > > > > > For the current proposal, it represents how much should be retained but > as was mentioned earlier, the semantics of returning multiple splits are > still up in the air. > > > >> > >> > I believe it is easier for an SDF author to write a two way mapping > from backlog to their position space than to write two different types of > mappings. For example, when a person is reading a file that has 100 bytes > to process and is asked to split at 60.3%, they have to map 60.3% onto 100 > bytes figuring out that they are responsible for 60.3 bytes which they > round down to 60 bytes. In the scenario where the runner provides the > backlog, 60.3 would have been sent across and the SDF author would only > need to perform rounding. > >> > >> Yeah, that's something to mull on. Maybe with a set of concrete > examples. > >> > >> >> There are also some questions about returning multiple remainders, > and > >> >> how that relates to/overlaps with the initial splitting, but those > can > >> >> probably be deferred. > >> > > >> > > >> > Agree. > >> > > >> >> > >> >> On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik <[email protected]> > wrote: > >> >> > > >> >> > I updated the PR addressing the last of Scott's comments and also > migrated to use an integral fraction as Robert had recommended by using > approach A for the proto representation and BigDecimal within the Java SDK: > >> >> > A: > >> >> > // Represents a non-negative decimal number: unscaled_value * > 10^(-scale) > >> >> > message Decimal { > >> >> > // Represents the unscaled value as a big endian unlimited > precision non-negative integer.
> >> >> > bytes unscaled_value = 1; > >> >> > // Represents the scale > >> >> > uint32 scale = 2; > >> >> > } > >> >> > > >> >> > Ismael, I would like to defer the changes to improve the ByteBuddy > DoFnInvoker since that is parallelizable work and have filed BEAM-6142. > >> >> > > >> >> > I don't believe there are any other outstanding changes and would > like to get the PR merged so that people can start working on implementing > support for backlog reporting and splitting within the Java SDK harness, > improving the ByteBuddy DoFnInvoker, exposing the shared runner library > parts, and integrating this into ULR, Flink, Dataflow, ... > >> >> > > >> >> > On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik <[email protected]> > wrote: > >> >> >> > >> >> >> > >> >> >> > >> >> >> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía <[email protected]> > wrote: > >> >> >>> > >> >> >>> > Bundle finalization is unrelated to backlogs but is needed > since there is a class of data stores which need acknowledgement that says > I have successfully received your data and am now responsible for it such > as acking a message from a message queue. > >> >> >>> > >> >> >>> Currently ack is done by IOs as part of checkpointing. How this > will > >> >> >>> be different? Can you please clarify how should be done in this > case, > >> >> >>> or is this totally independent? 
> >> >> >> > >> >> >> > >> >> >> The flow for finalization and checkpointing is similar: > >> >> >> Checkpointing: > >> >> >> 1) Process a bundle > >> >> >> 2) Checkpoint bundle containing acks that need to be done > >> >> >> 3) When checkpoint resumes, acknowledge messages > >> >> >> > >> >> >> Finalization: > >> >> >> 1) Process a bundle > >> >> >> 2) Request bundle finalization when bundle completes > >> >> >> 3) SDK is asked to finalize bundle > >> >> >> > >> >> >> The difference between the two is that bundle finalization always > goes back to the same machine instance that processed the bundle while > checkpointing can be scheduled on another machine. Many message queue like > systems expose clients which store in-memory state and can't ack from > another machine. You could solve the problem with checkpointing but it would > require each machine to be able to tell another machine that it got a > checkpoint with acks that it is responsible for but this won't work > everywhere and isn't as clean. > >> >> >> > >> >> >>> > >> >> >>> > UnboundedPerElement/BoundedPerElement tells us during pipeline > construction time what type of PCollection we will be creating since we may > have a bounded PCollection goto an UnboundedPerElement DoFn and that will > produce an unbounded PCollection and similarly we could have an unbounded > PCollection goto a BoundedPerElement DoFn and that will produce an > unbounded PCollection. Restrictions.IsBounded is used during pipeline > execution to inform the runner whether a restriction being returned is > bounded or not since unbounded restrictions can return bounded restrictions > during splitting. So in the above example using the message queue, the > first 7 restrictions that only read 1250 messages would be marked with the > Restrictions.IsBounded interface while the last one would not be. This > could also be a method on restrictions such as "IsBounded isBounded()" on > PCollections.
> >> >> >>> > >> >> >>> Thanks for the explanation about Restrictions.IsBounded. Since > this is > >> >> >>> information for the runner, what is the runner expected to do > >> >> >>> differently when IsUnbounded? (I assume that IsBounded is the > default > >> >> >>> behavior and nothing changes). > >> >> >> > >> >> >> > >> >> >> Knowing whether a restriction is bounded or unbounded is > important; one example use case would be for the limited depth splitting > proposal ( > https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv) > since you want to keep the unbounded restrictions at level 0 and only pass > the bounded restrictions to the other levels. The reasoning behind this is > that you don't want to end up in a state where all your unbounded > restrictions are at the highest level preventing you from splitting any > further. > >> >> >> > >> >> >>> > >> >> >>> > Note that this does bring up the question of whether SDKs > should expose coders for backlogs since ByteKeyCoder and BigEndianLongCoder > exist which would cover a good number of scenarios described above. This > coder doesn't have to be understood by the runner nor does it have to be > part of the portability APIs (either Runner or Fn API). WDYT? > >> >> >>> > >> >> >>> Yes we may need a Coder effectively for both sides, only thing I > don’t > >> >> >>> like is external impact in the API. I mean it is not too > complex, but > >> >> >>> adds some extras to support things that are ‘rarely’ changed. > >> >> >> > >> >> >> > >> >> >> Based upon Robert's suggestion above to swap to an integral > floating point number, and even without Robert's suggestion, this won't work. > The idea was that a coder would help convert the byte[] backlog > representation to/from a type the user wants but the issue is that the > Runner may give any arbitrary byte[] backlog to the SDK during splitting > and this coder would need to be able to handle it.
> >> >> >> > >> >> >>> > >> >> >>> > Ismael, I looked at the API around ByteKeyRangeTracker and > OffsetRangeTracker figured out that the classes are named as such because > they are trackers for the OffsetRange and ByteKeyRange classes. Some > options are to: > >> >> >>> > 1) Copy the ByteKeyRange and call it ByteKeyRestriction and > similarly copy OffsetRange and call it OffsetRestriction. This would allow > us to name the trackers ByteKeyRestrictionTracker and > OffsetRestrictionTracker. Note that we can't rename because that would be a > backwards incompatible change for existing users of > ByteKeyRange/OffsetRange. This would allow us to add methods relevant to > SDF and remove methods that aren't needed. > >> >> >>> > 2) Rename ByteKeyRangeTracker to > ByteKeyRangeRestrictionTracker and OffsetRangeTracker to > OffsetRangeRestrictionTracker. Not really liking this option. > >> >> >>> > 3) Leave things as they are. > >> >> >>> > >> >> >>> For the RangeTracker vs RestrictionTracker discussion I will > probably > >> >> >>> lean to (3) Leave things as they are) save if there is important > >> >> >>> things to change/fix (1) which I am not aware of. > >> >> >> > >> >> >> > >> >> >> Sounds good to me. > >> >> >> > >> >> >>> > >> >> >>> On Tue, Nov 20, 2018 at 12:07 AM Lukasz Cwik <[email protected]> > wrote: > >> >> >>> > > >> >> >>> > Sorry for the late reply. > >> >> >>> > > >> >> >>> > On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía < > [email protected]> wrote: > >> >> >>> >> > >> >> >>> >> Some late comments, and my pre excuses if some questions look > silly, > >> >> >>> >> but the last documents were a lot of info that I have not yet > fully > >> >> >>> >> digested. > >> >> >>> >> > >> >> >>> >> I have some questions about the ‘new’ Backlog concept > following a > >> >> >>> >> quick look at the PR > >> >> >>> >> https://github.com/apache/beam/pull/6969/files > >> >> >>> >> > >> >> >>> >> 1. Is the Backlog a specific concept for each IO? 
Or in other > words: > >> >> >>> >> ByteKeyRestrictionTracker can be used by HBase and Bigtable, > but I am > >> >> >>> >> assuming from what I could understand that the Backlog > implementation > >> >> >>> >> will be data store specific, is this the case? or it can be > in some > >> >> >>> >> case generalized (for example for Filesystems)? > >> >> >>> > > >> >> >>> > > >> >> >>> > The backlog is tied heavily to the restriction tracker > implementation; any data store using the same restriction tracker will > provide the same backlog computation. For example, if HBase/Bigtable use > the ByteKeyRestrictionTracker then they will use the same backlog > calculation. Note that an implementation could subclass a restriction > tracker if the data store could provide additional information. For > example, the default backlog for a ByteKeyRestrictionTracker over > [startKey, endKey) is distance(currentKey, lastKey) where distance is > represented as byte array subtraction (which can be wildly inaccurate as > the density of data is not well reflected) but if HBase/Bigtable could > provide the number of bytes from current key to last key, a better > representation could be provided. > >> >> >>> > > >> >> >>> > Other common examples of backlogs would be: > >> >> >>> > * files: backlog = length of file - current byte offset > >> >> >>> > * message queues: backlog = number of outstanding messages > >> >> >>> > > >> >> >>> >> > >> >> >>> >> > >> >> >>> >> 2. Since the backlog is a byte[] this means that it is up to > the user > >> >> >>> >> to give it a meaning depending on the situation, is this > correct? Also > >> >> >>> >> since splitRestriction has now the Backlog as an argument, > what do we > >> >> >>> >> expect the person that implements this method in a DoFn to do > ideally > >> >> >>> >> with it?
Maybe a more concrete example of how things fit for > >> >> >>> >> File/Offset or HBase/Bigtable/ByteKey will be helpful (maybe > also for > >> >> >>> >> the BundleFinalizer concept too). > >> >> >>> > > >> >> >>> > > >> >> >>> > Yes, the restriction tracker/restriction/SplittableDoFn must > give the byte[] a meaning. This can have any meaning but we would like > the backlog byte[] representation to be lexicographically comparable (when > viewing the byte[] in big endian format and prefixes are smaller (e.g. 001 > is smaller than 0010)) and preferably a linear representation. Note that all > restriction trackers of the same type should use the same "space" so that > backlogs are comparable across multiple restriction tracker instances. > >> >> >>> > > >> >> >>> > The backlog when provided to splitRestriction should be used > to subdivide the restriction into smaller restrictions where each would > have the backlog if processed (except for potentially the last). > >> >> >>> > > >> >> >>> > A concrete example would be to represent the remaining bytes > to process in a file as a 64 bit big endian integer, let's say that is > 500MiB (524288000 bytes) or 00000000 00000000 00000000 00000000 00011111 > 01000000 (note that the trailing zeros are optional and don't impact the > calculation). The runner could notice that processing the restriction will > take 10 hrs, so it asks the SDF to split at 1/16 segments by shifting the > bits over by 4 and asks to split using backlog 00000000 00000000 00000000 > 00000000 00000001 11110100. The SDK is able to convert this request back > into 32768000 bytes and returns 16 restrictions. Another example would be > for a message queue where we have 10000 messages on the queue remaining so > the backlog would be 00000000 00000000 00000000 00000000 00000000 00000000 > 00100111 00010000 when represented as a 64 bit unsigned big endian integer.
> The runner could ask the SDK to split using a 1/8th backlog of 00000000 > 00000000 00000000 00000000 00000000 00000000 00000100 11100010 which the > SDK would break out into 8 restrictions, the first 7 responsible for > reading 1250 messages and stopping while the last restriction would read > 1250 messages and then continue to read anything else that has been > enqueued. > >> >> >>> > > >> >> >>> > Bundle finalization is unrelated to backlogs but is needed > since there is a class of data stores which need acknowledgement that says > I have successfully received your data and am now responsible for it such > as acking a message from a message queue. > >> >> >>> > > >> >> >>> >> > >> >> >>> >> > >> >> >>> >> 3. By default all Restrictions are assumed to be unbounded > but there > >> >> >>> >> is this new Restrictions.IsBounded method, can’t this > behavior be > >> >> >>> >> inferred (adapted) from the DoFn UnboundedPerElement/Bounded > >> >> >>> >> annotation or are these independent concepts? > >> >> >>> > > >> >> >>> > > >> >> >>> > UnboundedPerElement/BoundedPerElement tells us during pipeline > construction time what type of PCollection we will be creating since we may > have a bounded PCollection goto an UnboundedPerElement DoFn and that will > produce an unbounded PCollection and similarly we could have an unbounded > PCollection goto a BoundedPerElement DoFn and that will produce an > unbounded PCollection. Restrictions.IsBounded is used during pipeline > execution to inform the runner whether a restriction being returned is > bounded or not since unbounded restrictions can return bounded restrictions > during splitting. So in the above example using the message queue, the > first 7 restrictions that only read 1250 messages would be marked with the > Restrictions.IsBounded interface while the last one would not be. This > could also be a method on restrictions such as "IsBounded isBounded()" on > PCollections. 
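The 500MiB file and 10000 message backlog examples above can be reproduced with a short sketch. It is not the Beam API: the class and method names are hypothetical, and it assumes the backlog is a fixed 8-byte big endian value that fits in a long, which the proposal itself does not require.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;

final class BacklogBytesSketch {
  /** Encodes a backlog as a 64 bit big endian integer (ByteBuffer is big endian by default). */
  static byte[] encode(long backlog) {
    return ByteBuffer.allocate(Long.BYTES).putLong(backlog).array();
  }

  /** Decodes an unsigned big endian byte[] backlog. */
  static BigInteger decode(byte[] backlog) {
    return new BigInteger(1, backlog);
  }

  /** What a runner might do to ask for 1/2^bits sized pieces: shift the backlog right. */
  static byte[] shiftRight(byte[] backlog, int bits) {
    return encode(decode(backlog).shiftRight(bits).longValueExact());
  }

  /** Renders a backlog as space-separated groups of 8 bits for comparison with the text. */
  static String toBinary(byte[] backlog) {
    StringBuilder sb = new StringBuilder();
    for (byte b : backlog) {
      if (sb.length() > 0) {
        sb.append(' ');
      }
      sb.append(String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0'));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    byte[] file = encode(524288000L); // 500MiB remaining in the file
    byte[] perSplit = shiftRight(file, 4); // ask for 1/16 sized pieces
    System.out.println(toBinary(file));
    System.out.println(toBinary(perSplit));
    System.out.println(decode(perSplit)); // bytes each restriction should cover

    byte[] queue = encode(10000L); // 10000 messages outstanding
    System.out.println(decode(shiftRight(queue, 3))); // 1/8th of the queue backlog
  }
}
```

Unlike the binary strings in the message, this prints all eight bytes, including the trailing zero bytes that the text notes are optional.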
> >> >> >>> > > >> >> >>> >> Extra unrelated comment: > >> >> >>> >> Since SDF is still @Experimental we should probably rename > >> >> >>> >> OffsetRangeTracker and ByteKeyRangeTracker to use the > RestrictionTracker > >> >> >>> >> suffix (I don’t know why they share the RangeTracker suffix > for the > >> >> >>> >> new trackers), WDYT? > >> >> >>> > > >> >> >>> > > >> >> >>> > Agree, will perform in a follow-up PR. > >> >> >>> > > >> >> >>> >> > >> >> >>> >> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <[email protected]> > wrote: > >> >> >>> >> > > >> >> >>> >> > > >> >> >>> >> > > >> >> >>> >> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw < > [email protected]> wrote: > >> >> >>> >> >> > >> >> >>> >> >> I think that not returning the user's specific subclass > should be fine. > >> >> >>> >> >> Does the removal of markDone imply that the consumer > always knows a > >> >> >>> >> >> "final" key to claim on any given restriction? > >> >> >>> >> > > >> >> >>> >> > > >> >> >>> >> > Yes, each restriction needs to support claiming a "final" > key that would make the restriction "done". In the BigTable/HBase case it > is the empty key "", for files it can be a file offset beyond the end of > the file. Generally, restriction trackers written by SDF authors could also > take an instance of an object that they can compare instance equality > against for a final key. Alternatively we could allow restriction trackers > to implement markDone() but would need the SDK to have knowledge of the method > by having the RestrictionTracker implement an interface, extend an abstract base > class, or be found reflectively so that we would be able to wrap it to provide > synchronization guarantees. I had toyed with the idea of using something > like the ProxyInvocationHandler that backs PipelineOptions to be able to > provide a modified version of the user's instance that had the appropriate > synchronization guarantees but couldn't get it to work.
> >> >> >>> >> > > >> >> >>> >> >> > >> >> >>> >> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik < > [email protected]> wrote: > >> >> >>> >> >> > > >> >> >>> >> >> > I have started to work on how to change the user facing > API within the Java SDK to support splitting/checkpointing[1], backlog > reporting[2] and bundle finalization[3]. > >> >> >>> >> >> > > >> >> >>> >> >> > I have this PR[4] which contains minimal interface/type > definitions to convey how the API surface would change with these 4 changes: > >> >> >>> >> >> > 1) Exposes the ability for @SplitRestriction to take a > backlog suggestion on how to perform splitting and for how many > restrictions should be returned. > >> >> >>> >> >> > 2) Adds the ability for RestrictionTrackers to report > backlog > >> >> >>> >> >> > 3) Updates @ProcessElement to be required to take a > generic RestrictionTracker instead of the users own restriction tracker > type. > >> >> >>> >> >> > 4) Adds the ability for > @StartBundle/@ProcessElement/@FinishBundle to register a callback that is > invoked after bundle finalization. > >> >> >>> >> >> > > >> >> >>> >> >> > The details are in the javadoc comments as to how I > would expect the contract to play out. > >> >> >>> >> >> > Feel free to comment on the ML/PR around the contract > and after the feedback is received/digested/implemented, I would like to > get the changes submitted so that work can start towards providing an > implementation in the Java SDK, Python SDK, and Go SDK and the shared > runner portability library. > >> >> >>> >> >> > > >> >> >>> >> >> > I would like to call out special attention to 3 since > with this change it will enable us to remove the synchronization > requirement for users as we will wrap the underlying restriction tracker > allowing us to add appropriate synchronization as needed and also to watch > any calls that pass through the object such as the claim calls. 
I also > believe this prevents people from writing RestrictionTrackers where the > contract of tryClaim is subverted since markDone is outside the purview of > tryClaim as in ByteKeyRangeTracker[5]. > >> >> >>> >> >> > > >> >> >>> >> >> > 1: > https://s.apache.org/beam-checkpoint-and-split-bundles > >> >> >>> >> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting > >> >> >>> >> >> > 3: https://s.apache.org/beam-finalizing-bundles > >> >> >>> >> >> > 4: https://github.com/apache/beam/pull/6969 > >> >> >>> >> >> > 5: https://github.com/apache/beam/pull/6949 >
