> Okay, is this documented anywhere? In particular, https://s.apache.org/splittable-do-fn seems out of date, since it implies resume() should be returned when tryClaim returns false.
There is an up-to-date SDF programming guide[1], and [2] specifically covers
SDF-initiated checkpointing. stop() should be returned when tryClaim returns
false, whereas resume() should be returned when the restriction is not fully
processed and you want to defer the remaining processing to the future.

> If this is the case, is there any way I can yield control to the runtime
> if I have no new data to enable rescheduling? For example, can I call
> tracker.tryClaim(tracker.currentRestriction().getFrom()) ?

It depends on what "no new data to enable rescheduling" means. If you believe
that you are done with the current restriction even though you have not
reached the end of the restriction, you can explicitly say "I'm done with the
current one" by calling restrictionTracker.tryClaim(Long.MAX_VALUE) (or
tryClaim(restriction.getTo()) if you are sure the end of your restriction has
not been changed by any splitting). If you just want to re-process the rest of
the restriction after a certain time, e.g., 5 minutes, 30 minutes and so on,
you need to implement trySplit() and return resume(duration) when you want to
resume.

Please note that while an SDF is processing one element/restriction pair, the
start of the restriction is never changed. You will always get the same offset
when you call currentRestriction().getFrom().

[1] https://beam.apache.org/documentation/programming-guide/#splittable-dofns
[2] https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint

On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <[email protected]> wrote:

> > you can either never return resume() from your SDF or implement suitable
> > trySplit() logic for your RestrictionTracker
>
> Okay, is this documented anywhere? In particular,
> https://s.apache.org/splittable-do-fn seems out of date, since it
> implies resume() should be returned when tryClaim returns false.
>
> If this is the case, is there any way I can yield control to the runtime
> if I have no new data to enable rescheduling?
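[Editor's note: to make the tryClaim/checkDone contract discussed above concrete, here is a minimal self-contained sketch. ToyOffsetTracker is a hypothetical, simplified stand-in for Beam's OffsetRangeTracker, not the real API; it only models the behavior described in this thread: a claim past the end of the restriction fails (so process() should return stop()), and explicitly claiming Long.MAX_VALUE marks an unbounded restriction as done so checkDone passes.]

```java
// Hypothetical, simplified stand-in for Beam's OffsetRangeTracker; it models
// only the tryClaim/checkDone contract described in this thread.
public class TryClaimDemo {
  static final class ToyOffsetTracker {
    private final long from;
    private final long to; // Long.MAX_VALUE models an unbounded range.
    private long lastAttempted = -1;

    ToyOffsetTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    // Returns false when the offset is outside the restriction; the SDF
    // should then return stop().
    boolean tryClaim(long offset) {
      lastAttempted = offset;
      return offset < to;
    }

    // Mirrors checkDone: throws if any work in the restriction was never
    // attempted. Written as "to - 1" to avoid overflow at Long.MAX_VALUE.
    void checkDone() {
      if (lastAttempted < to - 1) {
        throw new IllegalStateException(
            "Last attempted offset was " + lastAttempted + " in range [" + from + ", " + to
                + "), claiming work in [" + (lastAttempted + 1) + ", " + to
                + ") was not attempted");
      }
    }
  }

  public static void main(String[] args) {
    ToyOffsetTracker tracker = new ToyOffsetTracker(1998348L, Long.MAX_VALUE);
    System.out.println(tracker.tryClaim(3188438L));       // true: inside the range
    // Explicitly declare "I'm done with the current restriction":
    System.out.println(tracker.tryClaim(Long.MAX_VALUE)); // false: now return stop()
    tracker.checkDone();                                  // passes without throwing
  }
}
```

Note how this matches Boyuan's point: without the final tryClaim(Long.MAX_VALUE), checkDone would throw because [3188439, Long.MAX_VALUE) was never attempted.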
> For example, can I call
> tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>
> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]> wrote:
>
>> > IIUC, this should never happen as long as I return null from trySplit.
>> > Is this not the case? (trySplit implementation below)
>>
>> I noticed that in your implementation you return null from your
>> RestrictionTracker.trySplit. That means you cannot return resume() from
>> your SplittableDoFn.process() body, since resume() means performing
>> SplittableDoFn self-initiated checkpointing and deferring processing of
>> the residual.
>>
>> In your case, you can either never return resume() from your SDF or
>> implement suitable trySplit() logic for your RestrictionTracker. For
>> example, KafkaIO uses GrowableOffsetRangeTracker[1] to track an
>> infinite restriction.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>>
>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <[email protected]>
>> wrote:
>>
>>> > Please note that your current restriction might be changed to a finite
>>> > restriction during processing one bundle if you do SplittableDoFn
>>> > self-initiated checkpointing or any runner issued splits
>>>
>>> IIUC, this should never happen as long as I return null from trySplit.
>>> Is this not the case? (trySplit implementation below)
>>>
>>> @Override
>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>>   return null;
>>> }
>>>
>>> > what will you do if we reach the finally block?
>>>
>>> At that point an exception is being thrown out of the processElement
>>> function. The answer to that would be "what will the runtime do if an
>>> exception is thrown out of the processElement function?"
>>>
>>> > Open a WIP PR
>>>
>>> I have, but I'm staging changes in a separate repo.
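[Editor's note: since the discussion above turns on implementing "suitable trySplit() logic", here is a hedged sketch of what a checkpointing split does conceptually. Range, Split, and checkpoint() are hypothetical simplified stand-ins for Beam's OffsetRange, SplitResult, and the tracker's trySplit, not the real API. The idea: truncate the unbounded range at the last claimed position so the primary is finite and fully processed, and hand back the rest as the residual to be rescheduled; GrowableOffsetRangeTracker does something similar with a dynamically estimated end.]

```java
public class TrySplitDemo {
  // Hypothetical stand-ins for Beam's OffsetRange and SplitResult: the
  // primary is what the current process() call keeps responsibility for,
  // the residual is deferred and rescheduled later.
  record Range(long from, long to) {}
  record Split(Range primary, Range residual) {}

  // Simplified model of the trySplit logic an SDF needs before it may
  // return resume(): split at lastClaimed + 1 so the primary is finite
  // and fully claimed, and the residual covers everything after it.
  static Split checkpoint(Range current, long lastClaimed) {
    long splitPoint = lastClaimed + 1;
    return new Split(
        new Range(current.from(), splitPoint),
        new Range(splitPoint, current.to()));
  }

  public static void main(String[] args) {
    Range unbounded = new Range(1998348L, Long.MAX_VALUE);
    Split s = checkpoint(unbounded, 3188438L);
    System.out.println("primary  = [" + s.primary().from() + ", " + s.primary().to() + ")");
    System.out.println("residual = [" + s.residual().from() + ", " + s.residual().to() + ")");
  }
}
```

After such a split, checkDone on the primary [1998348, 3188439) succeeds because everything up to 3188438 was claimed, and the residual is what resume() defers.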
>>> See
>>> https://github.com/googleapis/java-pubsublite/pull/390 (although this
>>> incorporates other changes; see PubsubLitePartitionSdf.java
>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>>> and PubsubLiteOffsetRangeTracker.java
>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>>> for the SDF and restriction tracker implementations)
>>>
>>> -Dan
>>>
>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]>
>>> wrote:
>>>
>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]>
>>>> wrote:
>>>>
>>>>> Hello Boyuan,
>>>>>
>>>>> Responses inline.
>>>>>
>>>>> > The checkDone is invoked by the SDK harness to guarantee that when
>>>>> > you exit your SplittableDoFn.process you must have completed all the
>>>>> > work in the current restriction
>>>>>
>>>>> This is impossible with unbounded restrictions since, by definition,
>>>>> all work cannot be completed.
>>>>>
>>>> Please note that your current restriction might be changed to a finite
>>>> restriction during processing of one bundle if you do SplittableDoFn
>>>> self-initiated checkpointing or any runner-issued splits.
>>>>
>>>>> > In your case, it seems like after you do tryClaim(3188439), you
>>>>> > return stop() directly from your SplittableDoFn.process function
>>>>>
>>>>> This is not true. The code in question is below. stop() is only
>>>>> returned if tryClaim returns false.
>>>>> -Dan
>>>>>
>>>>> ```
>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>>   logger.atWarning().log("Failed to claim initial restriction for partition " + partition);
>>>>>   return ProcessContinuation.stop();
>>>>> }
>>>>> sleepTimeRemaining = maxSleepTime;
>>>>> Committer committer = committerSupplier.apply(partition);
>>>>> committer.startAsync().awaitRunning();
>>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>>     subscriberFactory.apply(
>>>>>         partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>   while (true) {
>>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>>     // We polled for as long as possible; yield to the runtime to allow
>>>>>     // it to reschedule us on a new task.
>>>>>     if (messages.isEmpty()) {
>>>>>       logger.atWarning().log("Yielding due to timeout on partition " + partition);
>>>>>       return ProcessContinuation.resume();
>>>>>     }
>>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>>       messages.forEach(
>>>>>           message ->
>>>>>               receiver.outputWithTimestamp(
>>>>>                   message, new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>>     } else {
>>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>>       return ProcessContinuation.stop();
>>>>>     }
>>>>>   }
>>>>> } finally {
>>>>>   committer.stopAsync().awaitTerminated();
>>>>> }
>>>>> ```
>>>>>
>>>> From your code, what will you do if we reach the finally block? Would
>>>> you like to open a WIP PR to show more details?
>>>>
>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Daniel,
>>>>>>
>>>>>> The checkDone is invoked by the SDK harness to guarantee that when
>>>>>> you exit your SplittableDoFn.process (whether you return stop() or
>>>>>> resume()), you must have completed all the work in the current
>>>>>> restriction.
>>>>>> This is one of the major ways for SplittableDoFn to prevent data loss.
>>>>>>
>>>>>> In your case, it seems like after you do tryClaim(3188439), you
>>>>>> return stop() directly from your SplittableDoFn.process function.
>>>>>> That's not a correct way of working with a restriction and
>>>>>> restriction tracker. You should either return resume() to perform a
>>>>>> SplittableDoFn-initiated checkpoint and defer processing of
>>>>>> restriction [3188439, 9223372036854775807), or return stop() only
>>>>>> when tryClaim() has returned false.
>>>>>>
>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
>>>>>>> following error on Dataflow with a RestrictionTracker returning
>>>>>>> UNBOUNDED from isBounded. It looks like calls are being made to
>>>>>>> `checkDone`, but looking at the documentation of `checkDone`, I
>>>>>>> don't think there's any rational thing I can do in this case. Does
>>>>>>> anyone know what should be done for this method?
>>>>>>>
>>>>>>> The following exists in the RestrictionTracker javadoc:
>>>>>>> - Must throw an exception with an informative error message, if there
>>>>>>> is still any unclaimed work remaining in the restriction.
>>>>>>> (there is, the
>>>>>>> restriction is unbounded)
>>>>>>> - {@link RestrictionTracker#checkDone} MUST succeed
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Dan
>>>>>>>
>>>>>>> "Error message from worker: java.lang.IllegalStateException: Last
>>>>>>> attempted offset was 3188438 in range [1998348, 9223372036854775807),
>>>>>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
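[Editor's note: the failing precondition in the stack trace above can be modeled in a few lines. hasUnclaimedWork is a hypothetical helper, not Beam API; it captures the check that, per the error text, OffsetRangeTracker.checkDone performs: if the last attempted offset stops short of the end of the range, unclaimed work remains and checkDone throws.]

```java
public class CheckDoneDemo {
  // Hypothetical helper modeling the precondition behind the error above:
  // work remains whenever the last attempted offset is below to - 1.
  static boolean hasUnclaimedWork(long lastAttempted, long to) {
    return lastAttempted < to - 1;
  }

  public static void main(String[] args) {
    long to = Long.MAX_VALUE;     // 9223372036854775807: an unbounded restriction
    long lastAttempted = 3188438; // last offset passed to tryClaim in the report

    if (hasUnclaimedWork(lastAttempted, to)) {
      // Reproduces the shape of the IllegalStateException message.
      System.out.println("claiming work in [" + (lastAttempted + 1) + ", " + to
          + ") was not attempted");
    }
  }
}
```

This is why the two fixes discussed in the thread work: claiming Long.MAX_VALUE makes lastAttempted equal to - 1 or beyond, and a trySplit checkpoint shrinks `to` down to lastAttempted + 1; either way checkDone then succeeds.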
