These are some good points. Replies inline.

On Mon, Sep 25, 2023 at 9:19 AM Jan Lukavský <je...@seznam.cz> wrote:
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>> Two separate things here:
>>
>> 1. Yes, a watermark can update in the middle of a bundle.
>> 2. The records in the bundle themselves will prevent the watermark from
>> updating, as they are still in flight until after finish bundle.
>> Therefore simply caching the records should always be watermark safe,
>> regardless of the runner. You will only run into problems if you try to
>> move timestamps "backwards" - which is why Beam strongly discourages
>> this.
>
> This is not aligned with FlinkRunner's implementation, and I actually
> think it is not aligned conceptually. As mentioned, Flink does not have
> the concept of bundles at all. It achieves fault tolerance via
> checkpointing: essentially, a checkpoint barrier flows from sources to
> sinks, safely snapshotting the state of each operator on the way.
>
> Bundles are implemented as a somewhat arbitrary set of elements between
> two consecutive checkpoints (there can be multiple bundles between
> checkpoints).

Yes, it is a good point. To align the runner: an input element is not
considered processed until it has been through @ProcessElement and
@FinishBundle has also been called. Until that happens, the input element
is still "in process" and would hold the watermark. This doesn't mean the
watermark is frozen; it only means it is constrained.

> A bundle is 'committed' (i.e. persistently stored and guaranteed not to
> retry) only after the checkpoint barrier passes over the elements in the
> bundle (every bundle is finished at the very latest exactly before a
> checkpoint).

I think this is fine and does not have to be related to bundle processing
or watermarks. Since Flink provides global consistency, any downstream
work that depended on the not-yet-persisted results would also be reset
back to the checkpoint, so it is fine.

> But watermark propagation and bundle finalization are completely
> unrelated.
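The watermark-hold constraint described above (in-flight elements keep the output watermark from advancing past them until the bundle finishes) can be illustrated with a toy model. This is a plain-Python sketch with invented names, not the Beam or Flink API:

```python
class WatermarkHold:
    """Toy model: in-flight bundle elements hold the output watermark.

    Hypothetical illustration only -- real runners track holds per
    key/stage/step, and persistence is runner-dependent.
    """

    def __init__(self):
        self.input_watermark = float("-inf")
        self.in_flight = []  # timestamps of elements not yet through @FinishBundle

    def start_bundle(self, timestamps):
        self.in_flight = list(timestamps)

    def advance_input(self, wm):
        # The input watermark may advance in the middle of a bundle...
        self.input_watermark = max(self.input_watermark, wm)

    def output_watermark(self):
        # ...but the observable output watermark stays held back by any
        # element that has not yet passed @FinishBundle.
        if self.in_flight:
            return min(self.input_watermark, min(self.in_flight))
        return self.input_watermark

    def finish_bundle(self):
        self.in_flight = []


wm = WatermarkHold()
wm.start_bundle([10, 15])
wm.advance_input(20)                 # updates mid-bundle
assert wm.output_watermark() == 10   # held by the buffered elements
wm.finish_bundle()
assert wm.output_watermark() == 20   # released once the bundle finishes
```

The point of the sketch: the watermark is not frozen during a bundle, only constrained to the minimum timestamp still in flight.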
> This might be a bug in the runner, but requiring a checkpoint for
> watermark propagation would introduce insane delays between processing
> time and watermarks; every executable stage would delay watermark
> propagation until a checkpoint (which is typically on the order of
> seconds). This delay would add up after each stage.

I am aware of this conflict. Interestingly, "requires stable input" is the
case where you must wait until checkpoint finalization, since inputs may
spontaneously change on retry before a checkpoint is finalized. This is
not just a mismatch between Beam and Flink; I believe Flink itself cannot
correctly process this kind of data without waiting for checkpoint
finalization.

Kenn

> Reuven
>
> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>> committed, as there's no guarantee that this work won't be discarded.
>>
>> There was a thread [1], where the conclusion seemed to be that updating
>> the watermark is possible even in the middle of a bundle. Actually,
>> handling watermarks is runner-dependent (e.g. Flink does not store
>> watermarks in checkpoints; they are always recomputed from scratch on
>> restore).
>>
>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>
>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>
>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <dev@beam.apache.org>
>>> wrote:
>>>
>>>> I've actually wondered about this specifically for streaming... if
>>>> you're writing a pipeline there, it seems like you're often going to
>>>> want to put high-fixed-cost things like database connections even
>>>> outside of the bundle setup. You really only want to do that once in
>>>> the lifetime of the worker itself, not the bundle.
>>>> Seems like having that boundary be somewhere other than an arbitrary
>>>> (and, in streaming, probably small to avoid latency) group of elements
>>>> might be more useful? I suppose this depends heavily on the object
>>>> lifecycle in the SDK worker, though.
>>>
>>> +1. This is the difference between @Setup and @StartBundle. The
>>> start/finish bundle operations should be used for bracketing element
>>> processing that must be committed as a unit for correct failure
>>> recovery (e.g. if elements are cached in ProcessElement, they should
>>> all be emitted in FinishBundle). On the other hand, things like open
>>> database connections can, and likely should, be shared across bundles.
>>>
>>> This is correct, but the caching between @StartBundle and @FinishBundle
>>> has some problems. First, users need to manually set a watermark hold
>>> to min(timestamp in bundle); otherwise the watermark might overtake the
>>> buffered elements.
>>
>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>>> Users don't have any option other than using timer.withOutputTimestamp
>>> for that, as we don't have a user-facing API to set a watermark hold
>>> otherwise; thus the in-bundle caching implies a stateful DoFn. The
>>> question might then be: why not use "classical" stateful caching
>>> involving state, as there is full control over the caching in user
>>> code? This triggered an idea: would it be useful to add information
>>> about caching to the API (e.g. in Java, @StartBundle(caching=true))?
>>> That could maybe solve the above issues (the runner would know to set
>>> the hold, and it could work with "stateless" DoFns).
>>
>> Really, this is one of the areas where the streaming/batch abstraction
>> leaks.
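The cache-in-ProcessElement / emit-in-FinishBundle pattern discussed above, with an expensive resource shared across bundles from setup, can be sketched in plain Python. The class and method names mirror the DoFn lifecycle but are illustrative, not the real SDK API:

```python
class BufferingDoFn:
    """Sketch of the cache-in-bundle pattern (not the actual Beam DoFn API).

    setup() holds cross-bundle resources; everything buffered during a
    bundle is flushed by finish_bundle() so the bundle commits as a unit.
    """

    def __init__(self, batch_size=3):
        self.batch_size = batch_size

    def setup(self):
        # High fixed cost, shared across bundles (e.g. a DB connection).
        self.connection = object()

    def start_bundle(self):
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= self.batch_size:
            yield from self.flush()

    def finish_bundle(self):
        # Everything still cached must be emitted here; otherwise elements
        # could be lost if the bundle is retried after a failure.
        yield from self.flush()

    def flush(self):
        batch, self.buffer = self.buffer, []
        if batch:
            yield batch


fn = BufferingDoFn(batch_size=3)
fn.setup()
fn.start_bundle()
out = []
for e in range(5):
    out.extend(fn.process(e))
out.extend(fn.finish_bundle())
# out == [[0, 1, 2], [3, 4]]
```

Note this sketch sidesteps the watermark-hold problem Jan raises: a real runner (or the hypothetical @StartBundle(caching=true) annotation) would still need to hold the watermark at the minimum buffered timestamp.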
>> In batch it was a common pattern to have local DoFn instance state that
>> persisted from start to finish bundle, and these were also used as
>> convenient entry points for other operations (like opening database
>> connections) 'cause bundles were often "as large as possible." With the
>> advent of streaming it makes sense to put this in explicitly managed
>> runner state to allow for cross-bundle amortization, and there's more
>> value in distinguishing between @Setup and @StartBundle.
>>
>> (Were I to do things over, I'd probably encourage an API that
>> discouraged non-configuration instance state on DoFns altogether. E.g.
>> in the style of Python context managers (an equivalent API could
>> probably be put together with AutoCloseables in Java), one would have
>> something like
>>
>>     ParDo(X)
>>
>> which would logically (though not necessarily physically) lead to an
>> execution like
>>
>>     with X.bundle_processor() as bundle_processor:
>>         for bundle in bundles:
>>             with bundle_processor.element_processor() as process:
>>                 for element in bundle:
>>                     process(element)
>>
>> where the traditional setup/start_bundle/finish_bundle/teardown logic
>> would live in the __enter__ and __exit__ methods (made even easier with
>> coroutines). For convenience one could of course provide a raw bundle
>> processor or element processor to ParDo if the enter/exit contexts are
>> trivial. But this is getting somewhat off-topic...
>>
>>>> Best,
>>>> B
>>>>
>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> (I notice that you replied only to yourself, but there has been a
>>>>> whole thread of discussion on this - are you subscribed to dev@beam?
>>>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>>>
>>>>> It sounds like you want what everyone wants: to have the biggest
>>>>> bundles possible.
>>>>>
>>>>> So for bounded data, you basically make even splits of the data, and
>>>>> each split is one bundle.
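Returning to Robert's context-manager sketch above: a minimal runnable version might look like the following. The names (X, bundle_processor, element_processor) follow his sketch and are hypothetical, not an existing Beam API:

```python
from contextlib import contextmanager


class X:
    """Toy DoFn-as-context-managers, per Robert's sketch (not a real API)."""

    def __init__(self):
        self.emitted = []  # stands in for the output PCollection

    @contextmanager
    def bundle_processor(self):
        # __enter__/__exit__ play the roles of @Setup/@Teardown
        # (e.g. open and close a shared database connection here).
        yield self

    @contextmanager
    def element_processor(self):
        # __enter__/__exit__ play the roles of @StartBundle/@FinishBundle:
        # elements cached during the bundle are emitted on exit.
        buffer = []
        try:
            yield buffer.append
        finally:
            self.emitted.append(list(buffer))


x = X()
with x.bundle_processor() as bp:
    for bundle in [[1, 2], [3]]:
        with bp.element_processor() as process:
            for element in bundle:
                process(element)
# x.emitted == [[1, 2], [3]]
```

This makes the lifecycle pairing structural: the flush in the finally block cannot be forgotten, which is the appeal of moving setup/teardown logic into __enter__ and __exit__.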
>>>>> And then use dynamic splitting to redistribute work and eliminate
>>>>> stragglers, if your engine has that capability.
>>>>>
>>>>> For unbounded data, you more or less bundle as much as you can
>>>>> without waiting too long, like Jan described.
>>>>>
>>>>> Users know to put their high fixed costs in @StartBundle, and then it
>>>>> is the runner's job to make as many calls to @ProcessElement as
>>>>> possible to amortize them.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>>>>> greatest strategy for high *fixed* cost transforms", e.g. a
>>>>>> transform that takes 5 minutes to get set up and then maybe a
>>>>>> microsecond per input.
>>>>>>
>>>>>> I suppose one solution is to move the responsibility for handling
>>>>>> this kind of situation to the user and expect users to use a
>>>>>> bundling transform (e.g. BatchElements [1]) followed by a
>>>>>> Reshuffle+FlatMap. Is this what other runners expect? Just want to
>>>>>> make sure I'm not missing some smart generic bundling strategy that
>>>>>> might handle this for users.
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>>>>
>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm writing a runner, and the first strategy for determining bundle
>>>>>>> size was to just start with a bundle size of one and double it
>>>>>>> until we reach a size that we expect to take some target per-bundle
>>>>>>> runtime (e.g. maybe 10 minutes). I realize that this isn't the
>>>>>>> greatest strategy for high fixed cost transforms. I'm curious what
>>>>>>> kind of strategies other runners take?
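The start-at-one-and-double heuristic Joey describes could be sketched like this. The function name, the measured-rate input, and the 10-minute default target are all illustrative, not part of any runner's API:

```python
def next_bundle_size(current_size, observed_rate, target_seconds=600.0,
                     max_size=None):
    """Double the bundle size until its projected runtime hits the target.

    observed_rate: measured elements/second from bundles processed so far
    (hypothetical input; a real runner would estimate this per stage).
    Note this heuristic is blind to high fixed costs: a transform with a
    5-minute setup and microsecond per-element cost would still converge
    slowly, which is the problem raised in the thread.
    """
    projected = current_size / observed_rate if observed_rate else 0.0
    if projected >= target_seconds:
        return current_size
    size = current_size * 2
    if max_size is not None:
        size = min(size, max_size)
    return size


# Start at 1 element; suppose we measure 10 elements/second.
size = 1
for _ in range(20):
    new = next_bundle_size(size, observed_rate=10.0)
    if new == size:
        break
    size = new
# size == 8192: the first power of two whose projected runtime
# (8192 / 10 elements per second) reaches the 600-second target
```

The BatchElements approach mentioned above attacks the same amortization problem from the user side instead, by batching inside the transform rather than tuning runner bundle sizes.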