Hi Daniel,

checkDone() is invoked by the SDK harness to guarantee that when you exit
your SplittableDoFn.process (whether you return stop() or resume()), you
have completed all the work in the current restriction. This is one of the
major ways SplittableDoFn prevents data loss.
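Here is a simplified, self-contained model of the bookkeeping an offset-range tracker does, to make that invariant concrete. It is a sketch under stated assumptions, not Beam's OffsetRangeTracker. The class ModelOffsetTracker is hypothetical, Long.MAX_VALUE stands in for an "unbounded" range, and only tryClaim/checkDone are modeled. The check assumes, as your error message suggests, that checkDone passes once the last attempted offset reaches the last offset of the range.

```java
/**
 * Hypothetical, simplified model of an offset-range tracker's bookkeeping.
 * NOT Beam's OffsetRangeTracker; it only mirrors tryClaim/checkDone.
 */
final class ModelOffsetTracker {
  private final long from;    // inclusive start of the restriction
  private final long to;      // exclusive end; Long.MAX_VALUE models "unbounded"
  private Long lastAttempted; // null until the first tryClaim

  ModelOffsetTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  /** Records the attempt; succeeds only if the offset lies inside [from, to). */
  boolean tryClaim(long offset) {
    if (offset < from || (lastAttempted != null && offset <= lastAttempted)) {
      throw new IllegalArgumentException("offsets must be increasing and >= " + from);
    }
    lastAttempted = offset;
    return offset < to;
  }

  /** Fails if some suffix of the restriction was never attempted. */
  void checkDone() {
    if (from == to) {
      return; // an empty restriction is trivially done
    }
    if (lastAttempted == null || lastAttempted < to - 1) {
      long nextUnattempted = (lastAttempted == null) ? from : lastAttempted + 1;
      throw new IllegalStateException(String.format(
          "Last attempted offset was %s in range [%d, %d), claiming work in [%d, %d) was not attempted",
          lastAttempted, from, to, nextUnattempted, to));
    }
  }
}

class CheckDoneDemo {
  public static void main(String[] args) {
    // Bounded restriction [0, 3): keep claiming until tryClaim returns false, then stop.
    ModelOffsetTracker bounded = new ModelOffsetTracker(0, 3);
    long next = 0;
    while (bounded.tryClaim(next)) {
      next++; // offsets 0, 1, 2 succeed; offset 3 fails and ends the loop
    }
    bounded.checkDone(); // passes: an offset at or past the end was attempted

    // "Unbounded" restriction: one successful claim, then stop with no checkpoint.
    ModelOffsetTracker unbounded = new ModelOffsetTracker(1998348L, Long.MAX_VALUE);
    unbounded.tryClaim(3188438L);
    try {
      unbounded.checkDone();
    } catch (IllegalStateException e) {
      // Same shape as the message in the quoted Dataflow error.
      System.out.println(e.getMessage());
    }
  }
}
```

The point of the second case: with an unbounded range, a successful claim can never be the last attempt, so stop() after it always leaves an unattempted tail.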

In your case, it seems that after you call tryClaim(3188438), you return
stop() directly from your SplittableDoFn.process function. That's not the
correct way to work with a restriction and restriction tracker. You
should either return resume() to perform a SplittableDoFn-initiated
checkpoint that defers processing of the restriction
[3188439, 9223372036854775807), or return stop() only after tryClaim() has
returned false.
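Below is a sketch of those two valid exits, again as a plain-Java model rather than real Beam code. Continuation, ModelTracker, PartitionReader, and the `available` list (standing in for whatever the reader has polled so far) are all hypothetical. In a real DoFn, the counterparts would be DoFn.ProcessContinuation.stop()/resume() and the RestrictionTracker passed to the @ProcessElement method. runOnce() is only my rough approximation of what the runner does after process returns: on resume it checkpoints (splits right after the last attempted offset), then calls checkDone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for DoFn.ProcessContinuation.stop()/resume(). */
enum Continuation { STOP, RESUME }

/** Hypothetical tracker model (argument validation omitted for brevity). */
final class ModelTracker {
  final long from;    // inclusive start
  long to;            // exclusive end; shrinks when a checkpoint is taken
  Long lastAttempted; // null until the first tryClaim

  ModelTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  boolean tryClaim(long offset) {
    lastAttempted = offset;
    return offset < to;
  }

  /** Splits right after the last attempted offset; returns the residual start, or null if nothing is left. */
  Long checkpoint() {
    if (from == to || (lastAttempted != null && lastAttempted >= to - 1)) {
      return null;
    }
    long split = (lastAttempted == null) ? from : lastAttempted + 1;
    to = split; // primary becomes [from, split); the residual [split, old end) is deferred
    return split;
  }

  void checkDone() {
    if (from != to && (lastAttempted == null || lastAttempted < to - 1)) {
      throw new IllegalStateException("unattempted work remains in [" + from + ", " + to + ")");
    }
  }
}

final class PartitionReader {
  /** Hypothetical process body: claim each available offset before emitting it. */
  static Continuation process(ModelTracker tracker, List<Long> available, List<Long> out) {
    for (long offset : available) {
      if (!tracker.tryClaim(offset)) {
        return Continuation.STOP; // stop only after a failed claim
      }
      out.add(offset); // emit the message at this offset
    }
    return Continuation.RESUME; // no more data right now: defer the rest via a checkpoint
  }

  /** Rough model of the runner side: checkpoint on RESUME, then checkDone. */
  static Long runOnce(ModelTracker tracker, List<Long> available, List<Long> out) {
    Continuation c = process(tracker, available, out);
    Long residualStart = (c == Continuation.RESUME) ? tracker.checkpoint() : null;
    tracker.checkDone();
    return residualStart;
  }

  public static void main(String[] args) {
    List<Long> out = new ArrayList<>();
    ModelTracker open = new ModelTracker(0, Long.MAX_VALUE);
    Long residual = runOnce(open, Arrays.asList(0L, 1L, 2L), out);
    System.out.println("emitted " + out + ", residual starts at " + residual);
  }
}
```

With an open-ended range, the call emits [0, 1, 2] and hands back a residual starting at 3, so checkDone sees a primary that is fully attempted. Returning STOP at that point instead would leave [3, Long.MAX_VALUE) unattempted and trip the same check as in your stack trace.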


On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins wrote:

> Hello all,
>
> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
> following error on Dataflow with a RestrictionTracker whose isBounded()
> returns UNBOUNDED. It looks like calls are being made to `checkDone`, but
> looking at the documentation of `checkDone`, I don't think there's any
> rational thing I can do in this case. Does anyone know what should be done
> for this method?
>
> The following appear in the RestrictionTracker javadoc:
> - Must throw an exception with an informative error message, if there is
> still any unclaimed work remaining in the restriction. (There is; the
> restriction is unbounded.)
> - {@link RestrictionTracker#checkDone} MUST succeed
>
> Thanks,
>
> Dan
>
> "Error message from worker: java.lang.IllegalStateException: Last
> attempted offset was 3188438 in range [1998348, 9223372036854775807),
> claiming work in [3188439, 9223372036854775807) was not attempted
>
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>
