I still have outstanding questions (above) about

1) Why we need arbitrary precision for backlog, instead of just using
a (much simpler) double.
2) Whether its's worth passing backlog back to split requests, rather
than (again) a double representing "portion of current remaining"
which may change over time. (The most common split request is into
even portions, and specifically half, which can't accurately be
requested from a stale backlog.)

There are also some questions about returning multiple remainders, and
how that relates to/overlaps with the initial splitting, but those can
probably be deferred.

On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik <lc...@google.com> wrote:
>
> I updated the PR addressing the last of Scott's comments and also migrated to 
> use an integral fraction as Robert had recommended by using approach A for 
> the proto representation and BigDecimal within the Java SDK:
> A:
> // Represents a non-negative decimal number: unscaled_value * 10^(-scale)
> message Decimal {
>   // Represents the unscaled value as a big endian unlimited precision 
> non-negative integer.
>   bytes unscaled_value = 1;
>   // Represents the scale
>   uint32 scale = 2;
> }
>
> Ismael, I would like to defer the changes to improve the ByteBuddy 
> DoFnInvoker since that is parallelizable work and have filed BEAM-6142.
>
> I don't believe there are any other outstanding changes and would like to get 
> the PR merged so that people can start working on implementing support for 
> backlog reporting and splitting within the Java SDK harness, improving the 
> ByteBuddy DoFnInvoker, exposing the shared runner library parts, and 
> integrating this into ULR, Flink, Dataflow, ...
>
> On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>
>>
>> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>
>>> > Bundle finalization is unrelated to backlogs but is needed since there is 
>>> > a class of data stores which need acknowledgement that says I have 
>>> > successfully received your data and am now responsible for it such as 
>>> > acking a message from a message queue.
>>>
>>> Currently ack is done by IOs as part of checkpointing. How this will
>>> be different? Can you please clarify how should be done in this case,
>>> or is this totally independent?
>>
>>
>> The flow for finalization and checkpointing is similar:
>> Checkpointing:
>> 1) Process a bundle
>> 2) Checkpoint bundle containing acks that need to be done
>> 3) When checkpoint resumes, acknowledge messages
>>
>> Finalization:
>> 1) Process a bundle
>> 2) Request bundle finalization when bundle completes
>> 3) SDK is asked to finalize bundle
>>
>> The difference between the two is that bundle finalization always goes back 
>> to the same machine instance that processed the bundle while checkpointing 
>> can be scheduled on another machine. Many message queue like systems expose 
>> clients which store in memory state and can't ack from another machine. You 
>> could solve the problem with checkpointing but would require each machine to 
>> be able to tell another machine that it got a checkpoint with acks that it 
>> is responsible for but this won't work everywhere and isn't as clean.
>>
>>>
>>> > UnboundedPerElement/BoundedPerElement tells us during pipeline 
>>> > construction time what type of PCollection we will be creating since we 
>>> > may have a bounded PCollection goto an UnboundedPerElement DoFn and that 
>>> > will produce an unbounded PCollection and similarly we could have an 
>>> > unbounded PCollection goto a BoundedPerElement DoFn and that will produce 
>>> > an unbounded PCollection. Restrictions.IsBounded is used during pipeline 
>>> > execution to inform the runner whether a restriction being returned is 
>>> > bounded or not since unbounded restrictions can return bounded 
>>> > restrictions during splitting. So in the above example using the message 
>>> > queue, the first 7 restrictions that only read 1250 messages would be 
>>> > marked with the Restrictions.IsBounded interface while the last one would 
>>> > not be. This could also be a method on restrictions such as "IsBounded 
>>> > isBounded()" on Pcollections.
>>>
>>> Thanks for the explanation about Restrictions.IsBounded, since this is
>>> information for the runner What is the runner expected to do
>>> differently when IsUnbounded? (I assume that IsBounded is the default
>>> behavior and nothing changes).
>>
>>
>> Knowing whether a restriction is bounded or unbounded is important, one 
>> example use case would be for the limited depth splitting proposal 
>> (https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv)
>>  since you want to keep the unbounded restrictions at level 0 and only pass 
>> the bounded restrictions to the other levels. The reasoning behind this is 
>> that you don't want to end up in a state where all your unbounded 
>> restrictions are at the highest level preventing you from splitting any 
>> further.
>>
>>>
>>> > Note that this does bring up the question of whether SDKs should expose 
>>> > coders for backlogs since ByteKeyCoder and BigEndianLongCoder exist which 
>>> > would cover a good number of scenarios described above. This coder 
>>> > doesn't have to be understood by the runner nor does it have to be part 
>>> > of the portability APIs (either Runner of Fn API). WDYT?
>>>
>>> Yes we may need a Coder effectively for both sides, only thing I don’t
>>> like is external impact in the API. I mean it is not too complex, but
>>> adds some extras to support things that are ‘rarely’ changed.
>>
>>
>> Based upon Robert's suggestion above to swap to use a integral floating 
>> point number and even without Robert's suggestion this won't work. The idea 
>> was that a coder would help convert the byte[] backlog representation 
>> to/from a type the user wants but the issue is that the Runner may give any 
>> arbitrary byte[] backlog to the SDK during splitting and this coder would 
>> need to be able to handle it.
>>
>>>
>>> > Ismael, I looked at the API around ByteKeyRangeTracker and 
>>> > OffsetRangeTracker figured out that the classes are named as such because 
>>> > they are trackers for the OffsetRange and ByteKeyRange classes. Some 
>>> > options are to:
>>> > 1) Copy the ByteKeyRange and call it ByteKeyRestriction and similarly 
>>> > copy OffsetRange and call it OffsetRestriction. This would allow us to 
>>> > name the trackers ByteKeyRestrictionTracker and OffsetRestrictionTracker. 
>>> > Note that we can't rename because that would be a backwards incompatible 
>>> > change for existing users of ByteKeyRange/OffsetRange. This would allow 
>>> > us to add methods relevant to SDF and remove methods that aren't needed.
>>> > 2) Rename ByteKeyRangeTracker to ByteKeyRangeRestrictionTracker and 
>>> > OffsetRangeTracker to OffsetRangeRestrictionTracker. Not really liking 
>>> > this option.
>>> > 3) Leave things as they are.
>>>
>>> For the RangeTracker vs RestrictionTracker discussion I will probably
>>> lean to (3) Leave things as they are) save if there is important
>>> things to change/fix (1) which I am not aware of.
>>
>>
>> Sounds good to me.
>>
>>>
>>> On Tue, Nov 20, 2018 at 12:07 AM Lukasz Cwik <lc...@google.com> wrote:
>>> >
>>> > Sorry for the late reply.
>>> >
>>> > On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>> >>
>>> >> Some late comments, and my pre excuses if some questions look silly,
>>> >> but the last documents were a lot of info that I have not yet fully
>>> >> digested.
>>> >>
>>> >> I have some questions about the ‘new’ Backlog concept following a
>>> >> quick look at the PR
>>> >> https://github.com/apache/beam/pull/6969/files
>>> >>
>>> >> 1. Is the Backlog a specific concept for each IO? Or in other words:
>>> >> ByteKeyRestrictionTracker can be used by HBase and Bigtable, but I am
>>> >> assuming from what I could understand that the Backlog implementation
>>> >> will be data store specific, is this the case? or it can be in some
>>> >> case generalized (for example for Filesystems)?
>>> >
>>> >
>>> > The backlog is tied heavily to the restriction tracker implementation, 
>>> > any data store using the same restriction tracker will provide the same 
>>> > backlog computation. For example, if HBase/Bigtable use the 
>>> > ByteKeyRestrictionTracker then they will use the same backlog 
>>> > calculation. Note that an implementation could subclass a restriction 
>>> > tracker if the data store could provide additional information. For 
>>> > example, the default backlog for a ByteKeyRestrictionTracker over 
>>> > [startKey, endKey) is distance(currentKey, lastKey) where distance is 
>>> > represented as byte array subtraction (which can be wildly inaccurrate as 
>>> > the density of data is not well reflected) but if HBase/Bigtable could 
>>> > provide the number of bytes from current key to last key, a better 
>>> > representation could be provided.
>>> >
>>> > Other common examples of backlogs would be:
>>> > * files: backlog = length of file - current byte offset
>>> > * message queues: backlog = number of outstanding messages
>>> >
>>> >>
>>> >>
>>> >> 2. Since the backlog is a byte[] this means that it is up to the user
>>> >> to give it a meaning depending on the situation, is this correct? Also
>>> >> since splitRestriction has now the Backlog as an argument, what do we
>>> >> expect the person that implements this method in a DoFn to do ideally
>>> >> with it? Maybe a more concrete example of how things fit for
>>> >> File/Offset or HBase/Bigtable/ByteKey will be helpful (maybe also for
>>> >> the BundleFinalizer concept too).
>>> >
>>> >
>>> > Yes, the restriction tracker/restriction/SplittableDoFn must give the 
>>> > byte[] a meaning. This can have any meaning but we would like that the 
>>> > backlog byte[] representation to be lexicograhically comparable (when 
>>> > viewing the byte[] in big endian format and prefixes are smaller (e.g. 
>>> > 001 is smaller then 0010) and preferably a linear representation. Note 
>>> > that all restriction trackers of the same type should use the same 
>>> > "space" so that backlogs are comparable across multiple restriction 
>>> > tracker instances.
>>> >
>>> > The backlog when provided to splitRestriction should be used to subdivide 
>>> > the restriction into smaller restrictions where each would have the 
>>> > backlog if processed (except for potentially the last).
>>> >
>>> > A concrete example would be to represent the remaining bytes to process 
>>> > in a file as a 64 bit big endian integer, lets say that is 500MiB 
>>> > (524288000 bytes) or 00000000 00000000 00000000 00000000 00011111 
>>> > 01000000 (note that the trailing zeros are optional and doesn't impact 
>>> > the calculation). The runner could notice that processing the restriction 
>>> > will take 10 hrs, so it asks the SDF to split at 1/16 segments by 
>>> > shifting the bits over by 4 and asks to split using backlog 00000000 
>>> > 00000000 00000000 00000000 00000001 11110100. The SDK is able to convert 
>>> > this request back into 32768000 bytes and returns 16 restrictions. 
>>> > Another example would be for a message queue where we have 10000 messages 
>>> > on the queue remaining so the backlog would be 00000000 00000000 00000000 
>>> > 00000000 00000000 00000000 00100111 00010000 when represented as a 64 bit 
>>> > unsigned big endian integer. The runner could ask the SDK to split using 
>>> > a 1/8th backlog of 00000000 00000000 00000000 00000000 00000000 00000000 
>>> > 00000100 11100010 which the SDK would break out into 8 restrictions, the 
>>> > first 7 responsible for reading 1250 messages and stopping while the last 
>>> > restriction would read 1250 messages and then continue to read anything 
>>> > else that has been enqueued.
>>> >
>>> > Bundle finalization is unrelated to backlogs but is needed since there is 
>>> > a class of data stores which need acknowledgement that says I have 
>>> > successfully received your data and am now responsible for it such as 
>>> > acking a message from a message queue.
>>> >
>>> >>
>>> >>
>>> >> 3. By default all Restrictions are assumed to be unbounded but there
>>> >> is this new Restrictions.IsBounded method, can’t this behavior be
>>> >> inferred (adapted) from the DoFn UnboundedPerElement/Bounded
>>> >> annotation or are these independent concepts?
>>> >
>>> >
>>> > UnboundedPerElement/BoundedPerElement tells us during pipeline 
>>> > construction time what type of PCollection we will be creating since we 
>>> > may have a bounded PCollection goto an UnboundedPerElement DoFn and that 
>>> > will produce an unbounded PCollection and similarly we could have an 
>>> > unbounded PCollection goto a BoundedPerElement DoFn and that will produce 
>>> > an unbounded PCollection. Restrictions.IsBounded is used during pipeline 
>>> > execution to inform the runner whether a restriction being returned is 
>>> > bounded or not since unbounded restrictions can return bounded 
>>> > restrictions during splitting. So in the above example using the message 
>>> > queue, the first 7 restrictions that only read 1250 messages would be 
>>> > marked with the Restrictions.IsBounded interface while the last one would 
>>> > not be. This could also be a method on restrictions such as "IsBounded 
>>> > isBounded()" on PCollections.
>>> >
>>> >> Extra unrelated comment:
>>> >> Since SDF is still @Experimental we should probably rename
>>> >> OffsetRangeTracker and ByteKeyRangeTracker into the RestrictionTracker
>>> >> suffix (I don’t know why they share the RangeTracker suffix for the
>>> >> new trackers, WDYT?
>>> >
>>> >
>>> > Agree, will perform in a follow-up PR.
>>> >
>>> >>
>>> >> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <lc...@google.com> wrote:
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <rober...@google.com> 
>>> >> > wrote:
>>> >> >>
>>> >> >> I think that not returning the users specific subclass should be fine.
>>> >> >> Does the removal of markDone imply that the consumer always knows a
>>> >> >> "final" key to claim on any given restriction?
>>> >> >
>>> >> >
>>> >> > Yes, each restriction needs to support claiming a "final" key that 
>>> >> > would make the restriction "done". In the BigTable/HBase case it is 
>>> >> > the empty key "", for files it can be a file offset beyond the end of 
>>> >> > the file. Generally, restriction trackers written by SDF authors could 
>>> >> > also take an instance of an object that they can compare instance 
>>> >> > equality against for a final key. Alternatively we could allow 
>>> >> > restriction trackers to implement markDone() but would need the SDK 
>>> >> > have knowledge of the method by having the RestrictionTracker 
>>> >> > implement interface, extend abstract base class, or reflectively found 
>>> >> > so that we would be able to wrap it to provide synchronization 
>>> >> > guarantees. I had toyed with the idea of using something like the 
>>> >> > ProxyInvocationHandler that backs PipelineOptions to be able to 
>>> >> > provide a modified version of the users instance that had the 
>>> >> > appropriate synchronization guarantees but couldn't get it to work.
>>> >> >
>>> >> >>
>>> >> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <lc...@google.com> wrote:
>>> >> >> >
>>> >> >> > I have started to work on how to change the user facing API within 
>>> >> >> > the Java SDK to support splitting/checkpointing[1], backlog 
>>> >> >> > reporting[2] and bundle finalization[3].
>>> >> >> >
>>> >> >> > I have this PR[4] which contains minimal interface/type definitions 
>>> >> >> > to convey how the API surface would change with these 4 changes:
>>> >> >> > 1) Exposes the ability for @SplitRestriction to take a backlog 
>>> >> >> > suggestion on how to perform splitting and for how many 
>>> >> >> > restrictions should be returned.
>>> >> >> > 2) Adds the ability for RestrictionTrackers to report backlog
>>> >> >> > 3) Updates @ProcessElement to be required to take a generic 
>>> >> >> > RestrictionTracker instead of the users own restriction tracker 
>>> >> >> > type.
>>> >> >> > 4) Adds the ability for @StartBundle/@ProcessElement/@FinishBundle 
>>> >> >> > to register a callback that is invoked after bundle finalization.
>>> >> >> >
>>> >> >> > The details are in the javadoc comments as to how I would expect 
>>> >> >> > the contract to play out.
>>> >> >> > Feel free to comment on the ML/PR around the contract and after the 
>>> >> >> > feedback is received/digested/implemented, I would like to get the 
>>> >> >> > changes submitted so that work can start  towards providing an 
>>> >> >> > implementation in the Java SDK, Python SDK, and Go SDK and the 
>>> >> >> > shared runner portability library.
>>> >> >> >
>>> >> >> > I would like to call out special attention to 3 since with this 
>>> >> >> > change it will enable us to remove the synchronization requirement 
>>> >> >> > for users as we will wrap the underlying restriction tracker 
>>> >> >> > allowing us to add appropriate synchronization as needed and also 
>>> >> >> > to watch any calls that pass through the object such as the claim 
>>> >> >> > calls. I also believe this prevents people from writing 
>>> >> >> > RestrictionTrackers where the contract of tryClaim is subverted 
>>> >> >> > since markDone is outside the purview of tryClaim as in 
>>> >> >> > ByteKeyRangeTracker[5].
>>> >> >> >
>>> >> >> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
>>> >> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting
>>> >> >> > 3: https://s.apache.org/beam-finalizing-bundles
>>> >> >> > 4: https://github.com/apache/beam/pull/6969
>>> >> >> > 5: https://github.com/apache/beam/pull/6949

Reply via email to