Note: I have merged the PR but will continue to iterate based upon the
feedback provided in this thread, as it has been quite useful.

On Fri, Nov 30, 2018 at 9:37 AM Lukasz Cwik <lc...@google.com> wrote:

> Sorry, for some reason I thought I had answered these.
>
> On Fri, Nov 30, 2018 at 2:20 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> I still have outstanding questions (above) about
>>
>> 1) Why we need arbitrary precision for backlog, instead of just using
>> a (much simpler) double.
>>
>
> Double lacks the precision needed for reporting backlogs over byte key ranges
> (HBase, Bigtable, ...). Scanning a key range such as ["a", "b") that contains
> a large number of keys with a really long common prefix, such as
> "aaaaaaaaaaaaaaaaaaaaaaaaaab" and "aaaaaaaaaaaaaaaaaaaaaaaaaac", ... leads
> to the backlog not changing even though we are making progress through the
> key space. This also prevents splitting within such an area, since a
> double can't provide the necessary precision (without multiple rounds of
> splitting, which adds complexity).
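>
> To illustrate (a rough sketch, not from the PR; keyToFraction is a
> hypothetical helper): treating a key as a base-256 fraction shows where a
> double runs out of bits.
>
>   // Interpret a byte key as an approximate fraction in [0, 1).
>   static double keyToFraction(byte[] key) {
>     double fraction = 0;
>     double scale = 1.0 / 256;
>     for (byte b : key) {
>       fraction += (b & 0xFF) * scale;
>       // Contributions below ~2^-52 of the total are lost to rounding,
>       // i.e. everything past roughly 7 bytes of shared prefix.
>       scale /= 256;
>     }
>     return fraction;
>   }
>
>   // With a shared prefix much longer than 7 bytes, the differing byte
>   // contributes nothing representable, so both keys above collapse to
>   // the exact same double and the backlog never changes.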
>
>
>> 2) Whether it's worth passing backlog back to split requests, rather
>> than (again) a double representing "portion of current remaining",
>> which may change over time. (The most common split request is into
>> even portions, and specifically half, which can't accurately be
>> requested from a stale backlog.)
>>
>
> I see two scenarios here:
> * the fraction is exposed to the SDF author: then the SDF author needs to
> map from their restriction space to backlog and also map fractions onto
> their restriction space, meaning that they are required to write mappings
> between three different models.
> * the fraction is not exposed to the SDF author: the framework code
> multiplies the fraction against the backlog and provides the backlog to the
> user (this solves the backlog skew issue but still has the limited
> precision issue).
>
> I believe it is easier for an SDF author to write a two-way mapping from
> backlog to their position space than to write two different types of
> mappings. For example, when a person is reading a file that has 100 bytes
> to process and is asked to split at 60.3%, they have to map 60.3% onto 100
> bytes, figure out that they are responsible for 60.3 bytes, and then round
> down to 60 bytes. In the scenario where the runner provides the
> backlog, 60.3 would have been sent across and the SDF author would only
> need to perform the rounding.
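>
> In code, the runner-provided-backlog version is just a rounding step (a
> sketch; assumes the backlog arrives as the BigDecimal representation
> discussed further down this thread):
>
>   import java.math.BigDecimal;
>   import java.math.RoundingMode;
>
>   // Runner-suggested split backlog of 60.3 bytes; the SDF only needs to
>   // round down to a whole byte offset it can actually claim.
>   BigDecimal backlog = new BigDecimal("60.3");
>   long splitBytes = backlog.setScale(0, RoundingMode.DOWN).longValueExact(); // 60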
>
>
>> There are also some questions about returning multiple remainders, and
>> how that relates to/overlaps with the initial splitting, but those can
>> probably be deferred.
>>
>
> Agree.
>
>
>> On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > I updated the PR addressing the last of Scott's comments and also
>> > migrated to use an integral fraction as Robert had recommended, by using
>> > approach A for the proto representation and BigDecimal within the Java SDK:
>> > A:
>> > // Represents a non-negative decimal number: unscaled_value * 10^(-scale)
>> > message Decimal {
>> >   // Represents the unscaled value as a big endian unlimited precision
>> >   // non-negative integer.
>> >   bytes unscaled_value = 1;
>> >   // Represents the scale
>> >   uint32 scale = 2;
>> > }
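>> >
>> > For concreteness, converting between this proto and Java's BigDecimal
>> > could look like the following sketch (assumes generated proto classes
>> > and a non-negative scale; fromProto/toProto are hypothetical helpers):
>> >
>> >   import com.google.protobuf.ByteString;
>> >   import java.math.BigDecimal;
>> >   import java.math.BigInteger;
>> >
>> >   static BigDecimal fromProto(Decimal d) {
>> >     // new BigInteger(1, bytes) reads an unsigned big endian integer.
>> >     return new BigDecimal(
>> >         new BigInteger(1, d.getUnscaledValue().toByteArray()), d.getScale());
>> >   }
>> >
>> >   static Decimal toProto(BigDecimal value) {
>> >     return Decimal.newBuilder()
>> >         .setUnscaledValue(ByteString.copyFrom(value.unscaledValue().toByteArray()))
>> >         .setScale(value.scale())
>> >         .build();
>> >   }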
>> >
>> > Ismael, I would like to defer the changes to improve the ByteBuddy
>> DoFnInvoker since that is parallelizable work and have filed BEAM-6142.
>> >
>> > I don't believe there are any other outstanding changes and would like
>> to get the PR merged so that people can start working on implementing
>> support for backlog reporting and splitting within the Java SDK harness,
>> improving the ByteBuddy DoFnInvoker, exposing the shared runner library
>> parts, and integrating this into ULR, Flink, Dataflow, ...
>> >
>> > On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik <lc...@google.com> wrote:
>> >>
>> >>
>> >>
>> >> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía <ieme...@gmail.com>
>> wrote:
>> >>>
>> >>> > Bundle finalization is unrelated to backlogs but is needed since
>> >>> > there is a class of data stores which need an acknowledgement that
>> >>> > says "I have successfully received your data and am now responsible
>> >>> > for it", such as acking a message from a message queue.
>> >>>
>> >>> Currently ack is done by IOs as part of checkpointing. How will this
>> >>> be different? Can you please clarify how it should be done in this case,
>> >>> or is this totally independent?
>> >>
>> >>
>> >> The flow for finalization and checkpointing is similar:
>> >> Checkpointing:
>> >> 1) Process a bundle
>> >> 2) Checkpoint the bundle, containing acks that need to be done
>> >> 3) When the checkpoint resumes, acknowledge the messages
>> >>
>> >> Finalization:
>> >> 1) Process a bundle
>> >> 2) Request bundle finalization when the bundle completes
>> >> 3) SDK is asked to finalize the bundle
>> >>
>> >> The difference between the two is that bundle finalization always goes
>> >> back to the same machine instance that processed the bundle, while
>> >> checkpointing can be scheduled on another machine. Many message-queue-like
>> >> systems expose clients which store in-memory state and can't ack from
>> >> another machine. You could solve the problem with checkpointing, but that
>> >> would require each machine to be able to tell another machine that it got
>> >> a checkpoint with acks that it is responsible for, which won't work
>> >> everywhere and isn't as clean.
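>> >>
>> >> As a sketch of the finalization flow from the DoFn author's side (hedged;
>> >> BundleFinalizer/afterBundleCommit follow the shape proposed in the PR,
>> >> and readAndOutput/queueClient are hypothetical):
>> >>
>> >>   @ProcessElement
>> >>   public void process(ProcessContext context, BundleFinalizer finalizer) {
>> >>     List<String> ackIds = readAndOutput(context); // hypothetical read
>> >>     // Runs on this same worker once the runner durably commits the
>> >>     // bundle's output, so the in-memory client can ack safely.
>> >>     finalizer.afterBundleCommit(
>> >>         Instant.now().plus(Duration.standardMinutes(5)),
>> >>         () -> queueClient.ack(ackIds));
>> >>   }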
>> >>
>> >>>
>> >>> > UnboundedPerElement/BoundedPerElement tells us during pipeline
>> >>> > construction time what type of PCollection we will be creating, since
>> >>> > we may have a bounded PCollection go to an UnboundedPerElement DoFn
>> >>> > and that will produce an unbounded PCollection, and similarly we could
>> >>> > have an unbounded PCollection go to a BoundedPerElement DoFn and that
>> >>> > will produce an unbounded PCollection. Restrictions.IsBounded is used
>> >>> > during pipeline execution to inform the runner whether a restriction
>> >>> > being returned is bounded or not, since unbounded restrictions can
>> >>> > return bounded restrictions during splitting. So in the above example
>> >>> > using the message queue, the first 7 restrictions that only read 1250
>> >>> > messages would be marked with the Restrictions.IsBounded interface
>> >>> > while the last one would not be. This could also be a method on
>> >>> > restrictions, such as the "IsBounded isBounded()" method on
>> >>> > PCollections.
>> >>>
>> >>> Thanks for the explanation about Restrictions.IsBounded. Since this is
>> >>> information for the runner, what is the runner expected to do
>> >>> differently when it is unbounded? (I assume that IsBounded is the
>> >>> default behavior and nothing changes.)
>> >>
>> >>
>> >> Knowing whether a restriction is bounded or unbounded is important. One
>> >> example use case would be the limited depth splitting proposal (
>> >> https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv),
>> >> since you want to keep the unbounded restrictions at level 0 and only pass
>> >> the bounded restrictions to the other levels. The reasoning behind this is
>> >> that you don't want to end up in a state where all your unbounded
>> >> restrictions are at the highest level, preventing you from splitting any
>> >> further.
>> >>
>> >>>
>> >>> > Note that this does bring up the question of whether SDKs should
>> >>> > expose coders for backlogs, since ByteKeyCoder and BigEndianLongCoder
>> >>> > exist which would cover a good number of the scenarios described
>> >>> > above. This coder doesn't have to be understood by the runner, nor
>> >>> > does it have to be part of the portability APIs (either Runner or Fn
>> >>> > API). WDYT?
>> >>>
>> >>> Yes, we may need a Coder effectively for both sides; the only thing I
>> >>> don't like is the external impact on the API. I mean, it is not too
>> >>> complex, but it adds some extras to support things that are 'rarely'
>> >>> changed.
>> >>
>> >>
>> >> Given Robert's suggestion above to swap to an integral floating point
>> >> number (and even without Robert's suggestion), this won't work. The idea
>> >> was that a coder would help convert the byte[] backlog representation
>> >> to/from a type the user wants, but the issue is that the runner may give
>> >> any arbitrary byte[] backlog to the SDK during splitting and this coder
>> >> would need to be able to handle it.
>> >>
>> >>>
>> >>> > Ismael, I looked at the API around ByteKeyRangeTracker and
>> >>> > OffsetRangeTracker and figured out that the classes are named as such
>> >>> > because they are trackers for the OffsetRange and ByteKeyRange
>> >>> > classes. Some options are to:
>> >>> > 1) Copy the ByteKeyRange and call it ByteKeyRestriction and
>> similarly copy OffsetRange and call it OffsetRestriction. This would allow
>> us to name the trackers ByteKeyRestrictionTracker and
>> OffsetRestrictionTracker. Note that we can't rename because that would be a
>> backwards incompatible change for existing users of
>> ByteKeyRange/OffsetRange. This would allow us to add methods relevant to
>> SDF and remove methods that aren't needed.
>> >>> > 2) Rename ByteKeyRangeTracker to ByteKeyRangeRestrictionTracker and
>> OffsetRangeTracker to OffsetRangeRestrictionTracker. Not really liking this
>> option.
>> >>> > 3) Leave things as they are.
>> >>>
>> >>> For the RangeTracker vs RestrictionTracker discussion I will probably
>> >>> lean towards (3) (leave things as they are), unless there are important
>> >>> things to change/fix via (1) which I am not aware of.
>> >>
>> >>
>> >> Sounds good to me.
>> >>
>> >>>
>> >>> On Tue, Nov 20, 2018 at 12:07 AM Lukasz Cwik <lc...@google.com>
>> wrote:
>> >>> >
>> >>> > Sorry for the late reply.
>> >>> >
>> >>> > On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <ieme...@gmail.com>
>> wrote:
>> >>> >>
>> >>> >> Some late comments, and my apologies in advance if some questions
>> >>> >> look silly, but the last documents were a lot of info that I have
>> >>> >> not yet fully digested.
>> >>> >>
>> >>> >> I have some questions about the ‘new’ Backlog concept following a
>> >>> >> quick look at the PR
>> >>> >> https://github.com/apache/beam/pull/6969/files
>> >>> >>
>> >>> >> 1. Is the Backlog a specific concept for each IO? Or in other words:
>> >>> >> ByteKeyRestrictionTracker can be used by HBase and Bigtable, but I am
>> >>> >> assuming from what I could understand that the Backlog implementation
>> >>> >> will be data store specific. Is this the case? Or can it in some
>> >>> >> cases be generalized (for example for Filesystems)?
>> >>> >
>> >>> >
>> >>> > The backlog is tied heavily to the restriction tracker
>> >>> > implementation; any data store using the same restriction tracker
>> >>> > will provide the same backlog computation. For example, if
>> >>> > HBase/Bigtable use the ByteKeyRestrictionTracker then they will use
>> >>> > the same backlog calculation. Note that an implementation could
>> >>> > subclass a restriction tracker if the data store can provide
>> >>> > additional information. For example, the default backlog for a
>> >>> > ByteKeyRestrictionTracker over [startKey, endKey) is
>> >>> > distance(currentKey, lastKey), where distance is represented as byte
>> >>> > array subtraction (which can be wildly inaccurate since the density of
>> >>> > data is not well reflected), but if HBase/Bigtable could provide the
>> >>> > number of bytes from the current key to the last key, a better
>> >>> > representation could be provided.
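>> >>> >
>> >>> > A minimal sketch of that default distance (hypothetical helper; keys
>> >>> > are right-padded with zeros and read as unsigned big endian integers):
>> >>> >
>> >>> >   import java.math.BigInteger;
>> >>> >   import java.util.Arrays;
>> >>> >
>> >>> >   static BigInteger distance(byte[] currentKey, byte[] lastKey) {
>> >>> >     int length = Math.max(currentKey.length, lastKey.length);
>> >>> >     // Arrays.copyOf zero-pads on the right, aligning the keys
>> >>> >     // lexicographically before the subtraction.
>> >>> >     return new BigInteger(1, Arrays.copyOf(lastKey, length))
>> >>> >         .subtract(new BigInteger(1, Arrays.copyOf(currentKey, length)));
>> >>> >   }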
>> >>> >
>> >>> > Other common examples of backlogs would be:
>> >>> > * files: backlog = length of file - current byte offset
>> >>> > * message queues: backlog = number of outstanding messages
>> >>> >
>> >>> >>
>> >>> >>
>> >>> >> 2. Since the backlog is a byte[], this means that it is up to the
>> >>> >> user to give it a meaning depending on the situation, is this
>> >>> >> correct? Also, since splitRestriction now has the Backlog as an
>> >>> >> argument, what do we expect the person that implements this method
>> >>> >> in a DoFn to do ideally with it? Maybe a more concrete example of
>> >>> >> how things fit for File/Offset or HBase/Bigtable/ByteKey would be
>> >>> >> helpful (maybe also for the BundleFinalizer concept too).
>> >>> >
>> >>> >
>> >>> > Yes, the restriction tracker/restriction/SplittableDoFn must give
>> >>> > the byte[] a meaning. It can have any meaning, but we would like the
>> >>> > backlog byte[] representation to be lexicographically comparable
>> >>> > (when viewing the byte[] in big endian format, prefixes are smaller,
>> >>> > e.g. 001 is smaller than 0010) and preferably a linear
>> >>> > representation. Note that all restriction trackers of the same type
>> >>> > should use the same "space" so that backlogs are comparable across
>> >>> > multiple restriction tracker instances.
>> >>> >
>> >>> > The backlog, when provided to splitRestriction, should be used to
>> >>> > subdivide the restriction into smaller restrictions where each would
>> >>> > have that backlog if processed (except for potentially the last).
>> >>> >
>> >>> > A concrete example would be to represent the remaining bytes to
>> >>> > process in a file as a 64 bit big endian integer. Let's say that is
>> >>> > 500MiB (524288000 bytes), or 00000000 00000000 00000000 00000000
>> >>> > 00011111 01000000 (note that the trailing zero bytes are optional and
>> >>> > don't impact the calculation). The runner could notice that
>> >>> > processing the restriction will take 10 hrs, so it asks the SDF to
>> >>> > split into 1/16th segments by shifting the bits over by 4 and asking
>> >>> > to split using backlog 00000000 00000000 00000000 00000000 00000001
>> >>> > 11110100. The SDK is able to convert this request back into 32768000
>> >>> > bytes and returns 16 restrictions. Another example would be a message
>> >>> > queue where we have 10000 messages remaining on the queue, so the
>> >>> > backlog would be 00000000 00000000 00000000 00000000 00000000
>> >>> > 00000000 00100111 00010000 when represented as a 64 bit unsigned big
>> >>> > endian integer. The runner could ask the SDK to split using a 1/8th
>> >>> > backlog of 00000000 00000000 00000000 00000000 00000000 00000000
>> >>> > 00000100 11100010, which the SDK would break out into 8 restrictions,
>> >>> > the first 7 responsible for reading 1250 messages and stopping while
>> >>> > the last restriction would read 1250 messages and then continue to
>> >>> > read anything else that has been enqueued.
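>> >>> >
>> >>> > In code, the file-backlog suggestion above is just an unsigned shift
>> >>> > on the decoded value (a sketch; the 8 byte big endian encoding
>> >>> > matches the example, and the variable names are illustrative):
>> >>> >
>> >>> >   import java.nio.ByteBuffer;
>> >>> >
>> >>> >   byte[] backlog = ByteBuffer.allocate(8).putLong(524288000L).array();
>> >>> >   // Suggest 1/16th segments by shifting right 4 bits.
>> >>> >   long perSegment = ByteBuffer.wrap(backlog).getLong() >>> 4; // 32768000
>> >>> >   byte[] suggestion = ByteBuffer.allocate(8).putLong(perSegment).array();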
>> >>> >
>> >>> > Bundle finalization is unrelated to backlogs but is needed since
>> >>> > there is a class of data stores which need an acknowledgement that
>> >>> > says "I have successfully received your data and am now responsible
>> >>> > for it", such as acking a message from a message queue.
>> >>> >
>> >>> >>
>> >>> >>
>> >>> >> 3. By default all Restrictions are assumed to be unbounded, but
>> >>> >> there is this new Restrictions.IsBounded method; can't this behavior
>> >>> >> be inferred (adapted) from the DoFn
>> >>> >> UnboundedPerElement/BoundedPerElement annotation, or are these
>> >>> >> independent concepts?
>> >>> >
>> >>> >
>> >>> > UnboundedPerElement/BoundedPerElement tells us during pipeline
>> >>> > construction time what type of PCollection we will be creating, since
>> >>> > we may have a bounded PCollection go to an UnboundedPerElement DoFn
>> >>> > and that will produce an unbounded PCollection, and similarly we could
>> >>> > have an unbounded PCollection go to a BoundedPerElement DoFn and that
>> >>> > will produce an unbounded PCollection. Restrictions.IsBounded is used
>> >>> > during pipeline execution to inform the runner whether a restriction
>> >>> > being returned is bounded or not, since unbounded restrictions can
>> >>> > return bounded restrictions during splitting. So in the above example
>> >>> > using the message queue, the first 7 restrictions that only read 1250
>> >>> > messages would be marked with the Restrictions.IsBounded interface
>> >>> > while the last one would not be. This could also be a method on
>> >>> > restrictions, such as the "IsBounded isBounded()" method on
>> >>> > PCollections.
>> >>> >
>> >>> >> Extra unrelated comment:
>> >>> >> Since SDF is still @Experimental we should probably rename
>> >>> >> OffsetRangeTracker and ByteKeyRangeTracker to use the
>> >>> >> RestrictionTracker suffix (I don't know why the new trackers share
>> >>> >> the RangeTracker suffix). WDYT?
>> >>> >
>> >>> >
>> >>> > Agree, will perform in a follow-up PR.
>> >>> >
>> >>> >>
>> >>> >> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <lc...@google.com>
>> wrote:
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <
>> rober...@google.com> wrote:
>> >>> >> >>
>> >>> >> >> I think that not returning the user's specific subclass should
>> >>> >> >> be fine. Does the removal of markDone imply that the consumer
>> >>> >> >> always knows a "final" key to claim on any given restriction?
>> >>> >> >
>> >>> >> >
>> >>> >> > Yes, each restriction needs to support claiming a "final" key
>> >>> >> > that would make the restriction "done". In the BigTable/HBase case
>> >>> >> > it is the empty key "", and for files it can be a file offset
>> >>> >> > beyond the end of the file. Generally, restriction trackers written
>> >>> >> > by SDF authors could also take an instance of an object that they
>> >>> >> > can compare instance equality against for a final key.
>> >>> >> > Alternatively, we could allow restriction trackers to implement
>> >>> >> > markDone(), but the SDK would need knowledge of the method by
>> >>> >> > having the RestrictionTracker implement an interface, extend an
>> >>> >> > abstract base class, or be found reflectively, so that we would be
>> >>> >> > able to wrap it to provide synchronization guarantees. I had toyed
>> >>> >> > with the idea of using something like the ProxyInvocationHandler
>> >>> >> > that backs PipelineOptions to be able to provide a modified version
>> >>> >> > of the user's instance that had the appropriate synchronization
>> >>> >> > guarantees but couldn't get it to work.
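>> >>> >> >
>> >>> >> > For example, finishing a byte key restriction without markDone()
>> >>> >> > could look like the following sketch (hedged; ByteKey.EMPTY stands
>> >>> >> > in for the empty "final" key mentioned above):
>> >>> >> >
>> >>> >> >   // Claiming the "final" (empty) key tells the tracker the
>> >>> >> >   // restriction is done, replacing an explicit markDone() call.
>> >>> >> >   tracker.tryClaim(ByteKey.EMPTY);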
>> >>> >> >
>> >>> >> >>
>> >>> >> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <lc...@google.com>
>> wrote:
>> >>> >> >> >
>> >>> >> >> > I have started to work on how to change the user facing API
>> within the Java SDK to support splitting/checkpointing[1], backlog
>> reporting[2] and bundle finalization[3].
>> >>> >> >> >
>> >>> >> >> > I have this PR[4] which contains minimal interface/type
>> definitions to convey how the API surface would change with these 4 changes:
>> >>> >> >> > 1) Exposes the ability for @SplitRestriction to take a backlog
>> >>> >> >> > suggestion on how to perform splitting and how many restrictions
>> >>> >> >> > should be returned.
>> >>> >> >> > 2) Adds the ability for RestrictionTrackers to report backlog.
>> >>> >> >> > 3) Updates @ProcessElement to be required to take a generic
>> >>> >> >> > RestrictionTracker instead of the user's own restriction tracker
>> >>> >> >> > type (see the sketch below).
>> >>> >> >> > 4) Adds the ability for @StartBundle/@ProcessElement/@FinishBundle
>> >>> >> >> > to register a callback that is invoked after bundle finalization.
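>> >>> >> >> >
>> >>> >> >> > To make 3) concrete, a DoFn under this proposal might look
>> >>> >> >> > roughly like the following (a hedged sketch against the PR's
>> >>> >> >> > proposed types; the OffsetRange restriction and readRecordAt
>> >>> >> >> > are illustrative):
>> >>> >> >> >
>> >>> >> >> >   @ProcessElement
>> >>> >> >> >   public void process(ProcessContext context,
>> >>> >> >> >       RestrictionTracker<OffsetRange, Long> tracker) { // generic type
>> >>> >> >> >     for (long i = tracker.currentRestriction().getFrom();
>> >>> >> >> >         tracker.tryClaim(i); ++i) {
>> >>> >> >> >       context.output(readRecordAt(i)); // hypothetical reader
>> >>> >> >> >     }
>> >>> >> >> >   }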
>> >>> >> >> >
>> >>> >> >> > The details are in the javadoc comments as to how I would
>> expect the contract to play out.
>> >>> >> >> > Feel free to comment on the ML/PR around the contract; after
>> >>> >> >> > the feedback is received/digested/implemented, I would like to
>> >>> >> >> > get the changes submitted so that work can start towards
>> >>> >> >> > providing an implementation in the Java SDK, Python SDK, Go SDK,
>> >>> >> >> > and the shared runner portability library.
>> >>> >> >> >
>> >>> >> >> > I would like to call out special attention to 3), since this
>> >>> >> >> > change will enable us to remove the synchronization requirement
>> >>> >> >> > for users: we will wrap the underlying restriction tracker,
>> >>> >> >> > allowing us to add appropriate synchronization as needed and
>> >>> >> >> > also to watch any calls that pass through the object, such as
>> >>> >> >> > the claim calls. I also believe this prevents people from
>> >>> >> >> > writing RestrictionTrackers where the contract of tryClaim is
>> >>> >> >> > subverted, since markDone is outside the purview of tryClaim,
>> >>> >> >> > as in ByteKeyRangeTracker[5].
>> >>> >> >> >
>> >>> >> >> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
>> >>> >> >> > 2: https://s.apache.org/beam-bundles-backlog-splitting
>> >>> >> >> > 3: https://s.apache.org/beam-finalizing-bundles
>> >>> >> >> > 4: https://github.com/apache/beam/pull/6969
>> >>> >> >> > 5: https://github.com/apache/beam/pull/6949
>>
>
