Yes, not including FinishBundle in ParDoPayload seems like a mistake.
Though absence of FinishBundle doesn't mean that one can assume that
elements in a bundle don't affect subsequent bundle elements (i.e. there
might still be caching!)

On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles <k...@apache.org> wrote:

>
>
> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Kenn and Reuven,
>>
>> I agree with all these points. The only issue here seems to be that
>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>> fixed, though we need to change some defaults, as 1000 ms default bundle
>> "duration" for lower traffic Pipelines can be too much. We are also
>> probably missing some @ValidatesReunner tests for this. I created [1] and
>> [2] to track this.
>>
>> One question still remains, the bundle vs. element life-cycle is relevant
>> only for cases where processing of element X can affect processing of
>> element Y later in the same bundle. Once this influence is rules out (i.e.
>> no caching), this information can result in runner optimization that yields
>> better performance. Should we consider propagate this information from user
>> code to the runner?
>>
> Yes!
>
> This was the explicit goal of the move to annotation-driven DoFn in
> https://s.apache.org/a-new-dofn to make it so that the SDK and runner can
> get good information about what the DoFn requirements are.
>
> When there is no @FinishBundle method, the runner can make additional
> optimizations. This should have been included in the ParDoPayload in the
> proto when we moved to portable pipelines. I cannot remember if there was a
> good reason that we did not do so. Maybe we (incorrectly) thought that this
> was an issue that only the Java SDK harness needed to know about.
>
> Kenn
>
>
>> [1] https://github.com/apache/beam/issues/28649
>>
>> [2] https://github.com/apache/beam/issues/28650
>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>
>>
>>
>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>>
>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>
>>> Two separate things here:
>>>
>>> 1. Yes, a watermark can update in the middle of a bundle.
>>> 2. The records in the bundle themselves will prevent the watermark from
>>> updating as they are still in flight until after finish bundle. Therefore
>>> simply caching the records should always be watermark safe, regardless of
>>> the runner. You will only run into problems if you try and move timestamps
>>> "backwards" - which is why Beam strongly discourages this.
>>>
>>> This is not aligned with  FlinkRunner's implementation. And I actually
>>> think it is not aligned conceptually.  As mentioned, Flink does not have
>>> the concept of bundles at all. It achieves fault tolerance via
>>> checkpointing, essentially checkpoint barrier flowing from sources to
>>> sinks, safely snapshotting state of each operator on the way. Bundles are
>>> implemented as a somewhat arbitrary set of elements between two consecutive
>>> checkpoints (there can be multiple bundles between checkpoints). A bundle
>>> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
>>> after the checkpoint barrier passes over the elements in the bundle (every
>>> bundle is finished at the very latest exactly before a checkpoint). But
>>> watermark propagation and bundle finalization is completely unrelated. This
>>> might be a bug in the runner, but requiring checkpoint for watermark
>>> propagation will introduce insane delays between processing time and
>>> watermarks, every executable stage will delay watermark propagation until a
>>> checkpoint (which is typically the order of seconds). This delay would add
>>> up after each stage.
>>>
>>
>> It's not bundles that hold up processing, rather it is elements, and
>> elements are not considered "processed" until FinishBundle.
>>
>> You are right about Flink. In many cases this is fine - if Flink rolls
>> back to the last checkpoint, the watermark will also roll back, and
>> everything stays consistent. So in general, one does not need to wait for
>> checkpoints for watermark propagation.
>>
>> Where things get a bit weirder with Flink is whenever one has external
>> side effects. In theory, one should wait for checkpoints before letting a
>> Sink flush, otherwise one could end up with incorrect outputs (especially
>> with a sink like TextIO). Flink itself recognizes this, and that's why they
>> provide TwoPhaseCommitSinkFunction
>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
>>  which
>> waits for a checkpoint. In Beam, this is the reason we introduced
>> RequiresStableInput. Of course in practice many Flink users don't do this -
>> in which case they are prioritizing latency over data correctness.
>>
>>>
>>> Reuven
>>>
>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>
>>>> There was a thread [1], where the conclusion seemed to be that updating
>>>> watermark is possible even in the middle of a bundle. Actually, handling
>>>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>>>> checkpoints, they are always recomputed from scratch on restore).
>>>>
>>>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>>>
>>>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>>>
>>>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> I've actually wondered about this specifically for streaming... if
>>>>>> you're writing a pipeline there it seems like you're often going to want 
>>>>>> to
>>>>>> put high fixed cost things like database connections even outside of the
>>>>>> bundle setup. You really only want to do that once in the lifetime of the
>>>>>> worker itself, not the bundle. Seems like having that boundary be 
>>>>>> somewhere
>>>>>> other than an arbitrarily (and probably small in streaming to avoid
>>>>>> latency) group of elements might be more useful? I suppose this depends
>>>>>> heavily on the object lifecycle in the sdk worker though.
>>>>>>
>>>>>
>>>>> +1. This is the difference between @Setup and @StartBundle. The
>>>>> start/finish bundle operations should be used for bracketing element
>>>>> processing that must be committed as a unit for correct failure recovery
>>>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>>>> in FinishBundle). On the other hand, things like open database connections
>>>>> can and likely should be shared across bundles.
>>>>>
>>>>> This is correct, but the caching between @StartBundle and
>>>>> @FinishBundle has some problems. First, users need to manually set
>>>>> watermark hold for min(timestamp in bundle), otherwise watermark might
>>>>> overtake the buffered elements.
>>>>>
>>>>
>>>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>
>>>>
>>>>> Users don't have other option than using timer.withOutputTimestamp for
>>>>> that, as we don't have a user-facing API to set watermark hold otherwise,
>>>>> thus the in-bundle caching implies stateful DoFn. The question might then
>>>>> by, why not use "classical" stateful caching involving state, as there is
>>>>> full control over the caching in user code. This triggered me an idea if 
>>>>> it
>>>>> would be useful to add the information about caching to the API (e.g. in
>>>>> Java @StartBundle(caching=true)), which could solve the above issues maybe
>>>>> (runner would know to set the hold, it could work with "stateless" DoFns)?
>>>>>
>>>>
>>>> Really, this is one of the areas that the streaming/batch abstraction
>>>> leaks. In batch it was a common pattern to have local DoFn instance state
>>>> that persisted from start to finish bundle, and these were also used as
>>>> convenient entry points for other operations (like opening
>>>> database connections) 'cause bundles were often "as large as possible."
>>>> WIth the advent of n streaming it makes sense to put this in
>>>> explicitly managed runner state to allow for cross-bundle amortization and
>>>> there's more value in distinguishing between @Setup and @StartBundle.
>>>>
>>>> (Were I do to things over I'd probably encourage an API that
>>>> discouraged non-configuration instance state on DoFns altogether, e.g. in
>>>> the notion of Python context managers (and an equivalent API could probably
>>>> be put together with AutoClosables in Java) one would have something like
>>>>
>>>> ParDo(X)
>>>>
>>>> which would logically (though not necessarily physically) lead to an
>>>> execution like
>>>>
>>>> with X.bundle_processor() as bundle_processor:
>>>>   for bundle in bundles:
>>>>     with bundle_processor.element_processor() as process:
>>>>       for element in bundle:
>>>>         process(element)
>>>>
>>>> where the traditional setup/start_bundle/finish_bundle/teardown logic
>>>> would live in the __enter__ and __exit__ methods (made even easier with
>>>> coroutines.) For convenience one could of course provide a raw bundle
>>>> processor or element processor to ParDo if the enter/exit contexts are
>>>> trivial. But this is getting somewhat off-topic...
>>>>
>>>>
>>>>>
>>>>>>
>>>>>> Best,
>>>>>> B
>>>>>>
>>>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> (I notice that you replied only to yourself, but there has been a
>>>>>>> whole thread of discussion on this - are you subscribed to dev@beam?
>>>>>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>>>>>
>>>>>>> It sounds like you want what everyone wants: to have the biggest
>>>>>>> bundles possible.
>>>>>>>
>>>>>>> So for bounded data, basically you make even splits of the data and
>>>>>>> each split is one bundle. And then dynamic splitting to redistribute 
>>>>>>> work
>>>>>>> to eliminate stragglers, if your engine has that capability.
>>>>>>>
>>>>>>> For unbounded data, you more-or-less bundle as much as you can
>>>>>>> without waiting too long, like Jan described.
>>>>>>>
>>>>>>> Users know to put their high fixed costs in @StartBundle and then it
>>>>>>> is the runner's job to put as many calls to @ProcessElement as possible 
>>>>>>> to
>>>>>>> amortize.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <joey.t...@schrodinger.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>>>>>>> greatest strategy for high *fixed* cost transforms", e.g. a
>>>>>>>> transform that takes 5 minutes to get set up and then maybe a 
>>>>>>>> microsecond
>>>>>>>> per input
>>>>>>>>
>>>>>>>> I suppose one solution is to move the responsibility for handling
>>>>>>>> this kind of situation to the user and expect users to use a bundling
>>>>>>>> transform (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is 
>>>>>>>> this
>>>>>>>> what other runners expect? Just want to make sure I'm not missing some
>>>>>>>> smart generic bundling strategy that might handle this for users.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <
>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>
>>>>>>>>> Writing a runner and the first strategy for determining bundling
>>>>>>>>> size was to just start with a bundle size of one and double it until 
>>>>>>>>> we
>>>>>>>>> reach a size that we expect to take some targets per-bundle runtime 
>>>>>>>>> (e.g.
>>>>>>>>> maybe 10 minutes). I realize that this isn't the greatest strategy 
>>>>>>>>> for high
>>>>>>>>> sized cost transforms. I'm curious what kind of strategies other 
>>>>>>>>> runners
>>>>>>>>> take?
>>>>>>>>>
>>>>>>>>

Reply via email to