> Please note that your current restriction might be changed to a finite
> restriction while processing one bundle if you do SplittableDoFn
> self-initiated checkpointing or there are any runner-issued splits
IIUC, this should never happen as long as I return null to trySplit. Is
this not the case? (trySplit implementation below)
```
@Override
public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
  return null;
}
```
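If it turns out the SDK still needs to honor the fractionOfRemainder == 0
split to take its resume() checkpoint, a rough sketch of a checkpoint-only
variant I could switch to would be something like the following (the
range/lastAttemptedOffset fields are assumed here; this isn't what's in the
PR):

```
@Override
public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
  if (fractionOfRemainder != 0) {
    return null; // keep rejecting runner-initiated splits
  }
  long splitOffset =
      (lastAttemptedOffset == null) ? range.getFrom() : lastAttemptedOffset + 1;
  OffsetRange primary = new OffsetRange(range.getFrom(), splitOffset);
  OffsetRange residual = new OffsetRange(splitOffset, range.getTo());
  this.range = primary; // the remaining restriction becomes finite
  return SplitResult.of(primary, residual);
}
```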
> what will you do if we reach the finally block?
At that point an exception is being thrown out of the processElement
function, so the question reduces to: what will the runtime do when an
exception is thrown out of processElement?
> Open a WIP PR
I have, but I'm staging changes in a separate repo. See
https://github.com/googleapis/java-pubsublite/pull/390 (although this
incorporates other changes; see PubsubLitePartitionSdf.java
<https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
and PubsubLiteOffsetRangeTracker.java
<https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
for the SDF and restriction tracker implementations).
-Dan
On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]> wrote:
>
>
> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]>
> wrote:
>
>> Hello Boyuan,
>>
>> Responses inline.
>>
>> > The checkDone is invoked by the SDK harness to guarantee that when you
>> exit your SplittableDoFn.process, you must have completed all the work in
>> the current restriction
>>
>> This is impossible with unbounded restrictions since, by definition, not
>> all of the work can ever be completed.
>>
> Please note that your current restriction might be changed to a finite
> restriction while processing one bundle if you do SplittableDoFn
> self-initiated checkpointing or there are any runner-issued splits.
>
>
>>
>> > In your case, it seems like after you do tryClaim(3188439), you return
>> stop() directly from your SplittableDoFn.process function
>>
>> This is not true. The code in question is below. stop() is only returned
>> if tryClaim returns false.
>>
>> -Dan
>>
>> ```
>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>   logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>>   return ProcessContinuation.stop();
>> }
>> sleepTimeRemaining = maxSleepTime;
>> Committer committer = committerSupplier.apply(partition);
>> committer.startAsync().awaitRunning();
>> try (PullSubscriber<SequencedMessage> subscriber =
>>     subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>   while (true) {
>>     List<SequencedMessage> messages = doPoll(subscriber);
>>     // We polled for as long as possible, yield to the runtime to allow it
>>     // to reschedule us on a new task.
>>     if (messages.isEmpty()) {
>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>       return ProcessContinuation.resume();
>>     }
>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>     if (tracker.tryClaim(lastOffset)) {
>>       messages.forEach(
>>           message ->
>>               receiver.outputWithTimestamp(
>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>     } else {
>>       logger.atWarning().log("Stopping partition " + partition);
>>       return ProcessContinuation.stop();
>>     }
>>   }
>> } finally {
>>   committer.stopAsync().awaitTerminated();
>> }
>> ```
>>
> From your code, what will you do if we reach the finally block? Would you
> like to open a WIP PR to show more details?
>
>
>>
>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]> wrote:
>>
>>> Hi Daniel,
>>>
>>> The checkDone is invoked by the SDK harness to guarantee that when you
>>> exit your SplittableDoFn.process (either by returning stop() or resume()),
>>> you must have completed all the work in the current restriction. This is
>>> one of the major ways SplittableDoFn prevents data loss.
>>>
>>> In your case, it seems like after you do tryClaim(3188439), you return
>>> stop() directly from your SplittableDoFn.process function. That's not the
>>> correct way to work with a restriction and restriction tracker. You should
>>> either return resume() to perform a SplittableDoFn-initiated checkpoint
>>> that defers processing of restriction [3188439, 9223372036854775807), or
>>> return stop() only when tryClaim() has returned false.
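>>> Concretely, a rough sketch of the only two valid ways to leave process()
>>> (placeholder helpers like pollNextOffset/emitMessages/shouldYield are made
>>> up; this is just an illustration, not your code):
>>>
>>> ```
>>> while (true) {
>>>   long nextOffset = pollNextOffset();  // placeholder
>>>   if (!tracker.tryClaim(nextOffset)) {
>>>     // nextOffset falls outside the (possibly truncated) restriction, so it
>>>     // is safe to stop; checkDone() only needs the truncated range covered.
>>>     return ProcessContinuation.stop();
>>>   }
>>>   emitMessages(nextOffset);            // placeholder
>>>   if (shouldYield()) {                 // placeholder
>>>     // SplittableDoFn-initiated checkpoint: the SDK truncates the current
>>>     // restriction and schedules the residual to be processed later.
>>>     return ProcessContinuation.resume();
>>>   }
>>> }
>>> ```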
>>>
>>>
>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]>
>>> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
>>>> following error on Dataflow with a RestrictionTracker returning UNBOUNDED
>>>> from isBounded. It looks like calls are being made to `checkDone`, but
>>>> looking at the documentation of `checkDone`, I don't think there's any
>>>> rational thing I can do in this case. Does anyone know what should be done
>>>> for this method?
>>>>
>>>> The following exist in the RestrictionTracker javadoc:
>>>> - Must throw an exception with an informative error message, if there is
>>>> still any unclaimed work remaining in the restriction. (there is, the
>>>> restriction is unbounded)
>>>> - {@link RestrictionTracker#checkDone} MUST succeed
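>>>>
>>>> For reference, my reading of what that check boils down to (a paraphrased
>>>> sketch based on the error pasted below, not the actual Beam source) is
>>>> roughly:
>>>>
>>>> ```
>>>> // Fails unless every offset in the range has been attempted, which can
>>>> // never hold for an unbounded range ending at Long.MAX_VALUE.
>>>> void checkDone() {
>>>>   checkState(
>>>>       lastAttemptedOffset >= range.getTo() - 1,
>>>>       "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
>>>>       lastAttemptedOffset, range, lastAttemptedOffset + 1, range.getTo());
>>>> }
>>>> ```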
>>>>
>>>> Thanks,
>>>>
>>>> Dan
>>>>
>>>> "Error message from worker: java.lang.IllegalStateException: Last
>>>> attempted offset was 3188438 in range [1998348, 9223372036854775807),
>>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>>
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>
>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>
>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>>>>
>>>