And if you look into the RestrictionTracker javadoc[1], it explains
what it means when you return null from trySplit.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107
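
For illustration, the checkpoint flow described in the quoted reply below
(tryClaim(5) on [0, MAX), then resume()) can be sketched in plain Java. This
is a simplified stand-in, not Beam's RestrictionTracker API: the
CheckpointSketch, Range and SimpleOffsetTracker names and the checkpoint()
method are hypothetical, and checkpoint() only models what a trySplit(0.0)
call is expected to do for an offset range.

```java
public class CheckpointSketch {
    /** Half-open offset range [from, to); a simplified stand-in for an offset restriction. */
    public static final class Range {
        public final long from;
        public final long to;

        public Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    /** Hypothetical tracker that models only the behaviour discussed in this thread. */
    public static final class SimpleOffsetTracker {
        private Range range;
        private Long lastAttempted = null;

        public SimpleOffsetTracker(Range range) {
            this.range = range;
        }

        public Range currentRestriction() {
            return range;
        }

        /** Records the attempt; succeeds only if the offset is below the end of the range. */
        public boolean tryClaim(long offset) {
            lastAttempted = offset;
            return offset < range.to;
        }

        /**
         * Models a split at fraction 0.0: keep everything attempted so far as the primary
         * and hand back the rest as the residual. Returns null if nothing is left to split off.
         */
        public Range checkpoint() {
            long cur = (lastAttempted == null) ? range.from - 1 : lastAttempted;
            if (cur >= range.to - 1) {
                return null;
            }
            Range residual = new Range(cur + 1, range.to);
            range = new Range(range.from, cur + 1);
            return residual;
        }

        /** Throws if any offset in the current range was never attempted. */
        public void checkDone() {
            if (range.from >= range.to) {
                return; // an empty range has no work left
            }
            if (lastAttempted == null || lastAttempted < range.to - 1) {
                throw new IllegalStateException("Unclaimed work remains in " + range);
            }
        }
    }

    public static void main(String[] args) {
        SimpleOffsetTracker tracker = new SimpleOffsetTracker(new Range(0, Long.MAX_VALUE));
        tracker.tryClaim(5);
        Range residual = tracker.checkpoint();
        tracker.checkDone(); // passes: the primary shrank to [0, 6)
        System.out.println("primary=" + tracker.currentRestriction() + " residual=" + residual);

        SimpleOffsetTracker noSplit = new SimpleOffsetTracker(new Range(0, Long.MAX_VALUE));
        noSplit.tryClaim(5);
        try {
            noSplit.checkDone(); // fails: [6, MAX) was never attempted
        } catch (IllegalStateException e) {
            System.out.println("checkDone failed: " + e.getMessage());
        }
    }
}
```

The second case in main() corresponds to a tracker whose trySplit returns
null: the primary stays [0, MAX), so checkDone cannot succeed after resume().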

On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang <[email protected]> wrote:

>> To the extent I can see, this never mentions the restriction that you
>> <need> to implement a split() that returns a bounded restriction if
>> returning resume() from an SDF. Nor does this restriction particularly make
>> sense if the range being processed is itself unbounded? Perhaps you would
>> consider not calling checkDone() on resume() if the restriction provided to
>> the runner is unbounded since it would be unreasonable to complete an
>> unbounded restriction?
>
>
> It seems like you are not familiar with how Beam deals with resume(), so
> let's start from this part. Let's say that your SDF is processing a
> restriction of [0, MAX) and so far you have done tryClaim(5), and you
> want to return resume() at this point. When you return resume() from here,
> the Beam Java DoFn invoker will know you want to resume and will call your
> restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual
> from [0, MAX). Ideally, your RestrictionTracker should return [0, 6) as
> your current restriction and [6, MAX) as the residual. Then the Beam Java
> DoFn invoker will call checkDone on the restriction [0, 6) to double-check
> that your SDF has called tryClaim(5), ensuring there is no data loss. Then
> the SDK will return the [6, MAX) restriction back to the runner (in your
> case that's Dataflow), and the runner will reschedule [6, MAX) based on its
> scheduling strategy. That's why, if you want to use resume(), you need to
> implement trySplit. That's also why trySplit and checkDone make sense on
> an unbounded restriction.
>
>> Perhaps it would be better to explain in terms of why I'm trying to do
>> this. If the subscription has not received any data in a while, or is
>> receiving data infrequently, I want to enable dataflow to scale down to 1
>> worker, but there will be no need to call "tryClaim" if there is no new
>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>> actually done with the data because, as new data can always be published in
>> the future, we can't know that, and I'm trying to avoid needing to
>> implement bounded reads to artificially produce sub-windows when an
>> unbounded output is much more natural.
>
>
> So you are referring to the resume use case. Please note that even though
> you are returning resume() from your SDF, that doesn't mean Dataflow will
> guarantee that the job is downscaled to 1 worker. But resume() can indeed
> help you free some workers to process other work, compared to having your
> SDF busy-wait.
>
>> This is good to know. So to rephrase, could I periodically call
>> tryClaim(<last sent message or getFrom if none yet>) to yield control back
>> to the runtime?
>
>
> You can do so by implementing RestrictionTracker.trySplit() and using
> resume().
>
> You may also want to take a look at Kafka example[1]. Hope that is helpful.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
>
>
>
> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins <[email protected]>
> wrote:
>
>> > There is an up-to-date SDF programming guide[1] and typically [2]
>> is talking about SDF-initiated checkpointing
>>
>> To the extent I can see, this never mentions the restriction that you
>> <need> to implement a split() that returns a bounded restriction if
>> returning resume() from an SDF. Nor does this restriction particularly make
>> sense if the range being processed is itself unbounded? Perhaps you would
>> consider not calling checkDone() on resume() if the restriction provided to
>> the runner is unbounded since it would be unreasonable to complete an
>> unbounded restriction?
>>
>> > It depends on the definition of no new data to enable rescheduling.
>>
>> Perhaps it would be better to explain in terms of why I'm trying to do
>> this. If the subscription has not received any data in a while, or is
>> receiving data infrequently, I want to enable dataflow to scale down to 1
>> worker, but there will be no need to call "tryClaim" if there is no new
>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>> actually done with the data because, as new data can always be published in
>> the future, we can't know that, and I'm trying to avoid needing to
>> implement bounded reads to artificially produce sub-windows when an
>> unbounded output is much more natural.
>>
>> > Please note that when an SDF is processing one element/restriction
>> pair, the start of the restriction never changes. You will always get
>> the same offset when you call currentRestriction().getFrom().
>>
>> This is good to know. So to rephrase, could I periodically call
>> tryClaim(<last sent message or getFrom if none yet>) to yield control back
>> to the runtime?
>>
>> -Dan
>>
>>
>>
>> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]> wrote:
>>
>>> > Okay, is this documented anywhere? In particular,
>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>> implies resume() should be returned when tryClaim returns false.
>>>
>>> There is an up-to-date SDF programming guide[1], and [2] in particular
>>> covers SDF-initiated checkpointing. stop() should be returned when
>>> tryClaim returns false, whereas resume() should be returned when the
>>> restriction is not fully processed and you want to defer the remaining
>>> processing to the future.
>>>
>>> > If this is the case, is there any way I can yield control to the
>>> runtime if I have no new data to enable rescheduling? For example, can I
>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>
>>> It depends on the definition of no new data to enable rescheduling.
>>> If you believe that you are done with the current restriction even though
>>> you have not reached the end of the restriction, you can explicitly say
>>> you are done with it by calling restrictionTracker.tryClaim(MAX_LONG) (or
>>> tryClaim(restriction.getTo()) if you are sure the end of your restriction
>>> has not been changed by any splitting).
>>> If you just want to re-process the rest of the restriction after a
>>> certain time, e.g., 5 mins, 30 mins and so on, you need to implement
>>> trySplit and return resume(duration) when you want to resume.
>>>
>>> Please note that when an SDF is processing one element/restriction
>>> pair, the start of the restriction never changes. You will always get
>>> the same offset when you call currentRestriction().getFrom().
>>>
>>>
>>> [1]
>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>> [2]
>>> https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
>>>
>>> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <[email protected]>
>>> wrote:
>>>
>>>> > you can either never return resume() from your SDF or implement
>>>> suitable trySplit() logic for your RestrictionTracker
>>>>
>>>> Okay, is this documented anywhere? In particular,
>>>> https://s.apache.org/splittable-do-fn seems out of date, since it
>>>> implies resume() should be returned when tryClaim returns false.
>>>>
>>>> If this is the case, is there any way I can yield control to the
>>>> runtime if I have no new data to enable rescheduling? For example, can I
>>>> call tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>>>
>>>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> > IIUC, this should never happen as long as I return null to trySplit.
>>>>> Is this not the case? (trySplit implementation below)
>>>>>
>>>>> I noticed that in your implementation you return null for your
>>>>> RestrictionTracker.trySplit. That means you cannot return resume() from
>>>>> your SplittableDoFn.process() body since resume() means
>>>>> performing SplittableDoFn self-initiated checkpointing and
>>>>> deferring processing residuals.
>>>>>
>>>>> In your case, you can either never return resume() from your SDF or
>>>>> implement suitable trySplit() logic for your RestrictionTracker. For
>>>>> example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an
>>>>> infinite restriction.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>>>>>
>>>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> > Please note that your current restriction might be changed to a
>>>>>> finite restriction during processing one bundle if you do SplittableDoFn
>>>>>> self-initiated checkpointing or any runner issued splits
>>>>>>
>>>>>> IIUC, this should never happen as long as I return null to trySplit.
>>>>>> Is this not the case? (trySplit implementation below)
>>>>>>
>>>>>> @Override
>>>>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) { return null; }
>>>>>>
>>>>>> > what will you do if we reach the finally block?
>>>>>>
>>>>>> At that point an exception is being thrown out of the processElement
>>>>>> function. The answer to that would be "what will the runtime do if an
>>>>>> exception is thrown out of the processElement function"
>>>>>>
>>>>>> > Open a WIP PR
>>>>>>
>>>>>> I have, but I'm staging changes in a separate repo. See
>>>>>> https://github.com/googleapis/java-pubsublite/pull/390 (although
>>>>>> this incorporates other changes, see PubsubLitePartitionSdf.java
>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>>>>>>  and PubsubLiteOffsetRangeTracker.java
>>>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>>>>>>  for
>>>>>> the sdf and restriction tracker implementations)
>>>>>>
>>>>>> -Dan
>>>>>>
>>>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Boyuan,
>>>>>>>>
>>>>>>>> Responses inline.
>>>>>>>>
>>>>>>>> > The checkDone is invoked by the SDK harness to guarantee that
>>>>>>>> when you exit your SplittableDoFn.process you must have completed all
>>>>>>>> the work in the current restriction
>>>>>>>>
>>>>>>>> This is impossible with unbounded restrictions since, by
>>>>>>>> definition, all work cannot be completed.
>>>>>>>>
>>>>>>> Please note that your current restriction might be changed to a
>>>>>>> finite restriction during processing one bundle if you do SplittableDoFn
>>>>>>> self-initiated checkpointing or any runner issued splits.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> > In your case, it seems like after you do tryClaim(3188439), you
>>>>>>>> return stop() directly from your SplittableDoFn.process function
>>>>>>>>
>>>>>>>> This is not true. The code in question is below. stop() is only
>>>>>>>> returned if tryClaim returns false.
>>>>>>>>
>>>>>>>> -Dan
>>>>>>>>
>>>>>>>> ```
>>>>>>>>     if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>>>>>       logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>>>>>>>>       return ProcessContinuation.stop();
>>>>>>>>     }
>>>>>>>>     sleepTimeRemaining = maxSleepTime;
>>>>>>>>     Committer committer = committerSupplier.apply(partition);
>>>>>>>>     committer.startAsync().awaitRunning();
>>>>>>>>     try (PullSubscriber<SequencedMessage> subscriber =
>>>>>>>>         subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>>>>       while (true) {
>>>>>>>>         List<SequencedMessage> messages = doPoll(subscriber);
>>>>>>>>         // We polled for as long as possible, yield to the runtime to allow it to reschedule us on
>>>>>>>>         // a new task.
>>>>>>>>         if (messages.isEmpty()) {
>>>>>>>>           logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>>>>>           return ProcessContinuation.resume();
>>>>>>>>         }
>>>>>>>>         long lastOffset = Iterables.getLast(messages).offset().value();
>>>>>>>>         if (tracker.tryClaim(lastOffset)) {
>>>>>>>>           messages.forEach(
>>>>>>>>               message ->
>>>>>>>>                   receiver.outputWithTimestamp(
>>>>>>>>                       message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>>>>           committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>>>>>         } else {
>>>>>>>>           logger.atWarning().log("Stopping partition " + partition);
>>>>>>>>           return ProcessContinuation.stop();
>>>>>>>>         }
>>>>>>>>       }
>>>>>>>>     } finally {
>>>>>>>>       committer.stopAsync().awaitTerminated();
>>>>>>>>     }
>>>>>>>> ```
>>>>>>>>
>>>>>>> From your code, what will you do if we reach the finally block?
>>>>>>> Would you like to open a WIP PR to show more details?
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Daniel,
>>>>>>>>>
>>>>>>>>> The checkDone is invoked by the SDK harness to guarantee that when
>>>>>>>>> you exit your SplittableDoFn.process (whether you return stop() or
>>>>>>>>> resume()), you must have completed all the work in the current
>>>>>>>>> restriction. This is one of the major ways SplittableDoFn prevents
>>>>>>>>> data loss.
>>>>>>>>>
>>>>>>>>> In your case, it seems like after you do tryClaim(3188439), you
>>>>>>>>> return stop() directly from your SplittableDoFn.process function.
>>>>>>>>> That's not a correct way to work with a restriction and restriction
>>>>>>>>> tracker. You should either return resume() to perform a
>>>>>>>>> SplittableDoFn-initiated checkpoint and defer processing of the
>>>>>>>>> restriction [3188439, 9223372036854775807), or return stop() only
>>>>>>>>> when tryClaim() returns false.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into
>>>>>>>>>> the following error on dataflow with a RestrictionTracker returning
>>>>>>>>>> UNBOUNDED to isBounded. It looks like calls are being made to
>>>>>>>>>> `checkDone`, but looking at the documentation of `checkDone`, I don't
>>>>>>>>>> think there's any rational thing I can do in this case. Does anyone
>>>>>>>>>> know what should be done for this method?
>>>>>>>>>>
>>>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>>>> -Must throw an exception with an informative error message, if
>>>>>>>>>> there is still any unclaimed work remaining in the restriction.
>>>>>>>>>> (there is, the restriction is unbounded)
>>>>>>>>>> -{@link RestrictionTracker#checkDone} MUST succeed
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Dan
>>>>>>>>>>
>>>>>>>>>> "Error message from worker: java.lang.IllegalStateException: Last
>>>>>>>>>> attempted offset was 3188438 in range [1998348, 9223372036854775807),
>>>>>>>>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>>>>>
>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>>>>
>>>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>>>>
>>>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>>>>
>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>>>>
>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>>>>
>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>>>>>>>>>
>>>>>>>>>
