Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. However, the records in the bundle will themselves prevent the watermark
from updating, as they are still in flight until after finish bundle.
Therefore simply caching the records should always be watermark-safe,
regardless of the runner. You will only run into problems if you try to move
timestamps "backwards" - which is why Beam strongly discourages this.

Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:

> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
> committed, as there's no guarantee that this work won't be discarded.
>
> There was a thread [1], where the conclusion seemed to be that updating
> watermark is possible even in the middle of a bundle. Actually, handling
> watermarks is runner-dependent (e.g. Flink does not store watermarks in
> checkpoints, they are always recomputed from scratch on restore).
>
> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>
> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <dev@beam.apache.org>
>> wrote:
>>
>>> I've actually wondered about this specifically for streaming... if
>>> you're writing a pipeline there, it seems like you're often going to want to
>>> put high-fixed-cost things like database connections even outside of the
>>> bundle setup. You really only want to do that once in the lifetime of the
>>> worker itself, not the bundle. It seems like having that boundary be
>>> somewhere other than an arbitrary (and, in streaming, probably small to
>>> avoid latency) group of elements might be more useful? I suppose this
>>> depends heavily on the object lifecycle in the SDK worker, though.
>>>
>>
>> +1. This is the difference between @Setup and @StartBundle. The
>> start/finish bundle operations should be used for bracketing element
>> processing that must be committed as a unit for correct failure recovery
>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>> in FinishBundle). On the other hand, things like open database connections
>> can and likely should be shared across bundles.
>>
>> This is correct, but the caching between @StartBundle and @FinishBundle
>> has some problems. First, users need to manually set a watermark hold at
>> min(timestamp in bundle), otherwise the watermark might overtake the
>> buffered elements.
>>
>
> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
> committed, as there's no guarantee that this work won't be discarded.
>
>
>> Users don't have any option other than timer.withOutputTimestamp for
>> that, as we don't have a user-facing API to set a watermark hold otherwise;
>> thus the in-bundle caching implies a stateful DoFn. The question might then
>> be: why not use "classical" stateful caching involving state, as there is
>> full control over the caching in user code? This prompted an idea: would it
>> be useful to add information about caching to the API (e.g., in Java,
>> @StartBundle(caching=true))? That could perhaps solve the above issues
>> (the runner would know to set the hold, and it could work with "stateless"
>> DoFns).
>>
>
> Really, this is one of the areas where the streaming/batch abstraction
> leaks. In batch it was a common pattern to have local DoFn instance state
> that persisted from start to finish bundle, and these were also used as
> convenient entry points for other operations (like opening
> database connections) 'cause bundles were often "as large as possible."
> With the advent of streaming it makes sense to put this in
> explicitly managed runner state to allow for cross-bundle amortization, and
> there's more value in distinguishing between @Setup and @StartBundle.
>
> (Were I to do things over, I'd probably encourage an API that discouraged
> non-configuration instance state on DoFns altogether, e.g. in the style of
> Python context managers (and an equivalent API could probably be put
> together with AutoCloseables in Java); one would have something like
>
> ParDo(X)
>
> which would logically (though not necessarily physically) lead to an
> execution like
>
> with X.bundle_processor() as bundle_processor:
>   for bundle in bundles:
>     with bundle_processor.element_processor() as process:
>       for element in bundle:
>         process(element)
>
> where the traditional setup/start_bundle/finish_bundle/teardown logic
> would live in the __enter__ and __exit__ methods (made even easier with
> coroutines). For convenience one could of course provide a raw bundle
> processor or element processor to ParDo if the enter/exit contexts are
> trivial. But this is getting somewhat off-topic...
>
>
>>
>>>
>>> Best,
>>> B
>>>
>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> (I notice that you replied only to yourself, but there has been a whole
>>>> thread of discussion on this - are you subscribed to dev@beam?
>>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>>
>>>> It sounds like you want what everyone wants: to have the biggest
>>>> bundles possible.
>>>>
>>>> So for bounded data, basically you make even splits of the data and
>>>> each split is one bundle. Then you use dynamic splitting to redistribute
>>>> work and eliminate stragglers, if your engine has that capability.
>>>>
>>>> For unbounded data, you more-or-less bundle as much as you can without
>>>> waiting too long, like Jan described.
>>>>
>>>> Users know to put their high fixed costs in @StartBundle and then it is
>>>> the runner's job to put as many calls to @ProcessElement as possible to
>>>> amortize.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <joey.t...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>>>> greatest strategy for high *fixed* cost transforms", e.g. a transform
>>>>> that takes 5 minutes to get set up and then maybe a microsecond per input
>>>>>
>>>>> I suppose one solution is to move the responsibility for handling this
>>>>> kind of situation to the user and expect users to use a bundling transform
>>>>> (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is this what
>>>>> other runners expect? Just want to make sure I'm not missing some smart
>>>>> generic bundling strategy that might handle this for users.
>>>>>
>>>>> [1]
>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>>>
>>>>>
>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> I'm writing a runner, and the first strategy for determining bundle
>>>>>> size was to just start with a bundle size of one and double it until we
>>>>>> reach a size that we expect to take some target per-bundle runtime
>>>>>> (e.g. maybe 10 minutes). I realize that this isn't the greatest strategy
>>>>>> for high sized cost transforms. I'm curious what kind of strategies
>>>>>> other runners take?
>>>>>>
>>>>>
