I was able to update the failing Watch transform in
https://github.com/apache/beam/pull/8146 and this has now been merged.

On Mon, Mar 18, 2019 at 10:32 AM Lukasz Cwik <lc...@google.com> wrote:

> Thanks Kenn, based upon the error message there was a small amount of code
> that I missed when updating the code. I'll attempt to fix this in the next
> few days.
>
> On Mon, Jan 14, 2019 at 7:26 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> I wanted to use this thread to ping that the change to the user-facing
>> API in order to wrap RestrictionTracker broke the Watch transform, which
>> has been sickbayed for a long time. It would be helpful for experts to
>> weigh in on https://issues.apache.org/jira/browse/BEAM-6352 about how
>> the functionality used here should be implemented.
>>
>> Kenn
>>
>> On Wed, Dec 5, 2018 at 4:45 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Based upon the current Java SDK API, I was able to implement Runner
>>> initiated checkpointing that the Java SDK honors within PR
>>> https://github.com/apache/beam/pull/7200.
>>>
>>> This is an exciting first step to a splitting implementation, feel free
>>> to take a look and comment. I have added two basic tests, execute SDF
>>> without splitting and execute SDF with a runner initiated checkpoint.
>>>
>>> On Fri, Nov 30, 2018 at 4:52 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Fri, Nov 30, 2018 at 10:14 PM Lukasz Cwik <lc...@google.com> wrote:
>>>> >
>>>> > On Fri, Nov 30, 2018 at 1:02 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>> >>
>>>> >> On Fri, Nov 30, 2018 at 6:38 PM Lukasz Cwik <lc...@google.com>
>>>> wrote:
>>>> >> >
>>>> >> > Sorry, for some reason I thought I had answered these.
>>>> >>
>>>> >> No problem, thanks for your patience :).
>>>> >>
>>>> >> > On Fri, Nov 30, 2018 at 2:20 AM Robert Bradshaw <
>>>> rober...@google.com> wrote:
>>>> >> >>
>>>> >> >> I still have outstanding questions (above) about
>>>> >> >>
>>>> >> >> 1) Why we need arbitrary precision for backlog, instead of just
>>>> using
>>>> >> >> a (much simpler) double.
>>>> >> >
>>>> >> >
>>>> >> > Double lacks the precision for reporting backlogs for byte key
>>>> ranges (HBase, Bigtable, ...). Scanning a key range such as ["a", "b") and
>>>> with a large number of keys with a really long common prefix such as
>>>> "aaaaaaaaaaaaaaaaaaaaaaaaaab" and "aaaaaaaaaaaaaaaaaaaaaaaaaac", ... leads
>>>> to the backlog not changing even though we are making progress through the
>>>> key space. This also prevents splitting within such an area since the
>>>> double can't provide that necessary precision (without multiple rounds of
>>>> splitting which adds complexity).
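
A minimal sketch of the precision problem described above, written in Python for brevity (the thread itself concerns the Java SDK). It assumes a key is mapped to a position by reading its bytes as a big-endian binary fraction in [0, 1); the helper name `key_to_fraction` and that mapping are illustrative assumptions, not Beam API. The two keys are the ones quoted in the example.

```python
from fractions import Fraction


def key_to_fraction(key: bytes) -> Fraction:
    """Read a byte key as an exact big-endian binary fraction in [0, 1)."""
    if not key:
        return Fraction(0)
    return Fraction(int.from_bytes(key, "big"), 256 ** len(key))


# The two neighbouring keys with a long common prefix from the example.
k1 = b"aaaaaaaaaaaaaaaaaaaaaaaaaab"
k2 = b"aaaaaaaaaaaaaaaaaaaaaaaaaac"

# Exact arithmetic sees progress between the two keys.
exact_gap = key_to_fraction(k2) - key_to_fraction(k1)

# A double keeps only 53 significant bits, so both keys collapse to one value.
float_gap = float(key_to_fraction(k2)) - float(key_to_fraction(k1))

print("exact gap is positive:", exact_gap > 0)
print("gap seen through doubles:", float_gap)
```

With doubles the reported position does not move between the two keys, which is the "backlog not changing" symptom; an arbitrary-precision representation keeps the difference.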
>>>> >>
>>>> >> We'll have to support multiple rounds of splitting regardless. I can
>>>> >> see how this gives more information up front though.
>>>> >
>>>> > I agree that we will need to support multiple rounds of splitting
>>>> from the SDK side but this adds complexity from the runner side since it
>>>> can only increase the accuracy for a split by performing multiple rounds of
>>>> splitting at once.
>>>> >
>>>> >> (As an aside, I've been thinking about some ways of solving the dark
>>>> >> matter problem, and it might depend on knowing the actual key, using
>>>> >> the fact that character boundaries are likely cut-off points for
>>>> >> changes in density, which would get obscured by alternative
>>>> >> representations.)
>>>> >
>>>> > Every time I think about this issue, I can never get it to apply
>>>> meaningfully for unbounded sources such as a message queue like pubsub.
>>>>
>>>> Yeah, neither can I.
>>>>
>>>> > Also, having an infinitely precise backlog such as the decimal format
>>>> would still provide density information as the rate of change through the
>>>> backlog for a bounded source would change once a "cluster" was hit.
>>>>
>>>> This is getting to somewhat of a tangential topic, but the key insight
>>>> is that although it's easy to find the start of a cluster, to split
>>>> ideally one would want to know where the end of the cluster is. For
>>>> keyspaces, this is likely to be at binary fractions, and in particular
>>>> looking at the longevity of common prefixes of length n one could make
>>>> heuristic guesses as to where this density dropoff may be. (This also
>>>> requires splitting at a key, not splitting relative to a current
>>>> position, which has its issues...)
>>>>
>>>> >> >> 2) Whether it's worth passing backlog back to split requests,
>>>> rather
>>>> >> >> than (again) a double representing "portion of current remaining"
>>>> >> >> which may change over time. (The most common split request is into
>>>> >> >> even portions, and specifically half, which can't accurately be
>>>> >> >> requested from a stale backlog.)
>>>> >> >
>>>> >> > I see two scenarios here:
>>>> >> > * the fraction is exposed to the SDF author and then the SDF
author needs to map from their restriction space to backlog and also map
>>>> fractions onto their restriction space meaning that they are required to
>>>> write mappings between three different models.
>>>> >> > * the fraction is not exposed to the SDF author and the framework
>>>> code multiplies the fraction against the backlog and provides the backlog
>>>> to the user (this solves the backlog skew issue but still has the limited
>>>> precision issue).
>>>> >>
>>>> >> Limited precision is not as much of an issue here because one can
>>>> >> express very small numbers to split close to the current position,
>>>> and
>>>> >> don't need high precision for splitting further away.
>>>> >
>>>> > Agree. Would this also mean that skew when splitting at half doesn't
>>>> really matter?
>>>>
>>>> Lots of times keyspaces have big pockets of low density. If one hits
>>>> one of these ranges between when the backlog is reported and when the
>>>> split is requested, the skew can get quite large. Basically using a
>>>> fraction means that a system does not have to be as concerned about
>>>> stale data, and can make reasonable choices without data at all (e.g.
>>>> imagine upscaling from 200 to 300 workers and asking for everyone to
>>>> just give 33% of their work back), and when it does make choices based
>>>> on actual backlog the discrepancy between what was ideal at the time
>>>> backlog was requested and what's ideal now is shared between the
>>>> primary and remainder(s) rather than one side or the other absorbing
>>>> this entire error.
>>>>
>>>> This of course gets exacerbated with multiple splits, e.g. if the
>>>> measured backlog was 100 and you wanted to split the work in 10
>>>> pieces, asking for a split at 10 would only result in 9 splits if the
>>>> cursor advanced by 10 in the meantime, and if it advanced by 9 you'd
>>>> probably want to use fractions anyway to spread the error out and
>>>> produce (10, 9, 9, 9, 9, 9, 9, 9, 9, 9) rather than (10, 10, 10, 10,
>>>> 10, 10, 10, 10, 10, 1).
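
The arithmetic in the example above can be checked with a small sketch. Both helper names are hypothetical and the work is modelled as a plain integer count; this only illustrates the two request styles, not how any runner implements splitting.

```python
from typing import List


def split_by_fixed_backlog(remaining: int, piece: int) -> List[int]:
    """Cut pieces of a fixed size (computed from a stale backlog) until done."""
    sizes = []
    while remaining > 0:
        take = min(piece, remaining)
        sizes.append(take)
        remaining -= take
    return sizes


def split_by_fraction(remaining: int, parts: int) -> List[int]:
    """Split whatever remains *now* into `parts` nearly equal pieces."""
    base, extra = divmod(remaining, parts)
    return [base + 1 if i < extra else base for i in range(parts)]


measured_backlog = 100  # backlog at the time it was reported
piece = measured_backlog // 10  # runner wants 10 pieces, so asks for 10 each

# The cursor advanced by 9 before the split request arrived.
remaining = measured_backlog - 9

print(split_by_fixed_backlog(remaining, piece))
print(split_by_fraction(remaining, 10))
# If the cursor had advanced by 10 instead, only 9 pieces come back.
print(len(split_by_fixed_backlog(measured_backlog - 10, piece)))
```

In the fixed-size case the last piece absorbs the whole discrepancy, while the fractional request spreads it across all pieces.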
>>>>
>>>> >> I also think it's nice that the space of possible splits is always
>>>> >> (current position, restriction end) which a double always maps onto
>>>> >> despite those both being moving targets. If you phrase things in
>>>> terms
>>>> >> of backlogs, you might ask for impossible things. I don't recall if
>>>> >> the passed backlog is the amount that should be retained or the
>>>> amount
>>>> >> that should be returned, but if the latter, it'll be difficult to
>>>> >> accurately split near the current position.
>>>> >
>>>> >
>>>> > For the current proposal, it represents how much should be retained
>>>> but as was mentioned earlier, the semantics of returning multiple splits is
>>>> still up in the air.
>>>> >
>>>> >>
>>>> >> > I believe it is easier for an SDF author to write a two way
mapping from backlog to their position space than to write two different
>>>> types of mappings. For example, when a person is reading a file that has
>>>> 100 bytes to process and is asked to split at 60.3%, they have to map 60.3%
onto 100 bytes, figuring out that they are responsible for 60.3 bytes,
>>>> which they round down to 60 bytes. In the scenario where the runner
>>>> provides the backlog, 60.3 would have been sent across and the SDF author
>>>> would only need to perform rounding.
>>>> >>
>>>> >> Yeah, that's something to mull on. Maybe with a set of concrete
>>>> examples.
>>>> >>
>>>> >> >> There are also some questions about returning multiple
>>>> remainders, and
>>>> >> >> how that relates to/overlaps with the initial splitting, but
>>>> those can
>>>> >> >> probably be deferred.
>>>> >> >
>>>> >> >
>>>> >> > Agree.
>>>> >> >
>>>> >> >>
>>>> >> >> On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik <lc...@google.com>
>>>> wrote:
>>>> >> >> >
>>>> >> >> > I updated the PR addressing the last of Scott's comments and
>>>> also migrated to use an integral fraction as Robert had recommended by
>>>> using approach A for the proto representation and BigDecimal within the
>>>> Java SDK:
>>>> >> >> > A:
>>>> >> >> > // Represents a non-negative decimal number: unscaled_value *
>>>> 10^(-scale)
>>>> >> >> > message Decimal {
>>>> >> >> >   // Represents the unscaled value as a big endian unlimited
>>>> precision non-negative integer.
>>>> >> >> >   bytes unscaled_value = 1;
>>>> >> >> >   // Represents the scale
>>>> >> >> >   uint32 scale = 2;
>>>> >> >> > }
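
A minimal sketch of how a value could round-trip through the `Decimal` message above, using Python's `decimal.Decimal` as a stand-in for the Java `BigDecimal` mentioned in the email. The function names `encode_decimal` and `decode_decimal` are hypothetical, and normalising a negative scale to zero is an assumption made here because the proto field is a `uint32`.

```python
from decimal import Decimal
from typing import Tuple


def encode_decimal(value: Decimal) -> Tuple[bytes, int]:
    """Return (unscaled_value, scale) so that value == unscaled * 10**(-scale)."""
    if not value.is_finite() or value.is_signed():
        raise ValueError("only finite, non-negative decimals are representable")
    _, digits, exponent = value.as_tuple()
    unscaled = int("".join(str(d) for d in digits))
    scale = -exponent
    if scale < 0:
        # uint32 cannot hold a negative scale: fold it into the unscaled value.
        unscaled *= 10 ** (-scale)
        scale = 0
    length = max(1, (unscaled.bit_length() + 7) // 8)
    return unscaled.to_bytes(length, "big"), scale


def decode_decimal(unscaled_value: bytes, scale: int) -> Decimal:
    """Rebuild the decimal exactly from its big-endian unscaled bytes and scale."""
    unscaled = int.from_bytes(unscaled_value, "big")
    digits = tuple(int(ch) for ch in str(unscaled))
    return Decimal((0, digits, -scale))


raw, scale = encode_decimal(Decimal("60.3"))
print(raw.hex(), scale)
print(decode_decimal(raw, scale))
```

The decoder builds the value from a digit tuple rather than using context-dependent arithmetic, so precision is not limited by the default decimal context.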
>>>> >> >> >
>>>> >> >> > Ismael, I would like to defer the changes to improve the
>>>> ByteBuddy DoFnInvoker since that is parallelizable work and have filed
>>>> BEAM-6142.
>>>> >> >> >
>>>> >> >> > I don't believe there are any other outstanding changes and
>>>> would like to get the PR merged so that people can start working on
>>>> implementing support for backlog reporting and splitting within the Java
>>>> SDK harness, improving the ByteBuddy DoFnInvoker, exposing the shared
>>>> runner library parts, and integrating this into ULR, Flink, Dataflow, ...
>>>> >> >> >
>>>> >> >> > On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik <lc...@google.com>
>>>> wrote:
>>>> >> >> >>
>>>> >> >> >>
>>>> >> >> >>
>>>> >> >> >> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía <
>>>> ieme...@gmail.com> wrote:
>>>> >> >> >>>
>>>> >> >> >>> > Bundle finalization is unrelated to backlogs but is needed
>>>> since there is a class of data stores which need acknowledgement that says
>>>> I have successfully received your data and am now responsible for it such
>>>> as acking a message from a message queue.
>>>> >> >> >>>
>>>> >> >> >>> Currently ack is done by IOs as part of checkpointing. How
will this
>>>> >> >> >>> be different? Can you please clarify how it should be done in
this case,
>>>> >> >> >>> or is this totally independent?
>>>> >> >> >>
>>>> >> >> >>
>>>> >> >> >> The flow for finalization and checkpointing is similar:
>>>> >> >> >> Checkpointing:
>>>> >> >> >> 1) Process a bundle
>>>> >> >> >> 2) Checkpoint bundle containing acks that need to be done
>>>> >> >> >> 3) When checkpoint resumes, acknowledge messages
>>>> >> >> >>
>>>> >> >> >> Finalization:
>>>> >> >> >> 1) Process a bundle
>>>> >> >> >> 2) Request bundle finalization when bundle completes
>>>> >> >> >> 3) SDK is asked to finalize bundle
>>>> >> >> >>
>>>> >> >> >> The difference between the two is that bundle finalization
>>>> always goes back to the same machine instance that processed the bundle
>>>> while checkpointing can be scheduled on another machine. Many message queue
>>>> like systems expose clients which store in memory state and can't ack from
>>>> another machine. You could solve the problem with checkpointing but would
>>>> require each machine to be able to tell another machine that it got a
>>>> checkpoint with acks that it is responsible for but this won't work
>>>> everywhere and isn't as clean.
>>>> >> >> >>
>>>> >> >> >>>
>>>> >> >> >>> > UnboundedPerElement/BoundedPerElement tells us during
>>>> pipeline construction time what type of PCollection we will be creating
since we may have a bounded PCollection go to an UnboundedPerElement DoFn
and that will produce an unbounded PCollection and similarly we could have
an unbounded PCollection go to a BoundedPerElement DoFn and that will
>>>> produce an unbounded PCollection. Restrictions.IsBounded is used during
>>>> pipeline execution to inform the runner whether a restriction being
>>>> returned is bounded or not since unbounded restrictions can return bounded
>>>> restrictions during splitting. So in the above example using the message
>>>> queue, the first 7 restrictions that only read 1250 messages would be
>>>> marked with the Restrictions.IsBounded interface while the last one would
not be. This could also be a method on restrictions such as "IsBounded
isBounded()" on PCollections.
>>>> >> >> >>>
>>>> >> >> >>> Thanks for the explanation about Restrictions.IsBounded,
>>>> since this is
>>>> >> >> >>> information for the runner, what is the runner expected to do
>>>> >> >> >>> differently when IsUnbounded? (I assume that IsBounded is the
>>>> default
>>>> >> >> >>> behavior and nothing changes).
>>>> >> >> >>
>>>> >> >> >>
>>>> >> >> >> Knowing whether a restriction is bounded or unbounded is
>>>> important, one example use case would be for the limited depth splitting
>>>> proposal (
>>>> https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv)
>>>> since you want to keep the unbounded restrictions at level 0 and only pass
>>>> the bounded restrictions to the other levels. The reasoning behind this is
>>>> that you don't want to end up in a state where all your unbounded
>>>> restrictions are at the highest level preventing you from splitting any
>>>> further.
>>>> >> >> >>
>>>> >> >> >>>
>>>> >> >> >>> > Note that this does bring up the question of whether SDKs
>>>> should expose coders for backlogs since ByteKeyCoder and BigEndianLongCoder
>>>> exist which would cover a good number of scenarios described above. This
>>>> coder doesn't have to be understood by the runner nor does it have to be
part of the portability APIs (either Runner or Fn API). WDYT?
>>>> >> >> >>>
>>>> >> >> >>> Yes we may need a Coder effectively for both sides, only
>>>> thing I don’t
>>>> >> >> >>> like is external impact in the API. I mean it is not too
>>>> complex, but
>>>> >> >> >>> adds some extras to support things that are ‘rarely’ changed.
>>>> >> >> >>
>>>> >> >> >>
>>>> >> >> >> Based upon Robert's suggestion above to swap to use an integral
>>>> floating point number and even without Robert's suggestion this won't work.
>>>> The idea was that a coder would help convert the byte[] backlog
>>>> representation to/from a type the user wants but the issue is that the
>>>> Runner may give any arbitrary byte[] backlog to the SDK during splitting
>>>> and this coder would need to be able to handle it.
>>>> >> >> >>
>>>> >> >> >>>
>>>> >> >> >>> > Ismael, I looked at the API around ByteKeyRangeTracker and
OffsetRangeTracker and figured out that the classes are named as such because
>>>> they are trackers for the OffsetRange and ByteKeyRange classes. Some
>>>> options are to:
>>>> >> >> >>> > 1) Copy the ByteKeyRange and call it ByteKeyRestriction and
>>>> similarly copy OffsetRange and call it OffsetRestriction. This would allow
>>>> us to name the trackers ByteKeyRestrictionTracker and
>>>> OffsetRestrictionTracker. Note that we can't rename because that would be a
>>>> backwards incompatible change for existing users of
>>>> ByteKeyRange/OffsetRange. This would allow us to add methods relevant to
>>>> SDF and remove methods that aren't needed.
>>>> >> >> >>> > 2) Rename ByteKeyRangeTracker to
>>>> ByteKeyRangeRestrictionTracker and OffsetRangeTracker to
>>>> OffsetRangeRestrictionTracker. Not really liking this option.
>>>> >> >> >>> > 3) Leave things as they are.
>>>> >> >> >>>
>>>> >> >> >>> For the RangeTracker vs RestrictionTracker discussion I will
>>>> probably
>>>> >> >> >>> lean to (3) (Leave things as they are) save if there are
>>>> important
>>>> >> >> >>> things to change/fix (1) which I am not aware of.
>>>> >> >> >>
>>>> >> >> >>
>>>> >> >> >> Sounds good to me.
>>>> >> >> >>
>>>> >> >> >>>
>>>> >> >> >>> On Tue, Nov 20, 2018 at 12:07 AM Lukasz Cwik <
>>>> lc...@google.com> wrote:
>>>> >> >> >>> >
>>>> >> >> >>> > Sorry for the late reply.
>>>> >> >> >>> >
>>>> >> >> >>> > On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <
>>>> ieme...@gmail.com> wrote:
>>>> >> >> >>> >>
>>>> >> >> >>> >> Some late comments, and my pre excuses if some questions
>>>> look silly,
>>>> >> >> >>> >> but the last documents were a lot of info that I have not
>>>> yet fully
>>>> >> >> >>> >> digested.
>>>> >> >> >>> >>
>>>> >> >> >>> >> I have some questions about the ‘new’ Backlog concept
>>>> following a
>>>> >> >> >>> >> quick look at the PR
>>>> >> >> >>> >> https://github.com/apache/beam/pull/6969/files
>>>> >> >> >>> >>
>>>> >> >> >>> >> 1. Is the Backlog a specific concept for each IO? Or in
>>>> other words:
>>>> >> >> >>> >> ByteKeyRestrictionTracker can be used by HBase and
>>>> Bigtable, but I am
>>>> >> >> >>> >> assuming from what I could understand that the Backlog
>>>> implementation
>>>> >> >> >>> >> will be data store specific, is this the case? or it can
>>>> be in some
>>>> >> >> >>> >> case generalized (for example for Filesystems)?
>>>> >> >> >>> >
>>>> >> >> >>> >
>>>> >> >> >>> > The backlog is tied heavily to the restriction tracker
>>>> implementation, any data store using the same restriction tracker will
>>>> provide the same backlog computation. For example, if HBase/Bigtable use
>>>> the ByteKeyRestrictionTracker then they will use the same backlog
>>>> calculation. Note that an implementation could subclass a restriction
>>>> tracker if the data store could provide additional information. For
>>>> example, the default backlog for a ByteKeyRestrictionTracker over
>>>> [startKey, endKey) is distance(currentKey, lastKey) where distance is
represented as byte array subtraction (which can be wildly inaccurate as
>>>> the density of data is not well reflected) but if HBase/Bigtable could
>>>> provide the number of bytes from current key to last key, a better
>>>> representation could be provided.
>>>> >> >> >>> >
>>>> >> >> >>> > Other common examples of backlogs would be:
>>>> >> >> >>> > * files: backlog = length of file - current byte offset
>>>> >> >> >>> > * message queues: backlog = number of outstanding messages
>>>> >> >> >>> >
>>>> >> >> >>> >>
>>>> >> >> >>> >>
>>>> >> >> >>> >> 2. Since the backlog is a byte[] this means that it is up
>>>> to the user
>>>> >> >> >>> >> to give it a meaning depending on the situation, is this
>>>> correct? Also
>>>> >> >> >>> >> since splitRestriction has now the Backlog as an argument,
>>>> what do we
>>>> >> >> >>> >> expect the person that implements this method in a DoFn to
>>>> do ideally
>>>> >> >> >>> >> with it? Maybe a more concrete example of how things fit
>>>> for
>>>> >> >> >>> >> File/Offset or HBase/Bigtable/ByteKey will be helpful
>>>> (maybe also for
>>>> >> >> >>> >> the BundleFinalizer concept too).
>>>> >> >> >>> >
>>>> >> >> >>> >
>>>> >> >> >>> > Yes, the restriction tracker/restriction/SplittableDoFn
>>>> must give the byte[] a meaning. This can have any meaning but we would like
the backlog byte[] representation to be lexicographically comparable
(when viewing the byte[] in big endian format, with prefixes being smaller,
e.g. 001 is smaller than 0010) and preferably a linear representation.
>>>> Note that all restriction trackers of the same type should use the same
>>>> "space" so that backlogs are comparable across multiple restriction tracker
>>>> instances.
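
A small sketch of the ordering property described above, in Python for brevity. It assumes backlogs are encoded as fixed-width big-endian unsigned integers (the helper `encode_backlog` is hypothetical); the prefix rule is shown at byte granularity, whereas the email's 001 versus 0010 example is written in bits.

```python
def encode_backlog(value: int, width: int = 8) -> bytes:
    """Encode a non-negative backlog as a fixed-width big-endian integer."""
    return value.to_bytes(width, "big")


values = [0, 1, 255, 256, 10000, 524288000]
encoded = [encode_backlog(v) for v in values]

# Byte-wise lexicographic order matches numeric order for this encoding.
print(sorted(encoded) == encoded)

# A byte string that is a strict prefix of another compares as smaller.
prefix_is_smaller = b"\x00\x01" < b"\x00\x01\x00"
print(prefix_is_smaller)
```

Because Python compares `bytes` lexicographically, the comparison a runner would need here requires no knowledge of what the bytes mean.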
>>>> >> >> >>> >
>>>> >> >> >>> > The backlog when provided to splitRestriction should be
>>>> used to subdivide the restriction into smaller restrictions where each
>>>> would have the backlog if processed (except for potentially the last).
>>>> >> >> >>> >
>>>> >> >> >>> > A concrete example would be to represent the remaining
bytes to process in a file as a 64 bit big endian integer, let's say that is
500MiB (524288000 bytes) or 00000000 00000000 00000000 00000000 00011111
01000000 (note that the trailing zeros are optional and don't impact the
>>>> calculation). The runner could notice that processing the restriction will
>>>> take 10 hrs, so it asks the SDF to split at 1/16 segments by shifting the
>>>> bits over by 4 and asks to split using backlog 00000000 00000000 00000000
>>>> 00000000 00000001 11110100. The SDK is able to convert this request back
>>>> into 32768000 bytes and returns 16 restrictions. Another example would be
>>>> for a message queue where we have 10000 messages on the queue remaining so
>>>> the backlog would be 00000000 00000000 00000000 00000000 00000000 00000000
>>>> 00100111 00010000 when represented as a 64 bit unsigned big endian integer.
>>>> The runner could ask the SDK to split using a 1/8th backlog of 00000000
>>>> 00000000 00000000 00000000 00000000 00000000 00000100 11100010 which the
>>>> SDK would break out into 8 restrictions, the first 7 responsible for
>>>> reading 1250 messages and stopping while the last restriction would read
>>>> 1250 messages and then continue to read anything else that has been
>>>> enqueued.
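
The bit patterns in the example above can be reproduced with a short sketch (Python for brevity; the helper names are hypothetical). Printing the full 8-byte form shows the two trailing zero bytes that the email omits for the file case.

```python
def to_be64(value: int) -> bytes:
    """Encode a non-negative value as a 64-bit big-endian unsigned integer."""
    return value.to_bytes(8, "big")


def bits(data: bytes) -> str:
    """Render bytes as space-separated groups of 8 bits."""
    return " ".join(f"{b:08b}" for b in data)


# File example: 500 MiB remaining, runner asks for 1/16 by shifting right 4 bits.
file_backlog = 500 * 1024 * 1024
file_split = file_backlog >> 4
print(bits(to_be64(file_backlog)))
print(bits(to_be64(file_split)))
print(file_split, "bytes per restriction,", file_backlog // file_split, "restrictions")

# Message queue example: 10000 messages remaining, runner asks for 1/8.
queue_backlog = 10000
queue_split = queue_backlog // 8
print(bits(to_be64(queue_backlog)))
print(bits(to_be64(queue_split)))
print(queue_split, "messages per restriction,", queue_backlog // queue_split, "restrictions")
```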
>>>> >> >> >>> >
>>>> >> >> >>> > Bundle finalization is unrelated to backlogs but is needed
>>>> since there is a class of data stores which need acknowledgement that says
>>>> I have successfully received your data and am now responsible for it such
>>>> as acking a message from a message queue.
>>>> >> >> >>> >
>>>> >> >> >>> >>
>>>> >> >> >>> >>
>>>> >> >> >>> >> 3. By default all Restrictions are assumed to be unbounded
>>>> but there
>>>> >> >> >>> >> is this new Restrictions.IsBounded method, can’t this
>>>> behavior be
>>>> >> >> >>> >> inferred (adapted) from the DoFn
>>>> UnboundedPerElement/Bounded
>>>> >> >> >>> >> annotation or are these independent concepts?
>>>> >> >> >>> >
>>>> >> >> >>> >
>>>> >> >> >>> > UnboundedPerElement/BoundedPerElement tells us during
>>>> pipeline construction time what type of PCollection we will be creating
since we may have a bounded PCollection go to an UnboundedPerElement DoFn
and that will produce an unbounded PCollection and similarly we could have
an unbounded PCollection go to a BoundedPerElement DoFn and that will
>>>> produce an unbounded PCollection. Restrictions.IsBounded is used during
>>>> pipeline execution to inform the runner whether a restriction being
>>>> returned is bounded or not since unbounded restrictions can return bounded
>>>> restrictions during splitting. So in the above example using the message
>>>> queue, the first 7 restrictions that only read 1250 messages would be
>>>> marked with the Restrictions.IsBounded interface while the last one would
>>>> not be. This could also be a method on restrictions such as "IsBounded
>>>> isBounded()" on PCollections.
>>>> >> >> >>> >
>>>> >> >> >>> >> Extra unrelated comment:
>>>> >> >> >>> >> Since SDF is still @Experimental we should probably rename
>>>> >> >> >>> >> OffsetRangeTracker and ByteKeyRangeTracker into the
>>>> RestrictionTracker
>>>> >> >> >>> >> suffix (I don’t know why they share the RangeTracker
>>>> suffix for the
>>>> >> >> >>> >> new trackers), WDYT?
>>>> >> >> >>> >
>>>> >> >> >>> >
>>>> >> >> >>> > Agree, will perform in a follow-up PR.
>>>> >> >> >>> >
>>>> >> >> >>> >>
>>>> >> >> >>> >> On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <
>>>> lc...@google.com> wrote:
>>>> >> >> >>> >> >
>>>> >> >> >>> >> >
>>>> >> >> >>> >> >
>>>> >> >> >>> >> > On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <
>>>> rober...@google.com> wrote:
>>>> >> >> >>> >> >>
>>>> >> >> >>> >> >> I think that not returning the user's specific subclass
>>>> should be fine.
>>>> >> >> >>> >> >> Does the removal of markDone imply that the consumer
>>>> always knows a
>>>> >> >> >>> >> >> "final" key to claim on any given restriction?
>>>> >> >> >>> >> >
>>>> >> >> >>> >> >
>>>> >> >> >>> >> > Yes, each restriction needs to support claiming a
>>>> "final" key that would make the restriction "done". In the BigTable/HBase
>>>> case it is the empty key "", for files it can be a file offset beyond the
>>>> end of the file. Generally, restriction trackers written by SDF authors
>>>> could also take an instance of an object that they can compare instance
>>>> equality against for a final key. Alternatively we could allow restriction
trackers to implement markDone() but would need the SDK to have knowledge of
the method by having the RestrictionTracker implement an interface, extend an
abstract base class, or be found reflectively so that we would be able to wrap
>>>> it to provide synchronization guarantees. I had toyed with the idea of
>>>> using something like the ProxyInvocationHandler that backs PipelineOptions
to be able to provide a modified version of the user's instance that had the
>>>> appropriate synchronization guarantees but couldn't get it to work.
>>>> >> >> >>> >> >
>>>> >> >> >>> >> >>
>>>> >> >> >>> >> >> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <
>>>> lc...@google.com> wrote:
>>>> >> >> >>> >> >> >
>>>> >> >> >>> >> >> > I have started to work on how to change the user
>>>> facing API within the Java SDK to support splitting/checkpointing[1],
>>>> backlog reporting[2] and bundle finalization[3].
>>>> >> >> >>> >> >> >
>>>> >> >> >>> >> >> > I have this PR[4] which contains minimal
>>>> interface/type definitions to convey how the API surface would change with
>>>> these 4 changes:
>>>> >> >> >>> >> >> > 1) Exposes the ability for @SplitRestriction to take
>>>> a backlog suggestion on how to perform splitting and for how many
>>>> restrictions should be returned.
>>>> >> >> >>> >> >> > 2) Adds the ability for RestrictionTrackers to report
>>>> backlog
>>>> >> >> >>> >> >> > 3) Updates @ProcessElement to be required to take a
generic RestrictionTracker instead of the user's own restriction tracker
>>>> type.
>>>> >> >> >>> >> >> > 4) Adds the ability for
>>>> @StartBundle/@ProcessElement/@FinishBundle to register a callback that is
>>>> invoked after bundle finalization.
>>>> >> >> >>> >> >> >
>>>> >> >> >>> >> >> > The details are in the javadoc comments as to how I
>>>> would expect the contract to play out.
>>>> >> >> >>> >> >> > Feel free to comment on the ML/PR around the contract
>>>> and after the feedback is received/digested/implemented, I would like to
get the changes submitted so that work can start towards providing an
>>>> implementation in the Java SDK, Python SDK, and Go SDK and the shared
>>>> runner portability library.
>>>> >> >> >>> >> >> >
>>>> >> >> >>> >> >> > I would like to call out special attention to 3 since
>>>> with this change it will enable us to remove the synchronization
>>>> requirement for users as we will wrap the underlying restriction tracker
>>>> allowing us to add appropriate synchronization as needed and also to watch
>>>> any calls that pass through the object such as the claim calls. I also
>>>> believe this prevents people from writing RestrictionTrackers where the
>>>> contract of tryClaim is subverted since markDone is outside the purview of
>>>> tryClaim as in ByteKeyRangeTracker[5].
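
A rough sketch of the wrapping idea in point 3, in Python for brevity. `OffsetTracker` and `ObservingTracker` are hypothetical classes invented for illustration and do not mirror the Beam API; the point is only that when user code sees a wrapper, the framework can add locking and observe every claim.

```python
import threading
from typing import List


class OffsetTracker:
    """Hypothetical user-written tracker for offsets in [start, end)."""

    def __init__(self, start: int, end: int) -> None:
        self.end = end
        self.last_claimed = start - 1

    def try_claim(self, position: int) -> bool:
        # Claims must move strictly forward.
        if position <= self.last_claimed:
            raise ValueError("claims must be strictly increasing")
        # Claiming at or past the end finishes the restriction.
        if position >= self.end:
            return False
        self.last_claimed = position
        return True


class ObservingTracker:
    """Framework-side wrapper: serialises calls and records successful claims."""

    def __init__(self, delegate: OffsetTracker) -> None:
        self._delegate = delegate
        self._lock = threading.Lock()
        self.claimed: List[int] = []

    def try_claim(self, position: int) -> bool:
        with self._lock:
            ok = self._delegate.try_claim(position)
            if ok:
                self.claimed.append(position)
            return ok


tracker = ObservingTracker(OffsetTracker(0, 3))
results = [tracker.try_claim(i) for i in range(4)]
print(results, tracker.claimed)
```

The user-written tracker contains no locking at all; the wrapper is the only object handed to processing code, so synchronization lives in one place.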
>>>> >> >> >>> >> >> >
>>>> >> >> >>> >> >> > 1:
>>>> https://s.apache.org/beam-checkpoint-and-split-bundles
>>>> >> >> >>> >> >> > 2:
>>>> https://s.apache.org/beam-bundles-backlog-splitting
>>>> >> >> >>> >> >> > 3: https://s.apache.org/beam-finalizing-bundles
>>>> >> >> >>> >> >> > 4: https://github.com/apache/beam/pull/6969
>>>> >> >> >>> >> >> > 5: https://github.com/apache/beam/pull/6949
>>>>
>>>
