These are some good points. Replies inline.

On Mon, Sep 25, 2023 at 9:19 AM Jan Lukavský <je...@seznam.cz> wrote:

>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from
> updating as they are still in flight until after finish bundle. Therefore
> simply caching the records should always be watermark safe, regardless of
> the runner. You will only run into problems if you try and move timestamps
> "backwards" - which is why Beam strongly discourages this.
>
> This is not aligned with FlinkRunner's implementation. And I actually
> think it is not aligned conceptually. As mentioned, Flink does not have
> the concept of bundles at all. It achieves fault tolerance via
> checkpointing, essentially a checkpoint barrier flowing from sources to
> sinks, safely snapshotting the state of each operator on the way.
>


> Bundles are implemented as a somewhat arbitrary set of elements between
> two consecutive checkpoints (there can be multiple bundles between
> checkpoints)
>

Yes, it is a good point. To align the runner: an input element is not
fully processed until it has been through @ProcessElement and @FinishBundle
has also been called. Until that happens, the input element is still
"in process" and would hold the watermark. This doesn't mean the watermark
is frozen; it only means it is constrained.
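
As a rough illustration of "constrained, not frozen", here is a toy model
in plain Python. It is not a Beam or Flink API; the class and method names
are made up for this sketch, and timestamps are plain integers. The output
watermark follows the input watermark but is capped by the oldest element
that has not yet been through finish bundle.

```python
from typing import List


class InFlightHold:
    """Toy model of the watermark hold created by in-flight elements.

    Hypothetical names; this is not a Beam or Flink API.
    """

    def __init__(self) -> None:
        # Event timestamps of elements seen since the last finish_bundle().
        self._in_flight: List[int] = []

    def process_element(self, timestamp: int) -> None:
        # The element stays "in process" until finish_bundle() is called.
        self._in_flight.append(timestamp)

    def finish_bundle(self) -> None:
        # Everything buffered in the bundle is considered emitted now.
        self._in_flight.clear()

    def output_watermark(self, input_watermark: int) -> int:
        # Follow the input watermark, but never pass the oldest element
        # that is still in flight.
        return min([input_watermark, *self._in_flight])


hold = InFlightHold()
hold.process_element(10)
hold.process_element(15)
print(hold.output_watermark(5))   # 5: the watermark can still move
print(hold.output_watermark(20))  # 10: capped by the oldest in-flight element
hold.finish_bundle()
print(hold.output_watermark(20))  # 20: the hold is released
```

In this model, caching elements between process and finish bundle needs no
explicit hold from the user, as long as timestamps are not moved backwards.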


> A bundle is 'committed' (i.e. persistently stored and guaranteed not to
> retry) only after the checkpoint barrier passes over the elements in the
> bundle (every bundle is finished at the very latest exactly before a
> checkpoint).
>

I think this is fine and does not have to be related to bundle processing
or watermarks. Since Flink provides global consistency, any downstream work
that depended on the not-yet-persisted results would also be reset back to
the checkpoint.


> But watermark propagation and bundle finalization are completely unrelated.
> This might be a bug in the runner, but requiring a checkpoint for watermark
> propagation would introduce insane delays between processing time and
> watermarks: every executable stage would delay watermark propagation until a
> checkpoint (which is typically on the order of seconds). This delay would
> add up after each stage.
>

I am aware of this conflict. Interestingly, "requires stable input" is the
case where you must wait until checkpoint finalization, since inputs may
spontaneously change on retry before a checkpoint is finalized. This is not
just a mismatch in Beam/Flink but I believe Flink itself cannot correctly
process this kind of data without waiting for checkpoint finalization.
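
To make the "requires stable input" case concrete, here is a toy sketch in
plain Python. All names are hypothetical and this is not how any runner is
actually implemented; it only shows the ordering constraint. Elements are
handed to user code only once the checkpoint covering them has completed,
so a retry before that point can never expose two versions of the input.

```python
from typing import Callable, List


class StableInputGate:
    """Toy model: release input only after a checkpoint has completed.

    Hypothetical names; this is not a Beam or Flink API.
    """

    def __init__(self, process: Callable[[str], None]) -> None:
        self._process = process
        self._pending: List[str] = []

    def on_element(self, element: str) -> None:
        # Buffer instead of processing: the input is not yet stable.
        self._pending.append(element)

    def on_checkpoint_complete(self) -> None:
        # The buffered input is now durable, so it is safe to process.
        ready, self._pending = self._pending, []
        for element in ready:
            self._process(element)

    def on_failure(self) -> None:
        # Input that was never checkpointed is dropped; the source replays
        # it, possibly with different contents.
        self._pending.clear()


seen: List[str] = []
gate = StableInputGate(seen.append)
gate.on_element("record-v1")
gate.on_failure()              # retry before the checkpoint completed
gate.on_element("record-v2")   # the replayed input differs
gate.on_checkpoint_complete()
print(seen)                    # ['record-v2']
```

The cost is exactly the delay discussed above: nothing is processed until
the checkpoint completes.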

Kenn

>
> Reuven
>
> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>> There was a thread [1], where the conclusion seemed to be that updating
>> watermark is possible even in the middle of a bundle. Actually, handling
>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>> checkpoints, they are always recomputed from scratch on restore).
>>
>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>
>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>
>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <dev@beam.apache.org>
>>> wrote:
>>>
>>>> I've actually wondered about this specifically for streaming... if
>>>> you're writing a pipeline there it seems like you're often going to want to
>>>> put high fixed cost things like database connections even outside of the
>>>> bundle setup. You really only want to do that once in the lifetime of the
>>>> worker itself, not the bundle. Seems like having that boundary be somewhere
>>>> other than an arbitrary (and probably small in streaming to avoid
>>>> latency) group of elements might be more useful? I suppose this depends
>>>> heavily on the object lifecycle in the sdk worker though.
>>>>
>>>
>>> +1. This is the difference between @Setup and @StartBundle. The
>>> start/finish bundle operations should be used for bracketing element
>>> processing that must be committed as a unit for correct failure recovery
>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>> in FinishBundle). On the other hand, things like open database connections
>>> can and likely should be shared across bundles.
>>>
>>> This is correct, but the caching between @StartBundle and @FinishBundle
>>> has some problems. First, users need to manually set a watermark hold for
>>> min(timestamp in bundle), otherwise the watermark might overtake the
>>> buffered elements.
>>>
>>
>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>>
>>> Users don't have any option other than timer.withOutputTimestamp for
>>> that, as we don't have a user-facing API to set a watermark hold otherwise;
>>> thus in-bundle caching implies a stateful DoFn. The question might then
>>> be: why not use "classical" stateful caching involving state, as there is
>>> full control over the caching in user code? This gave me an idea: would it
>>> be useful to add the information about caching to the API (e.g. in
>>> Java @StartBundle(caching=true)), which could maybe solve the above issues
>>> (the runner would know to set the hold, and it could work with
>>> "stateless" DoFns)?
>>>
>>
>> Really, this is one of the areas that the streaming/batch abstraction
>> leaks. In batch it was a common pattern to have local DoFn instance state
>> that persisted from start to finish bundle, and these were also used as
>> convenient entry points for other operations (like opening
>> database connections) 'cause bundles were often "as large as possible."
>> With the advent of streaming it makes sense to put this in
>> explicitly managed runner state to allow for cross-bundle amortization and
>> there's more value in distinguishing between @Setup and @StartBundle.
>>
>> (Were I to do things over I'd probably encourage an API that discouraged
>> non-configuration instance state on DoFns altogether, e.g. in the style of
>> Python context managers (and an equivalent API could probably be put
>> together with AutoCloseables in Java) one would have something like
>>
>> ParDo(X)
>>
>> which would logically (though not necessarily physically) lead to an
>> execution like
>>
>> with X.bundle_processor() as bundle_processor:
>>   for bundle in bundles:
>>     with bundle_processor.element_processor() as process:
>>       for element in bundle:
>>         process(element)
>>
>> where the traditional setup/start_bundle/finish_bundle/teardown logic
>> would live in the __enter__ and __exit__ methods (made even easier with
>> coroutines.) For convenience one could of course provide a raw bundle
>> processor or element processor to ParDo if the enter/exit contexts are
>> trivial. But this is getting somewhat off-topic...)
>>
>>
>>>
>>>>
>>>> Best,
>>>> B
>>>>
>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> (I notice that you replied only to yourself, but there has been a
>>>>> whole thread of discussion on this - are you subscribed to dev@beam?
>>>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>>>
>>>>> It sounds like you want what everyone wants: to have the biggest
>>>>> bundles possible.
>>>>>
>>>>> So for bounded data, basically you make even splits of the data and
>>>>> each split is one bundle. And then dynamic splitting to redistribute work
>>>>> to eliminate stragglers, if your engine has that capability.
>>>>>
>>>>> For unbounded data, you more-or-less bundle as much as you can without
>>>>> waiting too long, like Jan described.
>>>>>
>>>>> Users know to put their high fixed costs in @StartBundle and then it
>>>>> is the runner's job to put as many calls to @ProcessElement as possible to
>>>>> amortize.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>>>>> greatest strategy for high *fixed* cost transforms", e.g. a
>>>>>> transform that takes 5 minutes to get set up and then maybe a microsecond
>>>>>> per input
>>>>>>
>>>>>> I suppose one solution is to move the responsibility for handling
>>>>>> this kind of situation to the user and expect users to use a bundling
>>>>>> transform (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is
>>>>>> this what other runners expect? Just want to make sure I'm not missing some
>>>>>> smart generic bundling strategy that might handle this for users.
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm writing a runner, and the first strategy for determining bundle
>>>>>>> size was to just start with a bundle size of one and double it until we
>>>>>>> reach a size that we expect to take some target per-bundle runtime
>>>>>>> (e.g. maybe 10 minutes). I realize that this isn't the greatest strategy
>>>>>>> for high sized cost transforms. I'm curious what kind of strategies other
>>>>>>> runners take?
>>>>>>>
>>>>>>
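
For reference, the doubling strategy from the original message at the
bottom of the thread could be sketched roughly as follows. This is only one
reading of that description; the function name, its parameters, and the
injectable clock are assumptions made for the sketch, not part of any
runner.

```python
import time
from typing import Callable, List, Sequence


def doubling_bundles(
    elements: Sequence[int],
    process_bundle: Callable[[Sequence[int]], None],
    target_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> List[int]:
    """Process elements in bundles, doubling the bundle size until a
    bundle takes at least target_seconds. Returns the sizes used."""
    sizes: List[int] = []
    size = 1
    start = 0
    while start < len(elements):
        bundle = elements[start:start + size]
        began = clock()
        process_bundle(bundle)
        elapsed = clock() - began
        sizes.append(len(bundle))
        start += len(bundle)
        if elapsed < target_seconds:
            # Too fast: try a bundle twice as large next time.
            size *= 2
    return sizes


# Simulated clock: each element costs 1 second, with no fixed cost.
now = [0.0]

def fake_process(bundle: Sequence[int]) -> None:
    now[0] += 1.0 * len(bundle)

print(doubling_bundles(list(range(15)), fake_process, 4.0, clock=lambda: now[0]))
# [1, 2, 4, 4, 4]
```

Every bundle during the ramp-up pays the full per-bundle fixed cost, which
is why this works poorly for transforms with high fixed cost.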
