Using Setup would cause data loss in this case. A runner can always retry a
bundle, and I don't believe Setup is called again in this case. If the user
initialized the hashmap in Setup, this would cause records to be completely
lost whenever bundles retry.
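A minimal plain-Python sketch of the failure mode described above (hypothetical class and method names, not the real Beam API): a runner discards a failed bundle attempt's output and retries the bundle, re-running StartBundle but not Setup, so dedup state initialized in Setup "remembers" elements from the failed attempt and silently drops them on retry.

```python
class BestEffortDedupDoFn:
    def setup(self):
        # Called once per DoFn instance; NOT called again on bundle retry.
        self.seen_since_setup = set()

    def start_bundle(self):
        # Called at the start of every bundle attempt, including retries.
        self.seen_this_bundle = set()

    def process_setup_scoped(self, element):
        if element not in self.seen_since_setup:
            self.seen_since_setup.add(element)
            yield element

    def process_bundle_scoped(self, element):
        if element not in self.seen_this_bundle:
            self.seen_this_bundle.add(element)
            yield element


def run_bundle(fn, elements, process, fail=False):
    """Simulate one bundle attempt; a failed attempt's output is discarded."""
    fn.start_bundle()
    out = []
    for e in elements:
        out.extend(process(e))
    if fail:
        raise RuntimeError("bundle failed; runner will retry")
    return out


fn = BestEffortDedupDoFn()
fn.setup()
bundle = ["a", "b", "a"]

# Setup-scoped dedup state: the first attempt fails after consuming elements...
try:
    run_bundle(fn, bundle, fn.process_setup_scoped, fail=True)
except RuntimeError:
    pass
# ...and the retry drops ALL records, because Setup is not re-run.
retry_setup = run_bundle(fn, bundle, fn.process_setup_scoped)    # []

# Bundle-scoped state: StartBundle runs again on retry, so nothing is lost.
try:
    run_bundle(fn, bundle, fn.process_bundle_scoped, fail=True)
except RuntimeError:
    pass
retry_bundle = run_bundle(fn, bundle, fn.process_bundle_scoped)  # ["a", "b"]
```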

On Wed, Sep 27, 2023 at 11:20 AM Jan Lukavský <je...@seznam.cz> wrote:

> What is the reason to rely on StartBundle and not Setup in this case? If
> the life-cycle of a bundle is not "closed" (i.e. start - finish), then it
> seems ill-defined and Setup should do?
> I'm trying to think of non-caching use-cases of StartBundle-FinishBundle,
> are there such cases? I'd say yes, but I'm struggling a bit to find a
> specific example that cannot be solved using Setup or lazy init.
> On 9/27/23 19:58, Reuven Lax via dev wrote:
>
> DoFns are allowed to be non deterministic, so they don't have to yield the
> "same" output.
>
> The example I'm thinking of is where users perform some "best-effort"
> deduplication by creating a hashmap in StartBundle and removing duplicates.
> This is usually done purely for performance to reduce shuffle size, as
> opposed to a guaranteed RemoveDuplicates. This scenario doesn't require
> FinishBundle, though it does require a StartBundle.
>
> On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles <k...@apache.org> wrote:
>
>>
>>
>> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev <dev@beam.apache.org>
>> wrote:
>>
>>> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>>> Though absence of FinishBundle doesn't mean that one can assume that
>>> elements in a bundle don't affect subsequent bundle elements (i.e. there
>>> might still be caching!)
>>>
>>
>> Well for a DoFn to be correct, it has to yield the same (or "the same as
>> much as the user expects it to be the same") output regardless of order of
>> processing or bundling so a runner or SDK harness can definitely take a
>> bunch of elements and process them however it wants if there's
>> no @FinishBundle. I think that's what Jan is getting at - adding
>> a @FinishBundle is the user placing a new restriction on the runner.
>> Technically we probably have to include @StartBundle in that consideration.
>>
>> Kenn
>>
>>
>>>
>>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Kenn and Reuven,
>>>>>
>>>>> I agree with all these points. The only issue here seems to be that
>>>>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>>>>> fixed, though we need to change some defaults, as the 1000 ms default
>>>>> bundle "duration" can be too much for lower-traffic pipelines. We are also
>>>>> probably missing some @ValidatesRunner tests for this. I created [1] and
>>>>> [2] to track this.
>>>>>
>>>>> One question still remains: the bundle vs. element life-cycle is
>>>>> relevant only for cases where processing of element X can affect
>>>>> processing of element Y later in the same bundle. Once this influence is
>>>>> ruled out (i.e. no caching), this information can enable runner
>>>>> optimizations that yield better performance. Should we consider
>>>>> propagating this information from user code to the runner?
>>>>>
>>>> Yes!
>>>>
>>>> This was the explicit goal of the move to annotation-driven DoFn in
>>>> https://s.apache.org/a-new-dofn to make it so that the SDK and runner
>>>> can get good information about what the DoFn requirements are.
>>>>
>>>> When there is no @FinishBundle method, the runner can make additional
>>>> optimizations. This should have been included in the ParDoPayload in the
>>>> proto when we moved to portable pipelines. I cannot remember if there was a
>>>> good reason that we did not do so. Maybe we (incorrectly) thought that this
>>>> was an issue that only the Java SDK harness needed to know about.
>>>>
>>>> Kenn
>>>>
>>>>
>>>>> [1] https://github.com/apache/beam/issues/28649
>>>>>
>>>>> [2] https://github.com/apache/beam/issues/28650
>>>>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>>
>>>>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>>>>
>>>>>> Two separate things here:
>>>>>>
>>>>>> 1. Yes, a watermark can update in the middle of a bundle.
>>>>>> 2. The records in the bundle themselves will prevent the watermark
>>>>>> from updating as they are still in flight until after finish bundle.
>>>>>> Therefore simply caching the records should always be watermark safe,
>>>>>> regardless of the runner. You will only run into problems if you try and
>>>>>> move timestamps "backwards" - which is why Beam strongly discourages 
>>>>>> this.
>>>>>>
>>>>>> This is not aligned with FlinkRunner's implementation. And I
>>>>>> actually think it is not aligned conceptually. As mentioned, Flink does
>>>>>> not have the concept of bundles at all. It achieves fault tolerance via
>>>>>> checkpointing, essentially checkpoint barrier flowing from sources to
>>>>>> sinks, safely snapshotting state of each operator on the way. Bundles are
>>>>>> implemented as a somewhat arbitrary set of elements between two 
>>>>>> consecutive
>>>>>> checkpoints (there can be multiple bundles between checkpoints). A bundle
>>>>>> is 'committed' (i.e. persistently stored and guaranteed not to retry) 
>>>>>> only
>>>>>> after the checkpoint barrier passes over the elements in the bundle 
>>>>>> (every
>>>>>> bundle is finished at the very latest exactly before a checkpoint). But
>>>>>> watermark propagation and bundle finalization are completely unrelated.
>>>>>> This
>>>>>> might be a bug in the runner, but requiring checkpoint for watermark
>>>>>> propagation will introduce insane delays between processing time and
>>>>>> watermarks, every executable stage will delay watermark propagation 
>>>>>> until a
>>>>>> checkpoint (which is typically the order of seconds). This delay would 
>>>>>> add
>>>>>> up after each stage.
>>>>>>
>>>>>
>>>>> It's not bundles that hold up processing, rather it is elements, and
>>>>> elements are not considered "processed" until FinishBundle.
>>>>>
>>>>> You are right about Flink. In many cases this is fine - if Flink rolls
>>>>> back to the last checkpoint, the watermark will also roll back, and
>>>>> everything stays consistent. So in general, one does not need to wait for
>>>>> checkpoints for watermark propagation.
>>>>>
>>>>> Where things get a bit weirder with Flink is whenever one has external
>>>>> side effects. In theory, one should wait for checkpoints before letting a
>>>>> Sink flush, otherwise one could end up with incorrect outputs (especially
>>>>> with a sink like TextIO). Flink itself recognizes this, and that's why 
>>>>> they
>>>>> provide TwoPhaseCommitSinkFunction
>>>>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
>>>>>  which
>>>>> waits for a checkpoint. In Beam, this is the reason we introduced
>>>>> RequiresStableInput. Of course in practice many Flink users don't do this 
>>>>> -
>>>>> in which case they are prioritizing latency over data correctness.
>>>>>
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz>
>>>>>> wrote:
>>>>>>
>>>>>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>>>>
>>>>>>> There was a thread [1], where the conclusion seemed to be that
>>>>>>> updating the watermark is possible even in the middle of a bundle. Actually,
>>>>>>> handling watermarks is runner-dependent (e.g. Flink does not store
>>>>>>> watermarks in checkpoints, they are always recomputed from scratch on
>>>>>>> restore).
>>>>>>>
>>>>>>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>>>>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>>>>>>
>>>>>>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>>>>>>
>>>>>>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <
>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> I've actually wondered about this specifically for streaming... if
>>>>>>>>> you're writing a pipeline there it seems like you're often going to 
>>>>>>>>> want to
>>>>>>>>> put high fixed cost things like database connections even outside of 
>>>>>>>>> the
>>>>>>>>> bundle setup. You really only want to do that once in the lifetime of 
>>>>>>>>> the
>>>>>>>>> worker itself, not the bundle. Seems like having that boundary be 
>>>>>>>>> somewhere
>>>>>>>>> other than an arbitrarily (and probably small in streaming to avoid
>>>>>>>>> latency) group of elements might be more useful? I suppose this 
>>>>>>>>> depends
>>>>>>>>> heavily on the object lifecycle in the sdk worker though.
>>>>>>>>>
>>>>>>>>
>>>>>>>> +1. This is the difference between @Setup and @StartBundle. The
>>>>>>>> start/finish bundle operations should be used for bracketing element
>>>>>>>> processing that must be committed as a unit for correct failure 
>>>>>>>> recovery
>>>>>>>> (e.g. if elements are cached in ProcessElement, they should all be 
>>>>>>>> emitted
>>>>>>>> in FinishBundle). On the other hand, things like open database 
>>>>>>>> connections
>>>>>>>> can and likely should be shared across bundles.
>>>>>>>>
>>>>>>>> This is correct, but the caching between @StartBundle and
>>>>>>>> @FinishBundle has some problems. First, users need to manually set a
>>>>>>>> watermark hold for min(timestamp in bundle), otherwise the watermark
>>>>>>>> might overtake the buffered elements.
>>>>>>>>
>>>>>>>
>>>>>>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>>>>
>>>>>>>
>>>>>>>> Users have no option other than using timer.withOutputTimestamp
>>>>>>>> for that, as we don't otherwise have a user-facing API to set a
>>>>>>>> watermark hold; thus in-bundle caching implies a stateful DoFn. The
>>>>>>>> question might then be: why not use "classical" stateful caching
>>>>>>>> involving state, since user code has full control over the caching?
>>>>>>>> This gave me the idea that it might be useful to add information
>>>>>>>> about caching to the API (e.g. in Java, @StartBundle(caching=true)),
>>>>>>>> which could maybe solve the above issues (the runner would know to
>>>>>>>> set the hold, and it could work with "stateless" DoFns)?
>>>>>>>>
>>>>>>>
>>>>>>> Really, this is one of the areas that the streaming/batch
>>>>>>> abstraction leaks. In batch it was a common pattern to have local DoFn
>>>>>>> instance state that persisted from start to finish bundle, and these 
>>>>>>> were
>>>>>>> also used as convenient entry points for other operations (like opening
>>>>>>> database connections) 'cause bundles were often "as large as possible."
>>>>>>> With the advent of streaming it makes sense to put this in
>>>>>>> explicitly managed runner state to allow for cross-bundle amortization 
>>>>>>> and
>>>>>>> there's more value in distinguishing between @Setup and @
>>>>>>> StartBundle.
>>>>>>>
>>>>>>> (Were I to do things over I'd probably encourage an API that
>>>>>>> discouraged non-configuration instance state on DoFns altogether; e.g. in
>>>>>>> the style of Python context managers (an equivalent API could probably
>>>>>>> be put together with AutoCloseables in Java) one would have something like
>>>>>>>
>>>>>>> ParDo(X)
>>>>>>>
>>>>>>> which would logically (though not necessarily physically) lead to an
>>>>>>> execution like
>>>>>>>
>>>>>>> with X.bundle_processor() as bundle_processor:
>>>>>>>   for bundle in bundles:
>>>>>>>     with bundle_processor.element_processor() as process:
>>>>>>>       for element in bundle:
>>>>>>>         process(element)
>>>>>>>
>>>>>>> where the traditional setup/start_bundle/finish_bundle/teardown
>>>>>>> logic would live in the __enter__ and __exit__ methods (made even easier
>>>>>>> with coroutines.) For convenience one could of course provide a raw 
>>>>>>> bundle
>>>>>>> processor or element processor to ParDo if the enter/exit contexts are
>>>>>>> trivial. But this is getting somewhat off-topic...
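The execution model sketched in the quoted message can be made runnable with Python's contextlib; the names (`bundle_processor`, `element_processor`) are taken from the sketch and are hypothetical, not a real Beam API. The setup/teardown pair maps to the outer context's enter/exit, and start/finish bundle maps to the inner one:

```python
from contextlib import contextmanager


class X:
    """Toy DoFn-like object recording its lifecycle calls in self.log."""

    def __init__(self):
        self.log = []

    @contextmanager
    def bundle_processor(self):
        self.log.append("setup")              # ~ @Setup on __enter__
        try:
            yield self
        finally:
            self.log.append("teardown")       # ~ @Teardown on __exit__

    @contextmanager
    def element_processor(self):
        self.log.append("start_bundle")       # ~ @StartBundle on __enter__
        try:
            yield self.process
        finally:
            self.log.append("finish_bundle")  # ~ @FinishBundle on __exit__

    def process(self, element):
        self.log.append(f"process({element})")


# The loop structure from the sketch, verbatim:
x = X()
bundles = [[1, 2], [3]]
with x.bundle_processor() as bundle_processor:
    for bundle in bundles:
        with bundle_processor.element_processor() as process:
            for element in bundle:
                process(element)
```

One design consequence: failure recovery falls out naturally, since an exception raised mid-bundle unwinds through `finish_bundle` and `teardown` via the `finally` blocks.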
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> B
>>>>>>>>>
>>>>>>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> (I notice that you replied only to yourself, but there has been a
>>>>>>>>>> whole thread of discussion on this - are you subscribed to dev@beam?
>>>>>>>>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>>>>>>>>
>>>>>>>>>> It sounds like you want what everyone wants: to have the biggest
>>>>>>>>>> bundles possible.
>>>>>>>>>>
>>>>>>>>>> So for bounded data, basically you make even splits of the data
>>>>>>>>>> and each split is one bundle. And then dynamic splitting to 
>>>>>>>>>> redistribute
>>>>>>>>>> work to eliminate stragglers, if your engine has that capability.
>>>>>>>>>>
>>>>>>>>>> For unbounded data, you more-or-less bundle as much as you can
>>>>>>>>>> without waiting too long, like Jan described.
>>>>>>>>>>
>>>>>>>>>> Users know to put their high fixed costs in @StartBundle and then
>>>>>>>>>> it is the runner's job to put as many calls to @ProcessElement as 
>>>>>>>>>> possible
>>>>>>>>>> to amortize.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <
>>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>>>>>>>>>> greatest strategy for high *fixed* cost transforms", e.g. a
>>>>>>>>>>> transform that takes 5 minutes to get set up and then maybe a 
>>>>>>>>>>> microsecond
>>>>>>>>>>> per input
>>>>>>>>>>>
>>>>>>>>>>> I suppose one solution is to move the responsibility for
>>>>>>>>>>> handling this kind of situation to the user and expect users to use 
>>>>>>>>>>> a
>>>>>>>>>>> bundling transform (e.g. BatchElements [1]) followed by a
>>>>>>>>>>> Reshuffle+FlatMap. Is this what other runners expect? Just want to 
>>>>>>>>>>> make
>>>>>>>>>>> sure I'm not missing some smart generic bundling strategy that 
>>>>>>>>>>> might handle
>>>>>>>>>>> this for users.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <
>>>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm writing a runner, and the first strategy for determining
>>>>>>>>>>>> bundle size was to just start with a bundle size of one and
>>>>>>>>>>>> double it until we reach a size that we expect to take some
>>>>>>>>>>>> target per-bundle runtime (e.g. maybe 10 minutes). I realize
>>>>>>>>>>>> that this isn't the
>>>>>>>>>>>> greatest
>>>>>>>>>>>> strategy for high sized cost transforms. I'm curious what kind of
>>>>>>>>>>>> strategies other runners take?
>>>>>>>>>>>>
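The doubling strategy in the quoted message can be sketched as follows (hypothetical function names and a simplified linear cost model, for illustration only). It also shows the weakness the thread discusses: with a high fixed per-bundle cost, the runner keeps doubling for a long time while every bundle still spends a large share of its runtime on overhead.

```python
def next_bundle_size(current_size, observed_runtime, target_runtime):
    """Double the bundle size until a bundle takes about target_runtime."""
    if observed_runtime < target_runtime:
        return current_size * 2
    return current_size


def simulate(per_element_cost, fixed_cost, target_runtime):
    """Run the doubling loop against a simple linear cost model."""
    size, sizes = 1, []
    while True:
        runtime = fixed_cost + size * per_element_cost
        sizes.append(size)
        new_size = next_bundle_size(size, runtime, target_runtime)
        if new_size == size:
            return sizes
        size = new_size


# Cheap elements, no fixed cost: many doublings before a 600 s target.
cheap = simulate(per_element_cost=0.001, fixed_cost=0.0, target_runtime=600)

# High fixed cost (e.g. the 5-minute setup, microsecond-per-element case
# from the thread): the loop only stops once the per-element work alone
# reaches 300 s, i.e. at an enormous bundle size, and even then half of
# each bundle's runtime is still fixed overhead.
expensive = simulate(per_element_cost=1e-6, fixed_cost=300.0,
                     target_runtime=600)
```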
>>>>>>>>>>>
