> There is an up-to-date SDF programming guide [1], and [2] specifically
> discusses SDF-initiated checkpointing.

As far as I can see, this never mentions the restriction that you *need* to
implement a trySplit() that returns a bounded primary restriction if you ever
return resume() from an SDF. Nor does this restriction particularly make sense
if the range being processed is itself unbounded. Perhaps you would consider
not calling checkDone() when resume() is returned, if the restriction provided
to the runner is unbounded, since it would be unreasonable to expect an
unbounded restriction to ever complete?

> It depends on the definition of "no new data to enable rescheduling".

Perhaps it would be better to explain in terms of why I'm trying to do this.
If the subscription has not received any data in a while, or is receiving data
infrequently, I want to enable Dataflow to scale down to 1 worker; there will
be no need to call tryClaim if there is no new data from Pub/Sub Lite. All I
want is, if data is arriving infrequently, to give Dataflow the opportunity to
scale my job down. I'm not actually done with the data: new data can always be
published in the future, so we can never know that, and I'm trying to avoid
implementing bounded reads that artificially produce sub-windows when an
unbounded output is much more natural.

> Please note that when an SDF is processing one element-restriction pair, the
> start of the restriction is never changed. You will always get the same
> offset when you call currentRestriction().getFrom().

This is good to know. So to rephrase: could I periodically call
tryClaim(<last sent message or getFrom if none yet>) to yield control back to
the runtime?

-Dan

On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang <[email protected]> wrote:

>> Okay, is this documented anywhere? In particular,
>> https://s.apache.org/splittable-do-fn seems out of date, since it implies
>> resume() should be returned when tryClaim returns false.
>
> There is an up-to-date SDF programming guide [1], and [2] specifically
> discusses SDF-initiated checkpointing.
> And stop() should be returned when tryClaim returns false, whereas resume()
> is expected to be returned when the restriction is not fully processed and
> you want to defer processing to the future.
>
>> If this is the case, is there any way I can yield control to the runtime
>> if I have no new data, to enable rescheduling? For example, can I call
>> tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>
> It depends on the definition of "no new data to enable rescheduling".
>
> If you believe that you are done with the current restriction even though
> you have not reached the end of the restriction, you can explicitly say "I'm
> done with the current one" by calling restrictionTracker.tryClaim(MAX_LONG)
> (or tryClaim(restriction.getTo()) if you are sure your end of restriction is
> not changed by any splitting).
>
> If you just want to re-process the rest of the restriction after a certain
> time, e.g., 5 mins, 30 mins and so on, you need to implement trySplit and
> return resume(duration) when you want to resume.
>
> Please note that when an SDF is processing one element-restriction pair, the
> start of the restriction is never changed. You will always get the same
> offset when you call currentRestriction().getFrom().
>
> [1] https://beam.apache.org/documentation/programming-guide/#splittable-dofns
> [2] https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint
>
> On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins <[email protected]> wrote:
>
>>> you can either never return resume() from your SDF or implement suitable
>>> trySplit() logic for your RestrictionTracker
>>
>> Okay, is this documented anywhere? In particular,
>> https://s.apache.org/splittable-do-fn seems out of date, since it implies
>> resume() should be returned when tryClaim returns false.
>>
>> If this is the case, is there any way I can yield control to the runtime
>> if I have no new data, to enable rescheduling?
>> For example, can I call
>> tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>>
>> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang <[email protected]> wrote:
>>
>>>> IIUC, this should never happen as long as I return null from trySplit.
>>>> Is this not the case? (trySplit implementation below)
>>>
>>> I noticed that in your implementation you return null from your
>>> RestrictionTracker.trySplit. That means you cannot return resume() from
>>> your SplittableDoFn.process() body, since resume() means performing
>>> SplittableDoFn self-initiated checkpointing and deferring processing of
>>> the residual.
>>>
>>> In your case, you can either never return resume() from your SDF or
>>> implement suitable trySplit() logic for your RestrictionTracker. For
>>> example, KafkaIO uses GrowableOffsetRangeTracker [1] to track an
>>> infinite restriction.
>>>
>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>>>
>>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins <[email protected]> wrote:
>>>
>>>>> Please note that your current restriction might be changed to a finite
>>>>> restriction during processing of one bundle if you do SplittableDoFn
>>>>> self-initiated checkpointing or any runner-issued splits
>>>>
>>>> IIUC, this should never happen as long as I return null from trySplit.
>>>> Is this not the case? (trySplit implementation below)
>>>>
>>>> @Override
>>>> public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
>>>>   return null;
>>>> }
>>>>
>>>>> what will you do if we reach the finally block?
>>>>
>>>> At that point an exception is being thrown out of the processElement
>>>> function. The answer to that would be "what will the runtime do if an
>>>> exception is thrown out of the processElement function?"
>>>>
>>>>> Open a WIP PR
>>>>
>>>> I have, but I'm staging changes in a separate repo.
>>>> See https://github.com/googleapis/java-pubsublite/pull/390 (although this
>>>> incorporates other changes; see PubsubLitePartitionSdf.java
>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>>>> and PubsubLiteOffsetRangeTracker.java
>>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>>>> for the SDF and restriction tracker implementations).
>>>>
>>>> -Dan
>>>>
>>>> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang <[email protected]> wrote:
>>>>
>>>>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins <[email protected]> wrote:
>>>>>
>>>>>> Hello Boyuan,
>>>>>>
>>>>>> Responses inline.
>>>>>>
>>>>>>> The checkDone is invoked by the SDK harness to guarantee that when
>>>>>>> you exit your SplittableDoFn.process you must have completed all the
>>>>>>> work in the current restriction
>>>>>>
>>>>>> This is impossible with unbounded restrictions since, by definition,
>>>>>> all work cannot be completed.
>>>>>
>>>>> Please note that your current restriction might be changed to a finite
>>>>> restriction during processing of one bundle if you do SplittableDoFn
>>>>> self-initiated checkpointing or any runner-issued splits.
>>>>>
>>>>>>> In your case, it seems like after you do tryClaim(3188439), you
>>>>>>> return stop() directly from your SplittableDoFn.process function
>>>>>>
>>>>>> This is not true. The code in question is below; stop() is only
>>>>>> returned if tryClaim returns false.
>>>>>>
>>>>>> -Dan
>>>>>>
>>>>>> ```
>>>>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>>>>   logger.atWarning().log(
>>>>>>       "Failed to claim initial restriction for partition " + partition);
>>>>>>   return ProcessContinuation.stop();
>>>>>> }
>>>>>> sleepTimeRemaining = maxSleepTime;
>>>>>> Committer committer = committerSupplier.apply(partition);
>>>>>> committer.startAsync().awaitRunning();
>>>>>> try (PullSubscriber<SequencedMessage> subscriber =
>>>>>>     subscriberFactory.apply(
>>>>>>         partition, Offset.of(tracker.currentRestriction().getFrom()))) {
>>>>>>   while (true) {
>>>>>>     List<SequencedMessage> messages = doPoll(subscriber);
>>>>>>     // We polled for as long as possible; yield to the runtime to
>>>>>>     // allow it to reschedule us on a new task.
>>>>>>     if (messages.isEmpty()) {
>>>>>>       logger.atWarning().log(
>>>>>>           "Yielding due to timeout on partition " + partition);
>>>>>>       return ProcessContinuation.resume();
>>>>>>     }
>>>>>>     long lastOffset = Iterables.getLast(messages).offset().value();
>>>>>>     if (tracker.tryClaim(lastOffset)) {
>>>>>>       messages.forEach(
>>>>>>           message ->
>>>>>>               receiver.outputWithTimestamp(
>>>>>>                   message,
>>>>>>                   new Instant(Timestamps.toMillis(message.publishTime()))));
>>>>>>       committer.commitOffset(Offset.of(lastOffset + 1)).get();
>>>>>>     } else {
>>>>>>       logger.atWarning().log("Stopping partition " + partition);
>>>>>>       return ProcessContinuation.stop();
>>>>>>     }
>>>>>>   }
>>>>>> } finally {
>>>>>>   committer.stopAsync().awaitTerminated();
>>>>>> }
>>>>>> ```
>>>>>
>>>>> From your code, what will you do if we reach the finally block? Would
>>>>> you like to open a WIP PR to show more details?
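[Aside on the mechanics under discussion: returning resume() triggers a self-initiated checkpoint, which conceptually splits the range at the last attempted offset so the primary becomes bounded and checkDone can verify it. A rough sketch of that split arithmetic, using a stand-in OffsetRange class rather than Beam's actual GrowableOffsetRangeTracker:]

```java
// Sketch only: models checkpoint-split arithmetic for an offset-based
// restriction. "OffsetRange" here is a stand-in, not Beam's class.
final class OffsetRange {
  final long from; // inclusive
  final long to;   // exclusive; Long.MAX_VALUE models "unbounded"
  OffsetRange(long from, long to) { this.from = from; this.to = to; }
}

final class SplitSketch {
  // trySplit at fraction 0 (a checkpoint): truncate the primary just past
  // the last attempted offset; the residual keeps the unbounded tail.
  static OffsetRange[] checkpoint(OffsetRange range, long lastAttempted) {
    long splitPos = Math.max(lastAttempted + 1, range.from);
    return new OffsetRange[] {
      new OffsetRange(range.from, splitPos), // bounded primary
      new OffsetRange(splitPos, range.to)    // residual, still unbounded
    };
  }
}
```

[After such a split, checkDone on the primary only requires offsets up to the last attempted one to have been claimed, which is what makes resume() safe when trySplit is implemented.]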
>>>>>
>>>>>> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Daniel,
>>>>>>>
>>>>>>> The checkDone is invoked by the SDK harness to guarantee that when
>>>>>>> you exit your SplittableDoFn.process (whether you return stop() or
>>>>>>> resume()), you must have completed all the work in the current
>>>>>>> restriction. This is one of the major ways for SplittableDoFn to
>>>>>>> prevent data loss.
>>>>>>>
>>>>>>> In your case, it seems like after you do tryClaim(3188439), you
>>>>>>> return stop() directly from your SplittableDoFn.process function.
>>>>>>> That's not a correct way to work with a restriction and restriction
>>>>>>> tracker. You should either return resume() to perform a
>>>>>>> SplittableDoFn-initiated checkpoint to defer processing restriction
>>>>>>> [3188439, 9223372036854775807), or return stop() only when
>>>>>>> tryClaim() has returned false.
>>>>>>>
>>>>>>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into
>>>>>>>> the following error on Dataflow with a RestrictionTracker returning
>>>>>>>> UNBOUNDED from isBounded. It looks like calls are being made to
>>>>>>>> `checkDone`, but looking at the documentation of `checkDone`, I
>>>>>>>> don't think there's anything rational I can do in this case. Does
>>>>>>>> anyone know what should be done for this method?
>>>>>>>>
>>>>>>>> The following exist in the RestrictionTracker javadoc:
>>>>>>>> - Must throw an exception with an informative error message, if
>>>>>>>>   there is still any unclaimed work remaining in the restriction.
>>>>>>>>   (there is: the restriction is unbounded)
>>>>>>>> - {@link RestrictionTracker#checkDone} MUST succeed
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Dan
>>>>>>>>
>>>>>>>> "Error message from worker: java.lang.IllegalStateException: Last
>>>>>>>> attempted offset was 3188438 in range [1998348, 9223372036854775807),
>>>>>>>> claiming work in [3188439, 9223372036854775807) was not attempted
>>>>>>>>
>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>>>>>>> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>>>>>>>> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
