I also addressed a number of PR comments which clarified the contract/expectations described in my previous e-mail and in the splitting/backlog reporting/bundle finalization docs.
On Mon, Nov 19, 2018 at 3:19 PM Lukasz Cwik <[email protected]> wrote:
>
> On Mon, Nov 19, 2018 at 3:06 PM Lukasz Cwik <[email protected]> wrote:
>> Sorry for the late reply.
>>
>> On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <[email protected]> wrote:
>>> Some late comments, and my apologies in advance if some questions look
>>> silly, but the last documents were a lot of info that I have not yet
>>> fully digested.
>>>
>>> I have some questions about the 'new' Backlog concept following a
>>> quick look at the PR:
>>> https://github.com/apache/beam/pull/6969/files
>>>
>>> 1. Is the Backlog a concept specific to each IO? In other words:
>>> ByteKeyRestrictionTracker can be used by HBase and Bigtable, but I am
>>> assuming from what I could understand that the Backlog implementation
>>> will be data-store specific. Is this the case, or can it in some
>>> cases be generalized (for example, for filesystems)?
>>
>> The backlog is tied heavily to the restriction tracker implementation:
>> any data store using the same restriction tracker will get the same
>> backlog computation. For example, if HBase/Bigtable use the
>> ByteKeyRestrictionTracker, then they will use the same backlog
>> calculation. Note that an implementation could subclass a restriction
>> tracker if the data store can provide additional information. For
>> example, the default backlog for a ByteKeyRestrictionTracker over
>> [startKey, endKey) is distance(currentKey, lastKey), where distance is
>> computed as byte-array subtraction (which can be wildly inaccurate
>> since the density of the data is not well reflected), but if
>> HBase/Bigtable could provide the number of bytes from the current key
>> to the last key, a better representation could be provided.
>>
>> Other common examples of backlogs would be:
>> * files: backlog = length of file - current byte offset
>> * message queues: backlog = number of outstanding messages
>>
>>> 2.
>>> Since the backlog is a byte[], this means that it is up to the user
>>> to give it a meaning depending on the situation. Is this correct?
>>> Also, since splitRestriction now has the Backlog as an argument, what
>>> do we expect the person implementing this method in a DoFn to ideally
>>> do with it? Maybe a more concrete example of how things fit together
>>> for File/Offset or HBase/Bigtable/ByteKey would be helpful (maybe
>>> also for the BundleFinalizer concept).
>>
>> Yes, the restriction tracker/restriction/SplittableDoFn must give the
>> byte[] a meaning. It can have any meaning, but we would like the
>> backlog byte[] representation to be lexicographically comparable (when
>> viewing the byte[] in big-endian format, with prefixes comparing as
>> smaller, e.g. 001 is smaller than 0010), and preferably a linear
>> representation. Note that all restriction trackers of the same type
>> should use the same "space" so that backlogs are comparable across
>> multiple restriction tracker instances.
>>
>> The backlog, when provided to splitRestriction, should be used to
>> subdivide the restriction into smaller restrictions where each would
>> have that backlog if processed (except potentially the last).
>>
>> A concrete example would be to represent the remaining bytes to
>> process in a file as a 64-bit big-endian integer. Let's say that is
>> 500 MiB (524288000 bytes), or 00000000 00000000 00000000 00000000
>> 00011111 01000000 (note that the trailing zeros are optional and don't
>> impact the calculation). The runner could notice that processing the
>> restriction will take 10 hrs, so it asks the SDF to split into 1/16
>> segments by shifting the bits over by 4 and asking to split using
>> backlog 00000000 00000000 00000000 00000000 00000001 11110100. The SDK
>> is able to convert this request back into 32768000 bytes and returns
>> 16 restrictions.
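The file-backlog arithmetic quoted above can be sketched as follows. The class and method names here are hypothetical illustrations of the 64-bit big-endian encoding and the shift-by-4 split request, not actual Beam APIs:

```java
import java.nio.ByteBuffer;

// Sketch of a file backlog as an 8-byte big-endian integer, per the
// 500 MiB example in the thread. Names are illustrative, not Beam APIs.
public class FileBacklog {
  // SDK side: encode the remaining bytes in the file as the backlog.
  public static byte[] encode(long remainingBytes) {
    return ByteBuffer.allocate(8).putLong(remainingBytes).array();
  }

  // SDK side: decode an 8-byte big-endian backlog back into a byte count.
  public static long decode(byte[] backlog) {
    return ByteBuffer.wrap(backlog).getLong();
  }

  // Runner side: request 1/16 segments by shifting the backlog right 4 bits.
  public static byte[] oneSixteenthSplitRequest(byte[] backlog) {
    return encode(decode(backlog) >>> 4);
  }
}
```

For the 500 MiB example, encode(524288000L) shifted right by 4 bits decodes back to 32768000 bytes, the per-restriction backlog of each of the 16 resulting restrictions.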
>> Another example would be a message queue where we have 10000 messages
>> remaining on the queue, so the backlog would be 00000000 00000000
>> 00000000 00000000 00000000 00000000 00100111 00010000 when represented
>> as a 64-bit unsigned big-endian integer. The runner could ask the SDK
>> to split using a 1/8th backlog of 00000000 00000000 00000000 00000000
>> 00000000 00000000 00000100 11100010, which the SDK would break out
>> into 8 restrictions: the first 7 responsible for reading 1250 messages
>> and stopping, while the last restriction would read 1250 messages and
>> then continue to read anything else that has been enqueued.
>>
>> Bundle finalization is unrelated to backlogs but is needed since there
>> is a class of data stores which need an acknowledgement that says "I
>> have successfully received your data and am now responsible for it",
>> such as acking a message from a message queue.
>
> Note that this does bring up the question of whether SDKs should expose
> coders for backlogs, since ByteKeyCoder and BigEndianLongCoder exist and
> would cover a good number of the scenarios described above. This coder
> doesn't have to be understood by the runner, nor does it have to be part
> of the portability APIs (either Runner or Fn API). WDYT?
>
>>> 3. By default all Restrictions are assumed to be unbounded, but there
>>> is this new Restrictions.IsBounded method; can't this behavior be
>>> inferred (adapted) from the DoFn UnboundedPerElement/Bounded
>>> annotation, or are these independent concepts?
>>
>> UnboundedPerElement/BoundedPerElement tells us during pipeline
>> construction time what type of PCollection we will be creating, since
>> we may have a bounded PCollection go to an UnboundedPerElement DoFn and
>> that will produce an unbounded PCollection, and similarly we could have
>> an unbounded PCollection go to a BoundedPerElement DoFn and that will
>> produce an unbounded PCollection.
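The message-queue split quoted above (10000 outstanding messages split with a backlog of 1250 into 7 bounded restrictions plus one final restriction that keeps reading) can be sketched like this. The Restriction type and split method are hypothetical stand-ins, not the Beam API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the message-queue split example. Illustrative only.
public class QueueSplit {
  public static class Restriction {
    public final long numMessages; // messages this restriction should read
    public final boolean isBounded; // false => keep reading new messages after
    public Restriction(long numMessages, boolean isBounded) {
      this.numMessages = numMessages;
      this.isBounded = isBounded;
    }
  }

  // Split the outstanding messages so each restriction has the requested
  // backlog; the last restriction stays unbounded and continues reading.
  public static List<Restriction> split(long outstanding, long backlogPerSplit) {
    List<Restriction> out = new ArrayList<>();
    long remaining = outstanding;
    while (remaining > backlogPerSplit) {
      out.add(new Restriction(backlogPerSplit, true)); // bounded piece
      remaining -= backlogPerSplit;
    }
    // Final piece reads the remainder, then anything newly enqueued.
    out.add(new Restriction(remaining, false));
    return out;
  }
}
```

With outstanding = 10000 and backlogPerSplit = 1250, this yields 8 restrictions: 7 bounded ones of 1250 messages each and one unbounded one that reads 1250 messages and then continues.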
>> Restrictions.IsBounded is used during pipeline execution to inform the
>> runner whether a restriction being returned is bounded or not, since
>> unbounded restrictions can return bounded restrictions during
>> splitting. So in the above example using the message queue, the first
>> 7 restrictions that only read 1250 messages would be marked with the
>> Restrictions.IsBounded interface while the last one would not be. This
>> could also be a method on restrictions, such as "IsBounded
>> isBounded()", similar to what we have on PCollections.
>>
>>> Extra unrelated comment:
>>> Since SDF is still @Experimental we should probably rename
>>> OffsetRangeTracker and ByteKeyRangeTracker to use the
>>> RestrictionTracker suffix (I don't know why they keep the
>>> RangeTracker suffix for the new trackers). WDYT?
>>
>> Agree, will perform in a follow-up PR.
>>
>>> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <[email protected]> wrote:
>>> >
>>> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <[email protected]> wrote:
>>> >> I think that not returning the user's specific subclass should be
>>> >> fine. Does the removal of markDone imply that the consumer always
>>> >> knows a "final" key to claim on any given restriction?
>>> >
>>> > Yes, each restriction needs to support claiming a "final" key that
>>> > would make the restriction "done". In the Bigtable/HBase case it is
>>> > the empty key ""; for files it can be a file offset beyond the end of
>>> > the file. Generally, restriction trackers written by SDF authors
>>> > could also take an instance of an object that they can compare
>>> > instance equality against for a final key. Alternatively, we could
>>> > allow restriction trackers to implement markDone(), but the SDK would
>>> > need knowledge of the method by having the RestrictionTracker
>>> > implement an interface, extend an abstract base class, or be found
>>> > reflectively so that we would be able to wrap it to provide
>>> > synchronization guarantees.
>>> > I had toyed with the idea of using something like the
>>> > ProxyInvocationHandler that backs PipelineOptions to be able to
>>> > provide a modified version of the user's instance that had the
>>> > appropriate synchronization guarantees, but I couldn't get it to
>>> > work.
>>> >
>>> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <[email protected]> wrote:
>>> >> > I have started to work on how to change the user-facing API
>>> >> > within the Java SDK to support splitting/checkpointing [1],
>>> >> > backlog reporting [2], and bundle finalization [3].
>>> >> >
>>> >> > I have this PR [4] which contains minimal interface/type
>>> >> > definitions to convey how the API surface would change with these
>>> >> > 4 changes:
>>> >> > 1) Exposes the ability for @SplitRestriction to take a backlog
>>> >> > suggestion on how to perform splitting and how many restrictions
>>> >> > should be returned.
>>> >> > 2) Adds the ability for RestrictionTrackers to report backlog.
>>> >> > 3) Updates @ProcessElement to be required to take a generic
>>> >> > RestrictionTracker instead of the user's own restriction tracker
>>> >> > type.
>>> >> > 4) Adds the ability for @StartBundle/@ProcessElement/@FinishBundle
>>> >> > to register a callback that is invoked after bundle finalization.
>>> >> >
>>> >> > The details are in the javadoc comments as to how I would expect
>>> >> > the contract to play out.
>>> >> > Feel free to comment on the ML/PR around the contract, and after
>>> >> > the feedback is received/digested/implemented, I would like to get
>>> >> > the changes submitted so that work can start towards providing an
>>> >> > implementation in the Java SDK, Python SDK, and Go SDK, and in the
>>> >> > shared runner portability library.
>>> >> >
>>> >> > I would like to call out special attention to 3, since with this
>>> >> > change we will be able to remove the synchronization requirement
>>> >> > for users, as we will wrap the underlying restriction tracker,
>>> >> > allowing us to add appropriate synchronization as needed and also
>>> >> > to watch any calls that pass through the object, such as the claim
>>> >> > calls. I also believe this prevents people from writing
>>> >> > RestrictionTrackers where the contract of tryClaim is subverted,
>>> >> > since markDone is outside the purview of tryClaim, as in
>>> >> > ByteKeyRangeTracker [5].
>>> >> >
>>> >> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
>>> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting
>>> >> > 3: https://s.apache.org/beam-finalizing-bundles
>>> >> > 4: https://github.com/apache/beam/pull/6969
>>> >> > 5: https://github.com/apache/beam/pull/6949
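To illustrate point 3 from the thread: once @ProcessElement takes a generic RestrictionTracker, the SDK can interpose a synchronized wrapper around the user's tracker. The following is a simplified sketch with stand-in interfaces and a toy offset tracker, not the actual Beam types:

```java
// Sketch of how an SDK could wrap a user's restriction tracker to add
// synchronization and observe claim calls. Simplified stand-in types,
// not the Beam API.
public class SyncTrackerSketch {
  public interface RestrictionTracker<PositionT> {
    boolean tryClaim(PositionT position);
    Object currentRestriction();
  }

  // SDK-provided wrapper that serializes all calls into the user's tracker,
  // so SDF authors need no locking of their own.
  public static class SynchronizedTracker<PositionT>
      implements RestrictionTracker<PositionT> {
    private final RestrictionTracker<PositionT> delegate;
    public SynchronizedTracker(RestrictionTracker<PositionT> delegate) {
      this.delegate = delegate;
    }
    @Override public synchronized boolean tryClaim(PositionT position) {
      return delegate.tryClaim(position); // also a hook to observe claims
    }
    @Override public synchronized Object currentRestriction() {
      return delegate.currentRestriction();
    }
  }

  // Toy user tracker over offsets [0, end); no synchronization needed here.
  public static class OffsetTracker implements RestrictionTracker<Long> {
    private final long end;
    private long lastClaimed = -1;
    public OffsetTracker(long end) { this.end = end; }
    @Override public boolean tryClaim(Long position) {
      if (position >= end) {
        return false; // past the restriction: claim fails, work stops
      }
      lastClaimed = position;
      return true;
    }
    @Override public Object currentRestriction() {
      return "[0, " + end + ")";
    }
  }
}
```

Because the SDK owns the wrapper, every claim passes through a single choke point, which is also what rules out side doors like a markDone() that bypasses the tryClaim contract.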
