> you can either never return resume() from your SDF or implement suitable
> trySplit() logic for your RestrictionTracker

Okay, is this documented anywhere? In particular,
https://s.apache.org/splittable-do-fn seems out of date, since it
implies resume() should be returned when tryClaim returns false.

If this is the case, is there any way I can yield control to the runtime if
I have no new data to enable rescheduling? For example, can I call
tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
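
To make the checkDone/trySplit interaction discussed in this thread concrete, here is a minimal stand-alone sketch. It is a simplified model under stated assumptions, not Beam's OffsetRangeTracker: SimpleOffsetTracker and checkpoint() are hypothetical names, and checkpoint() only stands in for what a trySplit(0) call would roughly do.

```java
// Stand-alone model of an offset-range restriction tracker. NOT Beam code:
// SimpleOffsetTracker and checkpoint() are hypothetical names for illustration.
final class SimpleOffsetTracker {
  private final long from;
  private long to; // exclusive end; Long.MAX_VALUE models an unbounded range
  private Long lastAttempted; // null until the first tryClaim

  SimpleOffsetTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  // Records the attempt and reports whether the offset is still inside the range.
  boolean tryClaim(long offset) {
    lastAttempted = offset;
    return offset < to;
  }

  private boolean allWorkAttempted() {
    return from >= to || (lastAttempted != null && lastAttempted >= to - 1);
  }

  // Stand-in for trySplit(0): shrink the primary to end just past the last
  // attempted offset and return the residual {start, end}, or null if none is left.
  long[] checkpoint() {
    if (allWorkAttempted()) {
      return null;
    }
    long next = (lastAttempted == null) ? from : lastAttempted + 1;
    long[] residual = {next, to};
    to = next;
    return residual;
  }

  // Throws if any offset in [from, to) has not been attempted.
  void checkDone() {
    if (!allWorkAttempted()) {
      long next = (lastAttempted == null) ? from : lastAttempted + 1;
      throw new IllegalStateException(
          "claiming work in [" + next + ", " + to + ") was not attempted");
    }
  }

  public static void main(String[] args) {
    SimpleOffsetTracker tracker = new SimpleOffsetTracker(1998348L, Long.MAX_VALUE);
    tracker.tryClaim(3188438L);
    try {
      tracker.checkDone(); // no split happened, so the range is still unbounded
    } catch (IllegalStateException e) {
      System.out.println("Without checkpoint: " + e.getMessage());
    }
    long[] residual = tracker.checkpoint(); // primary shrinks to end at 3188439
    tracker.checkDone(); // passes: everything left in the primary was attempted
    System.out.println("Residual starts at " + residual[0]);
  }
}
```

In this model, returning resume() corresponds to a checkpoint() call happening before checkDone(). If the split step always returns null, the unbounded range is never truncated, so checkDone() has no way to succeed.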

On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]> wrote:

> > IIUC, this should never happen as long as I return null to trySplit. Is
> > this not the case? (trySplit implementation below)
>
> I noticed that your implementation returns null from
> RestrictionTracker.trySplit. That means you cannot return resume() from
> your SplittableDoFn.process() body, since resume() means performing a
> SplittableDoFn self-initiated checkpoint and deferring processing of the
> residual.
>
> In your case, you can either never return resume() from your SDF or
> implement suitable trySplit() logic for your RestrictionTracker. For
> example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an
> infinite restriction.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>
> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <[email protected]>
> wrote:
>
>> > Please note that your current restriction might be changed to a finite
>> > restriction during processing one bundle if you do SplittableDoFn
>> > self-initiated checkpointing or any runner issued splits
>>
>> IIUC, this should never happen as long as I return null to trySplit. Is
>> this not the case? (trySplit implementation below)
>>
>> @Override
>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>   return null;
>> }
>>
>> > what will you do if we reach the finally block?
>>
>> At that point an exception is being thrown out of the processElement
>> function, so the question becomes "what will the runtime do if an
>> exception is thrown out of the processElement function?"
>>
>> > Open a WIP PR
>>
>> I have, but I'm staging changes in a separate repo. See
>> https://github.com/googleapis/java-pubsublite/pull/390 (although this
>> incorporates other changes). For the SDF and restriction tracker
>> implementations, see PubsubLitePartitionSdf.java
>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>> and PubsubLiteOffsetRangeTracker.java
>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>.
>>
>> -Dan
>>
>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]> wrote:
>>
>>>
>>>
>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]>
>>> wrote:
>>>
>>>> Hello Boyuan,
>>>>
>>>> Responses inline.
>>>>
>>>> > The checkDone is invoked by the SDK harness to guarantee that when
>>>> > you exit your SplittableDoFn.process you must have completed all the
>>>> > work in the current restriction
>>>>
>>>> This is impossible with unbounded restrictions since, by definition,
>>>> all work cannot be completed.
>>>>
>>> Please note that your current restriction might be changed to a finite
>>> restriction during processing one bundle if you do SplittableDoFn
>>> self-initiated checkpointing or any runner issued splits.
>>>
>>>
>>>>
>>>> > In your case, it seems like after you do tryClaim(3188439), you
>>>> > return stop() directly from your SplittableDoFn.process function
>>>>
>>>> This is not true. The code in question is below. stop() is only
>>>> returned if tryClaim returns false.
>>>>
>>>> -Dan
>>>>
>>>> ```
>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>   logger.atWarning().log(
>>>>       "Failed to claim initial restriction for partition " + partition);
>>>>   return ProcessContinuation.stop();
>>>> }
>>>> sleepTimeRemaining = maxSleepTime;
>>>> Committer committer = committerSupplier.apply(partition);
>>>> committer.startAsync().awaitRunning();
>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>     subscriberFactory.apply(
>>>>         partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>   while (true) {
>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>     // We polled for as long as possible, yield to the runtime to allow it
>>>>     // to reschedule us on a new task.
>>>>     if (messages.isEmpty()) {
>>>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>       return ProcessContinuation.resume();
>>>>     }
>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>       messages.forEach(
>>>>           message ->
>>>>               receiver.outputWithTimestamp(
>>>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>     } else {
>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>       return ProcessContinuation.stop();
>>>>     }
>>>>   }
>>>> } finally {
>>>>   committer.stopAsync().awaitTerminated();
>>>> }
>>>> ```
>>>>
>>> From your code, what will you do if we reach the finally block? Would
>>> you like to open a WIP PR to show more details?
>>>
>>>
>>>>
>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Daniel,
>>>>>
>>>>> The checkDone is invoked by the SDK harness to guarantee that when you
>>>>> exit your SplittableDoFn.process (whether you return stop() or
>>>>> resume()), you must have completed all the work in the current
>>>>> restriction. This is one of the major ways for SplittableDoFn to
>>>>> prevent data loss.
>>>>>
>>>>> In your case, it seems like after you do tryClaim(3188439), you return
>>>>> stop() directly from your SplittableDoFn.process function. That is not
>>>>> the correct way to work with a restriction and restriction tracker. You
>>>>> should either return resume() to perform a SplittableDoFn-initiated
>>>>> checkpoint that defers processing of the restriction
>>>>> [3188439, 9223372036854775807), or return stop() only when tryClaim()
>>>>> has returned false.
>>>>>
>>>>>
>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
>>>>>> following error on Dataflow with a RestrictionTracker returning UNBOUNDED
>>>>>> to isBounded. It looks like calls are being made to `checkDone`, but
>>>>>> looking at the documentation of `checkDone`, I don't think there's any
>>>>>> rational thing I can do in this case. Does anyone know what should be
>>>>>> done for this method?
>>>>>>
>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>> -Must throw an exception with an informative error message, if there
>>>>>> is still any unclaimed work remaining in the restriction. (there is, the
>>>>>> restriction is unbounded)
>>>>>> -{@link RestrictionTracker#checkDone} MUST succeed
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Dan
>>>>>>
>>>>>> "Error message from worker: java.lang.IllegalStateException: Last
>>>>>> attempted offset was 3188438 in range [1998348, 9223372036854775807),
>>>>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>
>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>
>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>
>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>
>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>>>>>
>>>>>
