DoFns are allowed to be non-deterministic, so they don't have to yield the "same" output.
The example I'm thinking of is where users perform some "best-effort" deduplication by creating a hashmap in StartBundle and removing duplicates. This is usually done purely for performance, to reduce shuffle size, as opposed to a guaranteed RemoveDuplicates. This scenario doesn't require FinishBundle, though it does require a StartBundle.
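A minimal sketch of this pattern in the Beam Python SDK (the DoFn name is illustrative, and a set stands in for the hashmap):

    import apache_beam as beam

    class BestEffortDedup(beam.DoFn):
        def start_bundle(self):
            # Fresh cache per bundle: duplicates are only removed within a
            # bundle, so this reduces shuffle size but is not a guaranteed
            # RemoveDuplicates.
            self._seen = set()

        def process(self, element):
            if element not in self._seen:
                self._seen.add(element)
                yield element

No finish_bundle is needed here: simply dropping the cache at the end of the bundle is exactly the "best-effort" part.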
On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles <k...@apache.org> wrote:

> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev <dev@beam.apache.org> wrote:
>
>> Yes, not including FinishBundle in ParDoPayload seems like a mistake. Though absence of FinishBundle doesn't mean that one can assume that elements in a bundle don't affect subsequent bundle elements (i.e. there might still be caching!)
>
> Well for a DoFn to be correct, it has to yield the same (or "the same as much as the user expects it to be the same") output regardless of order of processing or bundling, so a runner or SDK harness can definitely take a bunch of elements and process them however it wants if there's no @FinishBundle. I think that's what Jan is getting at - adding a @FinishBundle is the user placing a new restriction on the runner. Technically, we probably have to include @StartBundle in that consideration.
>
> Kenn
>
>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Kenn and Reuven,
>>>>
>>>> I agree with all these points. The only issue here seems to be that FlinkRunner does not fulfill these constraints. This is a bug that can be fixed, though we need to change some defaults, as the default bundle "duration" of 1000 ms can be too much for lower-traffic pipelines. We are also probably missing some @ValidatesRunner tests for this. I created [1] and [2] to track this.
>>>>
>>>> One question still remains: the bundle vs. element life-cycle is relevant only for cases where processing of element X can affect processing of element Y later in the same bundle. Once this influence is ruled out (i.e. no caching), this information can enable runner optimizations that yield better performance. Should we consider propagating this information from user code to the runner?
>>>
>>> Yes!
>>>
>>> This was the explicit goal of the move to annotation-driven DoFn in https://s.apache.org/a-new-dofn to make it so that the SDK and runner can get good information about what the DoFn requirements are.
>>>
>>> When there is no @FinishBundle method, the runner can make additional optimizations. This should have been included in the ParDoPayload in the proto when we moved to portable pipelines. I cannot remember if there was a good reason that we did not do so. Maybe we (incorrectly) thought that this was an issue that only the Java SDK harness needed to know about.
>>>
>>> Kenn
>>>
>>>> [1] https://github.com/apache/beam/issues/28649
>>>> [2] https://github.com/apache/beam/issues/28650
>>>>
>>>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>>>
>>>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>>>
>>>>> Two separate things here:
>>>>>
>>>>> 1. Yes, a watermark can update in the middle of a bundle.
>>>>> 2. The records in the bundle themselves will prevent the watermark from updating, as they are still in flight until after finish bundle. Therefore simply caching the records should always be watermark safe, regardless of the runner. You will only run into problems if you try and move timestamps "backwards" - which is why Beam strongly discourages this.
>>>>>
>>>>> This is not aligned with FlinkRunner's implementation, and I actually think it is not aligned conceptually. As mentioned, Flink does not have the concept of bundles at all. It achieves fault tolerance via checkpointing: essentially a checkpoint barrier flows from sources to sinks, safely snapshotting the state of each operator on the way. Bundles are implemented as a somewhat arbitrary set of elements between two consecutive checkpoints (there can be multiple bundles between checkpoints). A bundle is 'committed' (i.e. persistently stored and guaranteed not to retry) only after the checkpoint barrier passes over the elements in the bundle (every bundle is finished at the very latest exactly before a checkpoint). But watermark propagation and bundle finalization are completely unrelated. This might be a bug in the runner, but requiring a checkpoint for watermark propagation would introduce insane delays between processing time and watermarks - every executable stage would delay watermark propagation until a checkpoint (typically on the order of seconds). This delay would add up after each stage.
>>>>
>>>> It's not bundles that hold up processing, rather it is elements, and elements are not considered "processed" until FinishBundle.
>>>>
>>>> You are right about Flink. In many cases this is fine - if Flink rolls back to the last checkpoint, the watermark will also roll back, and everything stays consistent. So in general, one does not need to wait for checkpoints for watermark propagation.
>>>>
>>>> Where things get a bit weirder with Flink is whenever one has external side effects. In theory, one should wait for checkpoints before letting a Sink flush, otherwise one could end up with incorrect outputs (especially with a sink like TextIO). Flink itself recognizes this, and that's why they provide TwoPhaseCommitSinkFunction <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>, which waits for a checkpoint. In Beam, this is the reason we introduced RequiresStableInput. Of course in practice many Flink users don't do this - in which case they are prioritizing latency over data correctness.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is committed, as there's no guarantee that this work won't be discarded.
>>>>>>
>>>>>> There was a thread [1], where the conclusion seemed to be that updating the watermark is possible even in the middle of a bundle. Actually, handling watermarks is runner-dependent (e.g. Flink does not store watermarks in checkpoints; they are always recomputed from scratch on restore).
>>>>>>
>>>>>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>>>>
>>>>>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>>>>>
>>>>>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>>>>>
>>>>>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <dev@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> I've actually wondered about this specifically for streaming... if you're writing a pipeline there it seems like you're often going to want to put high fixed cost things like database connections even outside of the bundle setup. You really only want to do that once in the lifetime of the worker itself, not the bundle. Seems like having that boundary be somewhere other than an arbitrary (and probably small in streaming, to avoid latency) group of elements might be more useful? I suppose this depends heavily on the object lifecycle in the sdk worker though.
>>>>>>>
>>>>>>> +1. This is the difference between @Setup and @StartBundle. The start/finish bundle operations should be used for bracketing element processing that must be committed as a unit for correct failure recovery (e.g. if elements are cached in ProcessElement, they should all be emitted in FinishBundle). On the other hand, things like open database connections can and likely should be shared across bundles.
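A sketch of both halves of this distinction in the Beam Python SDK (the connection helpers are hypothetical, not a real API):

    import apache_beam as beam
    from apache_beam.utils.windowed_value import WindowedValue

    class BufferingWriteFn(beam.DoFn):
        def setup(self):
            # Expensive, bundle-agnostic resource: created once per DoFn
            # instance and shared across bundles.
            self._conn = connect_to_database()  # hypothetical helper

        def start_bundle(self):
            # Per-bundle state: everything buffered here must be flushed
            # in finish_bundle so the bundle commits as a unit.
            self._buffer = []

        def process(self, element,
                    timestamp=beam.DoFn.TimestampParam,
                    window=beam.DoFn.WindowParam):
            self._buffer.append((element, timestamp, window))

        def finish_bundle(self):
            self._conn.write_batch([e for e, _, _ in self._buffer])  # hypothetical
            # Outputs from finish_bundle must be explicitly windowed.
            for element, timestamp, window in self._buffer:
                yield WindowedValue(element, timestamp, [window])

        def teardown(self):
            self._conn.close()

(Jan's reply just below points out the watermark-hold caveat with exactly this kind of buffering.)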
>>>>>>> This is correct, but the caching between @StartBundle and @FinishBundle has some problems. First, users need to manually set a watermark hold to min(timestamp in bundle), otherwise the watermark might overtake the buffered elements.
>>>>>>
>>>>>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is committed, as there's no guarantee that this work won't be discarded.
>>>>>>
>>>>>>> Users have no option other than using timer.withOutputTimestamp for that, as we don't have a user-facing API to set a watermark hold otherwise; thus the in-bundle caching implies a stateful DoFn. The question might then be: why not use "classical" stateful caching involving state, as there is full control over the caching in user code? This triggered an idea: would it be useful to add the information about caching to the API (e.g. in Java, @StartBundle(caching=true))? That could maybe solve the above issues (the runner would know to set the hold, and it could work with "stateless" DoFns).
>>>>>>
>>>>>> Really, this is one of the areas that the streaming/batch abstraction leaks. In batch it was a common pattern to have local DoFn instance state that persisted from start to finish bundle, and these were also used as convenient entry points for other operations (like opening database connections) 'cause bundles were often "as large as possible." With the advent of streaming it makes sense to put this in explicitly managed runner state to allow for cross-bundle amortization, and there's more value in distinguishing between @Setup and @StartBundle.
>>>>>>
>>>>>> (Were I to do things over I'd probably encourage an API that discouraged non-configuration instance state on DoFns altogether, e.g. in the notion of Python context managers (and an equivalent API could probably be put together with AutoCloseables in Java) one would have something like
>>>>>>
>>>>>> ParDo(X)
>>>>>>
>>>>>> which would logically (though not necessarily physically) lead to an execution like
>>>>>>
>>>>>> with X.bundle_processor() as bundle_processor:
>>>>>>     for bundle in bundles:
>>>>>>         with bundle_processor.element_processor() as process:
>>>>>>             for element in bundle:
>>>>>>                 process(element)
>>>>>>
>>>>>> where the traditional setup/start_bundle/finish_bundle/teardown logic would live in the __enter__ and __exit__ methods (made even easier with coroutines.) For convenience one could of course provide a raw bundle processor or element processor to ParDo if the enter/exit contexts are trivial. But this is getting somewhat off-topic...
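A runnable toy of the shape Robert sketches, in plain Python (this is not a real Beam API; X, BundleProcessor, and the prints are all stand-ins):

    from contextlib import contextmanager

    class BundleProcessor:
        @contextmanager
        def element_processor(self):
            print('start bundle')       # ~ @StartBundle
            try:
                yield lambda element: print('process', element)
            finally:
                print('finish bundle')  # ~ @FinishBundle

    class X:
        @contextmanager
        def bundle_processor(self):
            print('setup')              # ~ @Setup
            try:
                yield BundleProcessor()
            finally:
                print('teardown')       # ~ @Teardown

    with X().bundle_processor() as bp:
        for bundle in ([1, 2], [3]):
            with bp.element_processor() as process:
                for element in bundle:
                    process(element)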
>>>>>>>> Best,
>>>>>>>> B
>>>>>>>>
>>>>>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> (I notice that you replied only to yourself, but there has been a whole thread of discussion on this - are you subscribed to dev@beam? https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>>>>>>>
>>>>>>>>> It sounds like you want what everyone wants: to have the biggest bundles possible.
>>>>>>>>>
>>>>>>>>> So for bounded data, basically you make even splits of the data and each split is one bundle. And then dynamic splitting to redistribute work to eliminate stragglers, if your engine has that capability.
>>>>>>>>>
>>>>>>>>> For unbounded data, you more-or-less bundle as much as you can without waiting too long, like Jan described.
>>>>>>>>>
>>>>>>>>> Users know to put their high fixed costs in @StartBundle and then it is the runner's job to put as many calls to @ProcessElement as possible to amortize.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>>>
>>>>>>>>>> Whoops, I typoed my last email. I meant to write "this isn't the greatest strategy for high *fixed* cost transforms", e.g. a transform that takes 5 minutes to get set up and then maybe a microsecond per input.
>>>>>>>>>>
>>>>>>>>>> I suppose one solution is to move the responsibility for handling this kind of situation to the user and expect users to use a bundling transform (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is this what other runners expect? Just want to make sure I'm not missing some smart generic bundling strategy that might handle this for users.
>>>>>>>>>>
>>>>>>>>>> [1] https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
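For reference, the pattern Joey describes would look something like this in the Python SDK (process_batch and the batch-size bounds are illustrative, and the expensive-resource helper is hypothetical):

    import apache_beam as beam

    def process_batch(batch):
        # Pay the high fixed cost once per batch...
        resource = make_expensive_resource()  # hypothetical 5-minute setup
        # ...then each element costs almost nothing.
        return [resource.apply(e) for e in batch]

    with beam.Pipeline() as p:
        _ = (p
             | beam.Create(range(10000))
             | beam.BatchElements(min_batch_size=10, max_batch_size=1000)
             | beam.Reshuffle()  # redistribute the batches across workers
             | beam.FlatMap(process_batch))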
>>>>>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm writing a runner, and the first strategy for determining bundle size was to just start with a bundle size of one and double it until we reach a size that we expect to take some target per-bundle runtime (e.g. maybe 10 minutes). I realize that this isn't the greatest strategy for high sized cost transforms. I'm curious what kind of strategies other runners take?
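For what it's worth, the doubling heuristic Joey describes can be sketched in a few lines (all names hypothetical; a real runner would also cap the size and account for stragglers):

    def next_bundle_size(current_size, last_bundle_secs, target_secs=600.0):
        # Grow geometrically while bundles finish well under the target
        # per-bundle runtime (e.g. 10 minutes); otherwise hold steady.
        if last_bundle_secs < target_secs / 2:
            return current_size * 2
        return current_size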