DoFns are allowed to be non-deterministic, so they don't have to yield the
"same" output.

The example I'm thinking of is where users perform some "best-effort"
deduplication by creating a hashmap in StartBundle and removing duplicates.
This is usually done purely for performance to reduce shuffle size, as
opposed to a guaranteed RemoveDuplicates. This scenario doesn't require
FinishBundle, though it does require a StartBundle.
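
A rough sketch of what I mean, in plain Python rather than the real Beam
API. The class name, the `run_bundles` mini-harness and the sample data are
all made up for illustration; the only point is that the output depends on
how the runner happens to bundle the input.

```python
class BestEffortDedupFn:
    """Plain-Python stand-in for a DoFn; this is not the Beam API itself."""

    def start_bundle(self):
        # Fresh per-bundle set: duplicates are only caught within one bundle.
        self._seen = set()

    def process(self, element):
        if element in self._seen:
            return  # drop a duplicate already emitted in this bundle
        self._seen.add(element)
        yield element


def run_bundles(fn, bundles):
    """Hypothetical mini-harness: calls start_bundle once per bundle."""
    out = []
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            out.extend(fn.process(element))
    return out


# Same four elements, bundled two different ways.
same_bundle = run_bundles(BestEffortDedupFn(), [["a", "b", "a", "b"]])
split_bundles = run_bundles(BestEffortDedupFn(), [["a", "b"], ["a", "b"]])
```

With everything in one bundle the duplicates are dropped; split across two
bundles they all pass through. Both outputs are acceptable to the user,
because a later exact deduplication would still clean up the rest.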

On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles <> wrote:

> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev <>
> wrote:
>> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>> Though absence of FinishBundle doesn't mean that one can assume that
>> elements in a bundle don't affect subsequent bundle elements (i.e. there
>> might still be caching!)
> Well for a DoFn to be correct, it has to yield the same (or "the same as
> much as the user expects it to be the same") output regardless of order of
> processing or bundling so a runner or SDK harness can definitely take a
> bunch of elements and process them however it wants if there's
> no @FinishBundle. I think that's what Jan is getting at - adding
> a @FinishBundle is the user placing a new restriction on the runner.
> Technically probably have to include @StartBundle in that consideration.
> Kenn
>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles <> wrote:
>>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský <> wrote:
>>>> Hi Kenn and Reuven,
>>>> I agree with all these points. The only issue here seems to be that
>>>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>>>> fixed, though we need to change some defaults, as 1000 ms default bundle
>>>> "duration" for lower traffic Pipelines can be too much. We are also
>>>> probably missing some @ValidatesRunner tests for this. I created [1] and
>>>> [2] to track this.
>>>> One question still remains: the bundle vs. element life-cycle is
>>>> relevant only for cases where processing of element X can affect processing
>>>> of element Y later in the same bundle. Once this influence is ruled out
>>>> (i.e. no caching), this information can result in runner optimization that
>>>> yields better performance. Should we consider propagating this information
>>>> from user code to the runner?
>>> Yes!
>>> This was the explicit goal of the move to annotation-driven DoFn in
>>> to make it so that the SDK and runner
>>> can get good information about what the DoFn requirements are.
>>> When there is no @FinishBundle method, the runner can make additional
>>> optimizations. This should have been included in the ParDoPayload in the
>>> proto when we moved to portable pipelines. I cannot remember if there was a
>>> good reason that we did not do so. Maybe we (incorrectly) thought that this
>>> was an issue that only the Java SDK harness needed to know about.
>>> Kenn
>>>> [1]
>>>> [2]
>>>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <> wrote:
>>>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>>> Two separate things here:
>>>>> 1. Yes, a watermark can update in the middle of a bundle.
>>>>> 2. The records in the bundle themselves will prevent the watermark
>>>>> from updating as they are still in flight until after finish bundle.
>>>>> Therefore simply caching the records should always be watermark safe,
>>>>> regardless of the runner. You will only run into problems if you try and
>>>>> move timestamps "backwards" - which is why Beam strongly discourages this.
>>>>> This is not aligned with FlinkRunner's implementation. And I actually
>>>>> think it is not aligned conceptually. As mentioned, Flink does not have
>>>>> the concept of bundles at all. It achieves fault tolerance via
>>>>> checkpointing, essentially a checkpoint barrier flowing from sources to
>>>>> sinks, safely snapshotting the state of each operator on the way. Bundles
>>>>> are implemented as a somewhat arbitrary set of elements between two
>>>>> consecutive checkpoints (there can be multiple bundles between
>>>>> checkpoints). A bundle is 'committed' (i.e. persistently stored and
>>>>> guaranteed not to retry) only after the checkpoint barrier passes over the
>>>>> elements in the bundle (every bundle is finished at the very latest
>>>>> exactly before a checkpoint). But watermark propagation and bundle
>>>>> finalization are completely unrelated. This might be a bug in the runner,
>>>>> but requiring a checkpoint for watermark propagation will introduce insane
>>>>> delays between processing time and watermarks: every executable stage will
>>>>> delay watermark propagation until a checkpoint (which is typically on the
>>>>> order of seconds). This delay would add up after each stage.
>>>> It's not bundles that hold up processing, rather it is elements, and
>>>> elements are not considered "processed" until FinishBundle.
>>>> You are right about Flink. In many cases this is fine - if Flink rolls
>>>> back to the last checkpoint, the watermark will also roll back, and
>>>> everything stays consistent. So in general, one does not need to wait for
>>>> checkpoints for watermark propagation.
>>>> Where things get a bit weirder with Flink is whenever one has external
>>>> side effects. In theory, one should wait for checkpoints before letting a
>>>> Sink flush, otherwise one could end up with incorrect outputs (especially
>>>> with a sink like TextIO). Flink itself recognizes this, and that's why they
>>>> provide TwoPhaseCommitSinkFunction, which waits for a checkpoint. In
>>>> Beam, this is the reason we introduced
>>>> RequiresStableInput. Of course in practice many Flink users don't do this -
>>>> in which case they are prioritizing latency over data correctness.
>>>>> Reuven
>>>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <> wrote:
>>>>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>>> There was a thread [1], where the conclusion seemed to be that
>>>>>> updating watermark is possible even in the middle of a bundle. Actually,
>>>>>> handling watermarks is runner-dependent (e.g. Flink does not store
>>>>>> watermarks in checkpoints, they are always recomputed from scratch on
>>>>>> restore).
>>>>>> [1]
>>>>>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>>>>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <>
>>>>>> wrote:
>>>>>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>>>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <
>>>>>>>> wrote:
>>>>>>>> I've actually wondered about this specifically for streaming... if
>>>>>>>> you're writing a pipeline there it seems like you're often going to
>>>>>>>> want to put high fixed cost things like database connections even
>>>>>>>> outside of the bundle setup. You really only want to do that once in
>>>>>>>> the lifetime of the worker itself, not the bundle. Seems like having
>>>>>>>> that boundary be somewhere other than an arbitrary (and probably small
>>>>>>>> in streaming to avoid latency) group of elements might be more useful?
>>>>>>>> I suppose this depends heavily on the object lifecycle in the sdk
>>>>>>>> worker though.
>>>>>>> +1. This is the difference between @Setup and @StartBundle. The
>>>>>>> start/finish bundle operations should be used for bracketing element
>>>>>>> processing that must be committed as a unit for correct failure recovery
>>>>>>> (e.g. if elements are cached in ProcessElement, they should all be
>>>>>>> emitted in FinishBundle). On the other hand, things like open database
>>>>>>> connections can and likely should be shared across bundles.
>>>>>>> This is correct, but the caching between @StartBundle and
>>>>>>> @FinishBundle has some problems. First, users need to manually set
>>>>>>> watermark hold for min(timestamp in bundle), otherwise watermark might
>>>>>>> overtake the buffered elements.
>>>>>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>>>> Users don't have any option other than using timer.withOutputTimestamp
>>>>>>> for that, as we don't have a user-facing API to set a watermark hold
>>>>>>> otherwise, thus the in-bundle caching implies a stateful DoFn. The
>>>>>>> question might then be, why not use "classical" stateful caching
>>>>>>> involving state, as there is full control over the caching in user code.
>>>>>>> This gave me the idea that it might be useful to add the information
>>>>>>> about caching to the API (e.g. in Java @StartBundle(caching=true)), which
>>>>>>> could maybe solve the above issues (the runner would know to set the
>>>>>>> hold, and it could work with "stateless" DoFns)?
>>>>>> Really, this is one of the areas that the streaming/batch abstraction
>>>>>> leaks. In batch it was a common pattern to have local DoFn instance state
>>>>>> that persisted from start to finish bundle, and these were also used as
>>>>>> convenient entry points for other operations (like opening
>>>>>> database connections) 'cause bundles were often "as large as possible."
>>>>>> With the advent of streaming it makes sense to put this in
>>>>>> explicitly managed runner state to allow for cross-bundle amortization
>>>>>> and there's more value in distinguishing between @Setup and @StartBundle.
>>>>>> (Were I to do things over I'd probably encourage an API that
>>>>>> discouraged non-configuration instance state on DoFns altogether, e.g. in
>>>>>> the notion of Python context managers (and an equivalent API could
>>>>>> probably be put together with AutoCloseables in Java) one would have
>>>>>> something like
>>>>>> ParDo(X)
>>>>>> which would logically (though not necessarily physically) lead to an
>>>>>> execution like
>>>>>> with X.bundle_processor() as bundle_processor:
>>>>>>   for bundle in bundles:
>>>>>>     with bundle_processor.element_processor() as process:
>>>>>>       for element in bundle:
>>>>>>         process(element)
>>>>>> where the traditional setup/start_bundle/finish_bundle/teardown logic
>>>>>> would live in the __enter__ and __exit__ methods (made even easier with
>>>>>> coroutines.) For convenience one could of course provide a raw bundle
>>>>>> processor or element processor to ParDo if the enter/exit contexts are
>>>>>> trivial. But this is getting somewhat off-topic...)
>>>>>>>> Best,
>>>>>>>> B
>>>>>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <>
>>>>>>>> wrote:
>>>>>>>>> (I notice that you replied only to yourself, but there has been a
>>>>>>>>> whole thread of discussion on this - are you subscribed to dev@beam?)
>>>>>>>>> It sounds like you want what everyone wants: to have the biggest
>>>>>>>>> bundles possible.
>>>>>>>>> So for bounded data, basically you make even splits of the data
>>>>>>>>> and each split is one bundle. And then dynamic splitting to
>>>>>>>>> redistribute work to eliminate stragglers, if your engine has that
>>>>>>>>> capability.
>>>>>>>>> For unbounded data, you more-or-less bundle as much as you can
>>>>>>>>> without waiting too long, like Jan described.
>>>>>>>>> Users know to put their high fixed costs in @StartBundle and then
>>>>>>>>> it is the runner's job to put as many calls to @ProcessElement as
>>>>>>>>> possible to amortize.
>>>>>>>>> Kenn
>>>>>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <
>>>>>>>>>> wrote:
>>>>>>>>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>>>>>>>>> greatest strategy for high *fixed* cost transforms", e.g. a
>>>>>>>>>> transform that takes 5 minutes to get set up and then maybe a
>>>>>>>>>> microsecond per input.
>>>>>>>>>> I suppose one solution is to move the responsibility for handling
>>>>>>>>>> this kind of situation to the user and expect users to use a bundling
>>>>>>>>>> transform (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap.
>>>>>>>>>> Is this what other runners expect? Just want to make sure I'm not
>>>>>>>>>> missing some smart generic bundling strategy that might handle this
>>>>>>>>>> for users.
>>>>>>>>>> [1]
>>>>>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <
>>>>>>>>>>> wrote:
>>>>>>>>>>> Writing a runner and the first strategy for determining bundling
>>>>>>>>>>> size was to just start with a bundle size of one and double it
>>>>>>>>>>> until we reach a size that we expect to take some target per-bundle
>>>>>>>>>>> runtime (e.g. maybe 10 minutes). I realize that this isn't the
>>>>>>>>>>> greatest strategy for high sized cost transforms. I'm curious what
>>>>>>>>>>> kind of strategies other runners take?
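
For reference, the doubling strategy from the original question at the
bottom of this thread could be sketched roughly as below. This is only an
illustration under an assumed cost model (runtime = fixed cost + size *
per-element cost); the function names and the numbers are hypothetical and
not taken from any runner.

```python
def next_bundle_size(current_size, observed_seconds, target_seconds):
    """Double the bundle size until a bundle takes about target_seconds."""
    if observed_seconds >= target_seconds:
        return current_size
    return current_size * 2


def simulate(fixed_cost, per_element_cost, target_seconds, max_rounds=64):
    """Assumed cost model: runtime = fixed_cost + size * per_element_cost."""
    size, total_fixed = 1, 0.0
    for _ in range(max_rounds):
        runtime = fixed_cost + size * per_element_cost
        total_fixed += fixed_cost  # the setup cost is paid again every bundle
        new_size = next_bundle_size(size, runtime, target_seconds)
        if new_size == size:
            break
        size = new_size
    return size, total_fixed


# 5 minutes of setup, one microsecond per element, 10 minute target.
final_size, setup_seconds = simulate(300.0, 1e-6, 600.0)
```

Under these assumed numbers the loop settles at 2**29 elements after 30
bundles, so 9000 seconds go to setup alone before the target is reached,
which is the weakness with high fixed cost transforms that the follow-up
message points out.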
