Ah, I appear to have missed this line, which indicates that the current
RestrictionTracker must be updated.

This invocation updates the {@link
   * #currentRestriction()} to be the primary restriction effectively
having the current {@link
   * DoFn.ProcessElement} execution responsible for performing the work
that the primary restriction
   * represents.

On Sat, Nov 28, 2020 at 2:45 AM Daniel Collins <[email protected]> wrote:

> This does not appear to work: The CheckDone call, as far as I can tell, is
> made on the existing range not the split range based on the following error:
>
> Error message from worker: java.lang.IllegalStateException: Last attempted
> offset was 4601978 in range [2975759, 9223372036854775807), claiming work
> in [4601979, 9223372036854775807) was not attempted
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>
> On Sat, Nov 28, 2020 at 2:18 AM Daniel Collins <[email protected]>
> wrote:
>
>> Can you confirm that the following implementation of trySplit will work
>> as intended (from an OffsetRangeTracker subclass)?
>>
>> @Override
>>   public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>     return SplitResult.of(
>>         new OffsetRange(currentRestriction().getFrom(), lastClaimedOffset
>> + 1),
>>         new OffsetRange(lastClaimedOffset + 1, Long.MAX_VALUE));
>>   }
>>
>> > It would nice to update the documentation if that's confusing.
>>
>> If you could please update this (if this is indeed the case) to confirm
>> that it ensures that there will never be two unbounded restrictions sent to
>> DoFns running at the same time using this pattern, that would be great.
>>
>> In addition, I'm not quite sure how this works? When the 'trySplit' call
>> occurs, it returns two OffsetRanges, which don't yet include information
>> about claimed offsets. How is the first half of this converted to my
>> OffsetRangeTracker subclass with offsets already claimed? Does the runtime
>> call (and importantly, is it required to call):
>>
>> MyRestrictionTracker tracker =
>> MyDoFn.newTracker(splitResult.getPrimary());
>> tracker.tryClaim(previousClaimed);
>> tracker.checkDone();
>>
>> On Sat, Nov 28, 2020 at 2:08 AM Boyuan Zhang <[email protected]> wrote:
>>
>>> It would nice to update the documentation if that's confusing.
>>>
>>> On Fri, Nov 27, 2020 at 11:05 PM Daniel Collins <[email protected]>
>>> wrote:
>>>
>>>> I think the documentation for trySplit() doesn't make it clear that it
>>>> supports this use case. In particular this section:
>>>>
>>>> > This invocation updates the {@link #currentRestriction()} to be the
>>>> primary restriction effectively having the current {@link
>>>> DoFn.ProcessElement} execution responsible for performing the work that the
>>>> primary restriction represents. The residual restriction will be executed
>>>> in a separate {@link DoFn.ProcessElement} invocation (likely in a different
>>>> process). The work performed by executing the primary and residual
>>>> restrictions as separate {@link DoFn.ProcessElement} invocations MUST be
>>>> equivalent to the work performed as if this split never occurred.
>>>>
>>>> Implies that the runner will try to run both restrictions again on
>>>> separate workers. This is not the behavior I am looking for, hence my
>>>> confusion. Can we change the documentation here to make clear that
>>>> checkDone will be called on the primary restriction in the output to ensure
>>>> that it is actually completed if the trySplit call was triggered by a call
>>>> to resume()?
>>>>
>>>> On Sat, Nov 28, 2020 at 1:58 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> And if you look into the RestrictionTracker javadoc[1], it mentions
>>>>> that what means when you return null from trySplit.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107
>>>>>
>>>>> On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> To the extent I can see, this never mentions the restriction that you
>>>>>>> <need> to implement a split() that returns a bounded restriction if
>>>>>>> returning resume() from an SDF. Nor does this restriction particularly 
>>>>>>> make
>>>>>>> sense if the range being processed is itself unbounded? Perhaps you 
>>>>>>> would
>>>>>>> consider not calling checkDone() on resume() if the restriction 
>>>>>>> provided to
>>>>>>> the runner is unbounded since it would be unreasonable to complete an
>>>>>>> unbounded restriction?
>>>>>>
>>>>>>
>>>>>> It seems like you are not familiar with how beam deals with resume()
>>>>>> so let's start from this part. Let's say that your SDF is processing a
>>>>>> restriction of [0, MAX) and so far you have done tryClaim(5), and you
>>>>>> want to return resume() at this point. When you return resume() from 
>>>>>> here,
>>>>>> the beam Java DoFn invoker will know you want to resume and call your
>>>>>> restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual
>>>>>> from [0, Max). Ideally, your RestrictionTracker should return [0, 6) as
>>>>>> your current restriction and [6, MAX) as the residual. Then beam Java 
>>>>>> DoFn
>>>>>> invoker will call checkDone on the restriction [0, 6) to double check 
>>>>>> your
>>>>>> SDF has called tryClaim(5) to ensure there is no data loss. Then the SDK
>>>>>> will return the [6, Max) restriction back to runner(in your case that's
>>>>>> Dataflow) and the runner will reschedule [6, MAX) based on its scheduling
>>>>>> strategy. That's why if you want to use resume(), you need to implement
>>>>>> trySplit. That's also why trySplit and checkDone also make sense on
>>>>>> Unbounded restriction.
>>>>>>
>>>>>> Perhaps it would be better to explain in terms of why I'm trying to
>>>>>>> do this. If the subscription has not received any data in a while, or is
>>>>>>> receiving data infrequently, I want to enable dataflow to scale down to 
>>>>>>> 1
>>>>>>> worker, but there will be no need to call "tryClaim" if there is no new
>>>>>>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>>>>>>> infrequently, give dataflow the opportunity to scale my job down. I'm 
>>>>>>> not
>>>>>>> actually done with the data because, as new data can always be 
>>>>>>> published in
>>>>>>> the future, we can't know that, and I'm trying to avoid needing to
>>>>>>> implement bounded reads to artificially produce sub-windows when an
>>>>>>> unbounded output is much more natural.
>>>>>>
>>>>>>
>>>>>> So you are referring to the resume use case. Please note that even
>>>>>> though you are returning resume() from your SDF, that doesn't means
>>>>>> Dataflow will guarantee that the worker will be downscaled to 1. But
>>>>>> resume() indeed can help you free some workers to process other work,
>>>>>> compared to having your SDF doing busy wait.
>>>>>>
>>>>>> This is good to know. So to rephrase, could I periodically call
>>>>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control 
>>>>>>> back
>>>>>>> to the runtime?
>>>>>>
>>>>>>
>>>>>> You can do so by implementing RestrictionTracker.trySplit() and using
>>>>>> resume().
>>>>>>
>>>>>> You may also want to take a look at Kafka example[1]. Hope that is
>>>>>> helpful.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> > There is an update-to-date sdf programming guide[1] and typically
>>>>>>> [2] is talking about SDF initiated-checkpointing
>>>>>>>
>>>>>>> To the extent I can see, this never mentions the restriction that
>>>>>>> you <need> to implement a split() that returns a bounded restriction if
>>>>>>> returning resume() from an SDF. Nor does this restriction particularly 
>>>>>>> make
>>>>>>> sense if the range being processed is itself unbounded? Perhaps you 
>>>>>>> would
>>>>>>> consider not calling checkDone() on resume() if the restriction 
>>>>>>> provided to
>>>>>>> the runner is unbounded since it would be unreasonable to complete an
>>>>>>> unbounded restriction?
>>>>>>>
>>>>>>> > It depends on the definition of no new data to enable rescheduling.
>>>>>>>
>>>>>>> Perhaps it would be better to explain in terms of why I'm trying to
>>>>>>> do this. If the subscription has not received any data in a while, or is
>>>>>>> receiving data infrequently, I want to enable dataflow to scale down to 
>>>>>>> 1
>>>>>>> worker, but there will be no need to call "tryClaim" if there is no new
>>>>>>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>>>>>>> infrequently, give dataflow the opportunity to scale my job down. I'm 
>>>>>>> not
>>>>>>> actually done with the data because, as new data can always be 
>>>>>>> published in
>>>>>>> the future, we can't know that, and I'm trying to avoid needing to
>>>>>>> implement bounded reads to artificially produce sub-windows when an
>>>>>>> unbounded output is much more natural.
>>>>>>>
>>>>>>> > Please note that the when an SDF is processing one element
>>>>>>> restriction pair, the start of the restriction is never changed. You 
>>>>>>> will
>>>>>>> always get the same offset when you call currentRestriction().getFrom().
>>>>>>>
>>>>>>> This is good to know. So to rephrase, could I periodically call
>>>>>>> tryClaim(<last sent message or getFrom if none yet>) to yield control 
>>>>>>> back
>>>>>>> to the runtime?
>>>>>>>
>>>>>>> -Dan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> > Okay, is this documented anywhere? In particular,
>>>>>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>>>>>> implies resume() should be returned when tryClaim returns false.
>>>>>>>>
>>>>>>>> There is an update-to-date sdf programming guide[1] and typically
>>>>>>>> [2] is talking about SDF initiated-checkpointing. And stop() should be
>>>>>>>> returned when tryClaim returns false where resume() is expected to 
>>>>>>>> return
>>>>>>>> when the restriction is not fully processed and you want to defer
>>>>>>>> processing in the future.
>>>>>>>>
>>>>>>>> > If this is the case, is there any way I can yield control to the
>>>>>>>> runtime if I have no new data to enable rescheduling? For example, can 
>>>>>>>> I
>>>>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>>>
>>>>>>>> It depends on the definition of no new data to enable rescheduling.
>>>>>>>> If you believe that you are done with current restriction even
>>>>>>>> though you are not reaching to the end of restriction, you can 
>>>>>>>> specially
>>>>>>>> say I'm done with current one by calling
>>>>>>>> restrictionTracker.tryClaim(MAX_LONG)(or tryClaim(restriction.getTo) 
>>>>>>>> if you
>>>>>>>> are sure your end of restriction is not changed by any splitting).
>>>>>>>> If you just want to re-process the rest of the restriction after a
>>>>>>>> certain time, e.g, 5 mins, 30mins and so on, you need to implement the
>>>>>>>> trySplit and return resume(duration) when you want to resume.
>>>>>>>>
>>>>>>>> Please note that the when an SDF is processing one element
>>>>>>>> restriction pair, the start of the restriction is never changed. You 
>>>>>>>> will
>>>>>>>> always get the same offset when you call 
>>>>>>>> currentRestriction().getFrom().
>>>>>>>>
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>>>>>> [2]
>>>>>>>> https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
>>>>>>>>
>>>>>>>> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> > you can either never return resume() from your SDF or implement
>>>>>>>>> suitable trySplit() logic for your RestrictionTracker
>>>>>>>>>
>>>>>>>>> Okay, is this documented anywhere? In particular,
>>>>>>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>>>>>>> implies resume() should be returned when tryClaim returns false.
>>>>>>>>>
>>>>>>>>> If this is the case, is there any way I can yield control to the
>>>>>>>>> runtime if I have no new data to enable rescheduling? For example, 
>>>>>>>>> can I
>>>>>>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>>>>>>
>>>>>>>>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> > IIUC, this should never happen as long as I return null to
>>>>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>>>
>>>>>>>>>> I noticed that in your implementation you return null for your
>>>>>>>>>> RestrictionTracker.trySplit. That means you cannot return resume() 
>>>>>>>>>> from
>>>>>>>>>> your SplittableDoFn.process() body since resume() means
>>>>>>>>>> performing SplittableDoFn self-initiated checkpointing and
>>>>>>>>>> deferring processing residuals.
>>>>>>>>>>
>>>>>>>>>> In your case, you can either never return resume() from your SDF
>>>>>>>>>> or implement suitable trySplit() logic for your RestrictionTracker. 
>>>>>>>>>> For
>>>>>>>>>> example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an
>>>>>>>>>> infinite restriction.
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> > Please note that your current restriction might be changed to
>>>>>>>>>>> a finite restriction during processing one bundle if you do 
>>>>>>>>>>> SplittableDoFn
>>>>>>>>>>> self-initiated checkpointing or any runner issued splits
>>>>>>>>>>>
>>>>>>>>>>> IIUC, this should never happen as long as I return null to
>>>>>>>>>>> trySplit. Is this not the case? (trySplit implementation below)
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>>   public SplitResult<OffsetRange> trySplit(double
>>>>>>>>>>> fractionOfRemainder) {  return null; }
>>>>>>>>>>>
>>>>>>>>>>> > what will you do if we reach the finally block?
>>>>>>>>>>>
>>>>>>>>>>> At that point an exception is being thrown out of the
>>>>>>>>>>> processElement function. The answer to that would be "what will the 
>>>>>>>>>>> runtime
>>>>>>>>>>> do if an exception is thrown out of the processElement function"
>>>>>>>>>>>
>>>>>>>>>>> > Open a WIP PR
>>>>>>>>>>>
>>>>>>>>>>> I have, but I'm staging changes in a separate repo. See
>>>>>>>>>>> https://github.com/googleapis/java-pubsublite/pull/390
>>>>>>>>>>> (although this incorporates other changes, see
>>>>>>>>>>> PubsubLitePartitionSdf.java
>>>>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>>>>>>>>>>>  and PubsubLiteOffsetRangeTracker.java
>>>>>>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>>>>>>>>>>>  for
>>>>>>>>>>> the sdf and restriction tracker implementations)
>>>>>>>>>>>
>>>>>>>>>>> -Dan
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Boyuan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Responses inline.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > The checkDone is invoked by the SDK harness to guarantee
>>>>>>>>>>>>> that when you exit you SplittableDoFn.process you must have 
>>>>>>>>>>>>> completed all
>>>>>>>>>>>>> the work in the current restriction
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is impossible with unbounded restrictions since, by
>>>>>>>>>>>>> definition, all work cannot be completed.
>>>>>>>>>>>>>
>>>>>>>>>>>> Please note that your current restriction might be changed to a
>>>>>>>>>>>> finite restriction during processing one bundle if you do 
>>>>>>>>>>>> SplittableDoFn
>>>>>>>>>>>> self-initiated checkpointing or any runner issued splits.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> > In your case, it seems like after you do tryClaim(3188439),
>>>>>>>>>>>>> you return stop() directly from your SplittableDoFn.process 
>>>>>>>>>>>>> function
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is not true. The code in question is below. stop() is
>>>>>>>>>>>>> only returned if tryClaim returns false.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Dan
>>>>>>>>>>>>>
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom()))
>>>>>>>>>>>>> {
>>>>>>>>>>>>>
>>>>>>>>>>>>       logger.atWarning().log("Failed to claim initial
>>>>>>>>>>>>> restriction for partition " + partition);
>>>>>>>>>>>>>       return ProcessContinuation.stop();
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>     sleepTimeRemaining = maxSleepTime;
>>>>>>>>>>>>>     Committer committer = committerSupplier.apply(partition);
>>>>>>>>>>>>>     committer.startAsync().awaitRunning();
>>>>>>>>>>>>>     try (PullSubscriber<SequencedMessage> subscriber =
>>>>>>>>>>>>>         subscriberFactory.apply(partition,
>>>>>>>>>>>>> Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>>>>>>>>>       while (true) {
>>>>>>>>>>>>>         List<SequencedMessage> messages = doPoll(subscriber);
>>>>>>>>>>>>>         // We polled for as long as possible, yield to the
>>>>>>>>>>>>> runtime to allow it to reschedule us on
>>>>>>>>>>>>>         // a new task.
>>>>>>>>>>>>>         if (messages.isEmpty()) {
>>>>>>>>>>>>>           logger.atWarning().log("Yielding due to timeout on
>>>>>>>>>>>>> partition " + partition);
>>>>>>>>>>>>>           return ProcessContinuation.resume();
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>         long lastOffset =
>>>>>>>>>>>>> Iterables.getLast(messages).offset().value();
>>>>>>>>>>>>>         if (tracker.tryClaim(lastOffset)) {
>>>>>>>>>>>>>           messages.forEach(
>>>>>>>>>>>>>               message ->
>>>>>>>>>>>>>                   receiver.outputWithTimestamp(
>>>>>>>>>>>>>                       message, new
>>>>>>>>>>>>> Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>>>>>>>>>           committer.commitOffset(Offset.of(lastOffset +
>>>>>>>>>>>>> 1)).get();
>>>>>>>>>>>>>         } else {
>>>>>>>>>>>>>           logger.atWarning().log("Stopping partition " +
>>>>>>>>>>>>> partition);
>>>>>>>>>>>>>           return ProcessContinuation.stop();
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>       }
>>>>>>>>>>>>>     } finally {
>>>>>>>>>>>>>       committer.stopAsync().awaitTerminated();
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
>>>>>>>>>>>> From your code, what will you do if we reach the finally block?
>>>>>>>>>>>> Would you like to open a WIP PR to show more details?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Daniel,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The checkDone is invoked by the SDK harness to guarantee that
>>>>>>>>>>>>>> when you exit you SplittableDoFn.process(either you return 
>>>>>>>>>>>>>> stop() or
>>>>>>>>>>>>>> resume()), you must have completed all the work in the current 
>>>>>>>>>>>>>> restriction.
>>>>>>>>>>>>>> This is one of major ways for SplittableDoFn to prevent data 
>>>>>>>>>>>>>> loss.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In your case, it seems like after you do tryClaim(3188439),
>>>>>>>>>>>>>> you return stop() directly from your SplittableDoFn.process 
>>>>>>>>>>>>>> function.
>>>>>>>>>>>>>> That's not a correct way when working with restriction and 
>>>>>>>>>>>>>> restriction
>>>>>>>>>>>>>> tracker. You should either return resume() to perform 
>>>>>>>>>>>>>> SplittableDoFn
>>>>>>>>>>>>>> initiated checkpoint to defer processing restriction [3188439,
>>>>>>>>>>>>>> 9223372036854775807), or you should return stop() only when you 
>>>>>>>>>>>>>> have
>>>>>>>>>>>>>> tryClaim() return False.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running
>>>>>>>>>>>>>>> into the following error on dataflow with a RestrictionTracker 
>>>>>>>>>>>>>>> returning
>>>>>>>>>>>>>>> UNBOUNDED to isBounded. It looks like calls are being made to 
>>>>>>>>>>>>>>> `checkDone`,
>>>>>>>>>>>>>>> but looking at the documentation of `checkDone`, I don't think 
>>>>>>>>>>>>>>> there's any
>>>>>>>>>>>>>>> rational thing I can do in this case. Does anyone know what 
>>>>>>>>>>>>>>> should be done
>>>>>>>>>>>>>>> for this method?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>>>>>>>>> -Must throw an exception with an informative error message,
>>>>>>>>>>>>>>> if there is still any unclaimed work remaining in the 
>>>>>>>>>>>>>>> restriction. (there
>>>>>>>>>>>>>>> is, the restriction is unbounded)
>>>>>>>>>>>>>>> -{@link RestrictionTracker#checkDone} MUST succeed
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> "Error message from worker: java.lang.IllegalStateException:
>>>>>>>>>>>>>>> Last attempted offset was 3188438 in range [1998348, 
>>>>>>>>>>>>>>> 9223372036854775807),
>>>>>>>>>>>>>>> claiming work in [3188439, 9223372036854775807) was not 
>>>>>>>>>>>>>>> attempted
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>

Reply via email to