Right. Generally speaking, merging of state would require some modifications to the model to support at least some hooks. +1

On 4/25/24 18:03, Reuven Lax via dev wrote:
I think there is more to it than that. You'll probably want retractions integrated into Beam's core triggering.

One example of where retractions are needed is with session windows. Early triggers are fairly broken with session windows because the windows themselves change as more data arrive. So an early trigger might generate data for two separate windows S1 and S2, but after more data arrives those two windows merge into a single S3. Retractions solve this by hooking into the window merging logic, retracting the outputs for S1 and S2 before outputting S3. I don't think this is possible today with a DSL.
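
A minimal sketch of the S1/S2/S3 scenario described above, in plain Java rather than Beam API. The class names, the fixed gap of 10 and the "fire on every element" early trigger are all assumptions made for illustration; the point is only that the merge step is where the retractions have to be generated.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of session merging with retractions; none of this is Beam API. */
final class SessionRetractionSketch {
    /** A session window [start, end) together with its element count. */
    static final class Session {
        final long start, end, count;
        Session(long start, long end, long count) {
            this.start = start; this.end = end; this.count = count;
        }
    }

    /** One output record; retraction == true withdraws an earlier output. */
    static final class Output {
        final Session session;
        final boolean retraction;
        Output(Session session, boolean retraction) {
            this.session = session; this.retraction = retraction;
        }
    }

    static final long GAP = 10; // hypothetical session gap
    final List<Session> active = new ArrayList<>();
    final List<Output> emitted = new ArrayList<>();

    /** Adds one element and fires an "early trigger" for its session right away. */
    void add(long timestamp) {
        long start = timestamp, end = timestamp + GAP, count = 1;
        List<Session> absorbed = new ArrayList<>();
        for (Session s : active) {
            if (s.start < end && start < s.end) { // overlaps the new element's window
                absorbed.add(s);
            }
        }
        for (Session s : absorbed) {
            emitted.add(new Output(s, true)); // retract the stale early output
            start = Math.min(start, s.start);
            end = Math.max(end, s.end);
            count += s.count;
        }
        active.removeAll(absorbed);
        Session merged = new Session(start, end, count);
        active.add(merged);
        emitted.add(new Output(merged, false)); // emit the (possibly merged) session
    }

    public static void main(String[] args) {
        SessionRetractionSketch sketch = new SessionRetractionSketch();
        sketch.add(0);  // S1 = [0, 10)
        sketch.add(15); // S2 = [15, 25)
        sketch.add(8);  // bridges S1 and S2 into S3 = [0, 25)
        for (Output o : sketch.emitted) {
            System.out.println((o.retraction ? "RETRACT " : "EMIT ")
                + "[" + o.session.start + ", " + o.session.end + ") count=" + o.session.count);
        }
    }
}
```

In this toy run the third element causes two retractions (for S1 and S2) followed by a single output for S3, which is the ordering described above.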

On Thu, Apr 25, 2024 at 5:46 AM Jan Lukavský <je...@seznam.cz> wrote:

    > To implement retraction (at least to do so efficiently) I think
    you'll want it integrated in the model. e.g. for combiners one
    would want to add the option to subtract values, otherwise you
    would end up having to store every element defeating the
    performance that combiners provide.

    I think we use different words for the same. :) This is what I
    meant by "and define appropriate retraction functions to CombineFn
    and the like".

    On the other hand, you _could_ derive CombineFn with retraction
    capabilities, if you have a DSL that provides "retraction
    accumulation" function along with "addition accumulation", this
    should be possible to be combined into a CombineFn as we have it
    today. This should only require that the operation being combined
    is associative, commutative and has a valid unary operator '-'
    (which will be the result of the "retraction combine").
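
A rough sketch of what a combiner with both an "addition accumulation" and a "retraction accumulation" could look like, using a mean as the example. This is plain Java, not Beam's CombineFn; the method name `retractInput` is hypothetical. The accumulator stays constant-size because the operation has an inverse, so no individual elements need to be stored.

```java
/** Sketch of a mean combiner with an inverse operation; not Beam's CombineFn. */
final class RetractableMeanSketch {
    /** Constant-size accumulator: running sum and count only. */
    static final class Accum {
        final double sum;
        final long count;
        Accum(double sum, long count) {
            this.sum = sum; this.count = count;
        }
    }

    static Accum createAccumulator() {
        return new Accum(0.0, 0);
    }

    /** "Addition accumulation": fold one value into the accumulator. */
    static Accum addInput(Accum a, double value) {
        return new Accum(a.sum + value, a.count + 1);
    }

    /** "Retraction accumulation" (hypothetical): undo a previously added value. */
    static Accum retractInput(Accum a, double value) {
        return new Accum(a.sum - value, a.count - 1);
    }

    /** Merging is associative and commutative, as the combine requires. */
    static Accum mergeAccumulators(Accum a, Accum b) {
        return new Accum(a.sum + b.sum, a.count + b.count);
    }

    static double extractOutput(Accum a) {
        return a.count == 0 ? 0.0 : a.sum / a.count;
    }

    public static void main(String[] args) {
        Accum acc = createAccumulator();
        acc = addInput(acc, 2.0);
        acc = addInput(acc, 4.0);
        acc = addInput(acc, 9.0);
        System.out.println("mean after adds: " + extractOutput(acc));
        acc = retractInput(acc, 9.0);
        System.out.println("mean after retracting 9.0: " + extractOutput(acc));
    }
}
```

An operation without an inverse, such as max, does not fit this shape: retracting the current maximum would require knowing the remaining elements.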

    On 4/23/24 18:08, Reuven Lax via dev wrote:


    On Tue, Apr 23, 2024 at 7:52 AM Jan Lukavský <je...@seznam.cz> wrote:

        On 4/22/24 20:40, Kenneth Knowles wrote:

        I'll go ahead and advertise
        https://s.apache.org/beam-sink-triggers again for this thread.
        +1

        There are a couple of difficult technical problems in there.
        One of them is backwards-propagating triggering to minimize
        extra latency. We can probably solve this as well as we
        solve forward-propagating without too much trouble. We also
        can/should leave it abstract so runners can implement either
        through back-propagating local triggering rules or using
        run-time communication to trigger upstream. These actually
        could interact well with stateful ParDo by sending a
        "trigger now please" message or some such.
        Yes, this was what I was referring to as the more
        "functional" style for stateful ParDo. At minimum, it
        requires adding a new callback independent of @ProcessElement
        and @OnTimer -  @OnTrigger?

        But we also probably need retractions that automatically
        flow through the pipeline and update aggregations. Why?
        Because currently triggers don't just control update
        frequency but actually create new elements each time, so
        they require user fix-up logic to do the right thing with
        the output. When we go to higher levels of abstraction we
        need this to "just work" without changing the
        pipeline. There have been two (nearly identical) prototypes
        of adding retractions to the DirectRunner as proof of
        concept. But there's also work in all the IOs since they are
        not retraction-aware. Also lots of work in many library
        transforms where a retraction should be computed by running
        the transform "like normal" but then negating the result,
        but that cannot be the default for ParDo because it is
        deliberately more flexible, we just have to annotate Map and
        the like.
        +1. I think retractions could be implemented as DSL on top of
        the current model. Retractions can be viewed as regular data
        elements with additional metadata (upsert, delete). For ParDo
        we could add something like @RetractElement (and define
        appropriate retraction functions to CombineFn and the like).
        We could introduce RetractedPCollection or similar for this
        purpose.
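
To illustrate the "regular data elements with additional metadata (upsert, delete)" idea, here is a small plain-Java sketch. The `Delta` type and its `Kind` enum are hypothetical names, not part of Beam; a downstream consumer simply applies each delta to a keyed table.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of elements that carry upsert/delete metadata; names are hypothetical. */
final class DeltaElementSketch {
    enum Kind { UPSERT, DELETE }

    /** A data element plus the metadata that says how to interpret it. */
    static final class Delta {
        final Kind kind;
        final String key;
        final long value; // ignored for DELETE
        Delta(Kind kind, String key, long value) {
            this.kind = kind; this.key = key; this.value = value;
        }
    }

    /** Applies one delta to a keyed view of the latest values. */
    static void apply(Map<String, Long> table, Delta delta) {
        if (delta.kind == Kind.UPSERT) {
            table.put(delta.key, delta.value);
        } else {
            table.remove(delta.key);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> table = new HashMap<>();
        apply(table, new Delta(Kind.UPSERT, "a", 1));
        apply(table, new Delta(Kind.UPSERT, "a", 2)); // replaces the earlier value
        apply(table, new Delta(Kind.UPSERT, "b", 5));
        apply(table, new Delta(Kind.DELETE, "b", 0)); // retracts key "b"
        System.out.println(table);
    }
}
```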


    To implement retraction (at least to do so efficiently) I think
    you'll want it integrated in the model. e.g. for combiners one
    would want to add the option to subtract values, otherwise you
    would end up having to store every element defeating the
    performance that combiners provide.


        Getting all this right is a lot of work but would result in
        a system that is simpler to use out-of-the-box and a more
        robust SQL implementation (because you can't use triggers
        with SQL unless you have retractions or some other "just
        works" mode of computation). It would essentially change
        Beam into a delta-processing engine, which it arguably
        should be, with whole append-only elements being the simplest
        degenerate case of a delta (which would be highly optimized
        in batch/archival processing).
        +1

        Kenn

        On Tue, Apr 16, 2024 at 2:36 AM Reuven Lax via dev
        <dev@beam.apache.org> wrote:

            Yes, but that's inevitable as stateful ParDo in a sense
            lives outside of most of the window/trigger semantics.
            Basically a stateful ParDo is the user exercising
            low-level control over these semantics, and controlling
            output frequency themselves with timers. One could
            however still propagate the trigger upstream of the
            stateful ParDo, though I'm not sure if that's the best
            approach.

            On Mon, Apr 15, 2024 at 11:31 PM Jan Lukavský
            <je...@seznam.cz> wrote:

                On 4/11/24 18:20, Reuven Lax via dev wrote:
                I'm not sure it would require all that. A "basic"
                implementation could be done on top of our existing
                model. Essentially the user would specify triggers
                at the sink ParDos, then the runner would walk
                backwards up the graph, reverse-propagating these
                triggers (with some resolution rules aimed at
                keeping the minimum trigger latency). The runner
                could under the covers simply just apply the
                appropriate trigger into the Window, using the
                current mechanism. Of course building this all into
                the framework from scratch would be cleaner, but we
                could also build this on top of what we have.
                Any propagation from sink to source would be blocked
                by any stateful ParDo, because that does not adhere
                to the concept of trigger, no? Hence, we could get
                the required downstream 'cadence' of outputs, but
                these would change only when the upstream ParDo
                emits any data. Yes, one can argue that stateful
                ParDo is supposed to emit data as fast as possible,
                then this seems to work.
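
The reverse propagation discussed above can be sketched as a walk from each sink toward the sources that keeps the minimum requested latency per node and stops at stateful nodes. This is a plain-Java toy, not runner code; the `Node` type, the latency values and the "stop at stateful ParDo" rule are assumptions taken from the description in this thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy reverse propagation of sink trigger latencies; not Beam API. */
final class SinkTriggerPropagationSketch {
    static final class Node {
        final String name;
        final boolean stateful;
        final List<Node> inputs;
        Node(String name, boolean stateful, Node... inputs) {
            this.name = name; this.stateful = stateful; this.inputs = Arrays.asList(inputs);
        }
    }

    /** Walks upstream from a node, keeping the smallest latency seen per node. */
    static void propagate(Node node, long latencyMillis, Map<String, Long> resolved) {
        if (node.stateful) {
            return; // a stateful ParDo controls its own output, so propagation stops
        }
        Long current = resolved.get(node.name);
        if (current != null && current <= latencyMillis) {
            return; // an equally strict or stricter requirement already went upstream
        }
        resolved.put(node.name, latencyMillis);
        for (Node input : node.inputs) {
            propagate(input, latencyMillis, resolved);
        }
    }

    /** Builds a small example graph and resolves latencies for its three sinks. */
    static Map<String, Long> example() {
        Node source = new Node("source", false);
        Node agg = new Node("agg", false, source);
        Node sinkA = new Node("sinkA", false, agg);
        Node sinkB = new Node("sinkB", false, agg);
        Node otherSource = new Node("otherSource", false);
        Node stateful = new Node("stateful", true, otherSource);
        Node sinkC = new Node("sinkC", false, stateful);

        Map<String, Long> resolved = new HashMap<>();
        propagate(sinkA, 60_000, resolved);
        propagate(sinkB, 1_000, resolved);
        propagate(sinkC, 500, resolved);
        return resolved;
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```

In the example, `agg` and `source` end up with the stricter latency requested by `sinkB`, while nothing upstream of the stateful node receives a requirement from `sinkC`.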

                On Thu, Apr 11, 2024 at 5:10 AM Jan Lukavský
                <je...@seznam.cz> wrote:

                    I've probably heard about it, but I never read
                    the proposal. Sounds great, but that would
                    require changing our ParDos from the
                    'directive' style to something more functional,
                    so that processing of elements, state updates
                    and outputting results can be decoupled and
                    managed by the runner independently. This goes
                    exactly in the direction of unifying GBK and
                    Combine with stateful ParDo. Sounds like
                    something worth exploring for Beam 3. :)

                    Anyway, thanks for this discussion, helped me
                    clarify some more white spots.

                     Jan

                    On 4/10/24 19:24, Reuven Lax via dev wrote:
                    Are you familiar with the "sink triggers"
                    proposal?

                    Essentially while windowing is usually a
                    property of the data, and therefore flows
                    downwards through the graph, triggering is
                    usually a property of output (i.e. sink)
                    latency - how much are you willing to wait to
                    see data, and what semantics do you want for
                    this early data. Ideally triggers should be
                    specified separately at the ParDo level (Beam
                    has no real notion of Sinks as a special
                    object, so to allow for output specification
                    it has to be on the ParDo), and the triggers
                    should propagate up the graph back to the
                    source. This is in contrast to today where we
                    attach triggering to the windowing information.

                    This was a proposal some years back and there
                    was some effort made to implement it, but the
                    implementation never really got off the ground.

                    On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský
                    <je...@seznam.cz> wrote:

                        On 4/9/24 18:33, Kenneth Knowles wrote:
                        At a top level
                        `setWindowingStrategyInternal` exists to
                        set up the metadata without actually
                        assigning windows. If we were more clever
                        we might have found a way for it to not
                        be public... it is something that can
                        easily lead to an invalid pipeline.
                        Yes, that was what hit me about one minute
                        after I started this thread. :)

                        I think "compatible windows" today in
                        Beam doesn't have very good uses anyhow.
                        I do see how when you are flattening
                        PCollections you might also want to
                        explicitly have a function that says "and
                        here is how to reconcile their different
                        metadata". But is it not reasonable to
                        use Window.into(global window)? It
                        doesn't seem like boilerplate to me
                        actually, but something you really want
                        to know is happening.

                        :)

                        Of course this was the way out, but I was
                        somewhat intuitively seeking something
                        that could do this autonomously.

                        Generally speaking, we might have some
                        room for improvement in the way we handle
                        windows and triggers - windows relate only
                        to GBK and stateful ParDo, triggers relate
                        to GBK only. They have no semantics if
                        downstream processing does not use any of
                        these. There could be a pipeline
                        preprocessing stage that would discard
                        (replace with meaningful defaults) any of
                        these metadata that is unused, but can
                        cause Pipeline to fail at construction
                        time. It is also (to me) somewhat
                        questionable if triggers are really a
                        property of a PCollection or a property of
                        a specific transform (GBK - ehm, actually
                        (stateless) 'key by' + 'reduce by key',
                        but that is completely different story :))
                        because (non-default) triggers are likely
                        not preserved across multiple transforms.
                        Maybe the correct subject of this thread
                        could be "are we sure our windowing and
                        triggering semantics is 100% correct"?
                        Probably the - wrong - expectations at the
                        beginning of this thread were due to
                        conflict in my mental model of how things
                        'could' work as opposed to how they
                        actually work. :)

                         Jan


                        Kenn

                        On Tue, Apr 9, 2024 at 9:19 AM Jan
                        Lukavský <je...@seznam.cz> wrote:

                            On 4/6/24 21:23, Reuven Lax via dev
                            wrote:
                            So the problem here is that windowFn
                            is a property of the PCollection,
                            not the element, and the result of
                            Flatten is a single PCollection.
                            Yes. That is why
                            Flatten.pCollections() needs the same
                            windowFn.

                            In various cases, there is a notion
                            of "compatible" windows. Basically
                            given window functions W1 and W2,
                            provide a W3 that "works" with both.
                            Exactly this would be a nice feature
                            for Flatten, something like 'windowFn
                            resolve strategy', so that if the user
                            does not know the windowFn of
                            upstream PCollections this can be
                            somehow resolved at pipeline
                            construction time. Alternatively only
                            as a small syntactic sugar, something
                            like:
                             
                            Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

                            or anything similar. This can be done
                            in user code, so it is not something
                            deeper, but might help in some cases.
                            It would be cool if we could reuse
                            concepts from other cases where such
                            mechanism is needed.


                            Note that Beam already has something
                            similar with side inputs, since the
                            side input often is in a different
                            window than the main input. However
                            main input elements are supposed to
                            see side input elements in the same
                            window (and in fact main inputs are
                            blocked until the side-input window
                            is ready), so we must do a mapping.
                            If for example (and very commonly!)
                            the side input is in the global
                            window and the main input is in a
                            fixed window, by default we will
                            remap the global-window elements
                            into the main-input's fixed window.

                            This is a one-sided merge function,
                            there is a 'main' and 'side' input,
                            but the generic symmetric merge might
                            be possible as well. E.g. if one
                            PCollection of Flatten is in
                            GlobalWindow, I wonder if there are
                            cases where users would actually want
                            to do anything else than apply the
                            same global windowing strategy to all
                            input PCollections.

                             Jan
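
One possible shape of such a 'windowFn resolve strategy', sketched in plain Java with window functions reduced to descriptive strings. The class, the method name `resolve` and the rule "identical wins, otherwise global wins, otherwise fail" are assumptions for illustration only. A real version would also have to reconcile triggers and allowed lateness, and would have to actually re-window the elements rather than only change metadata.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy resolution of differing windowFns for a flatten; not Beam API. */
final class FlattenWindowResolutionSketch {
    static final String GLOBAL = "GlobalWindows";

    /** Returns the windowFn descriptor to apply to every input, or throws if ambiguous. */
    static String resolve(List<String> inputWindowFns) {
        if (inputWindowFns.isEmpty()) {
            throw new IllegalArgumentException("no inputs to flatten");
        }
        Set<String> distinct = new HashSet<>(inputWindowFns);
        if (distinct.size() == 1) {
            return inputWindowFns.get(0); // all inputs already agree
        }
        if (distinct.contains(GLOBAL)) {
            return GLOBAL; // assumed rule: a global input pulls everything into global
        }
        throw new IllegalArgumentException("cannot reconcile windowFns: " + distinct);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Arrays.asList(GLOBAL, "SlidingWindows(10m, 1m)")));
        System.out.println(resolve(Arrays.asList("FixedWindows(1m)", "FixedWindows(1m)")));
    }
}
```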


                            In Side input we also allow the user
                            to control this mapping, so for
                            example side input elements could
                            always map to the previous fixed
                            window (e.g. while processing window
                            12-1, you want to see summary data
                            of all records in the previous
                            window 11-12). Users can do this by
                            providing a WindowMappingFn to
                            the View - essentially a function
                            from window to window. Unfortunately
                            this is hard to use (one must create
                            their own PCollectionView class) and
                            very poorly documented, so I doubt
                            many users know about this!

                            Reuven
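
The "function from window to window" can be sketched without any Beam types. The plain-Java toy below maps a main-input fixed window to the previous fixed window, as in the 12-1 and 11-12 example; `Interval`, `fixedWindowOf` and `previousWindow` are hypothetical names, and timestamps are minutes since midnight purely for readability.

```java
/** Toy window-to-window mapping for side inputs; not Beam API. */
final class SideInputWindowMappingSketch {
    /** A window [start, end) in minutes since midnight. */
    static final class Interval {
        final long start;
        final long end;
        Interval(long start, long end) {
            this.start = start; this.end = end;
        }
        @Override public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Assigns a timestamp to the fixed window of the given size that contains it. */
    static Interval fixedWindowOf(long timestamp, long size) {
        long start = timestamp - Math.floorMod(timestamp, size);
        return new Interval(start, start + size);
    }

    /** Custom mapping: look at the side input's window just before the main window. */
    static Interval previousWindow(Interval mainWindow) {
        long size = mainWindow.end - mainWindow.start;
        return new Interval(mainWindow.start - size, mainWindow.start);
    }

    public static void main(String[] args) {
        Interval main = fixedWindowOf(12 * 60 + 30, 60); // 12:30 falls in 12:00-13:00
        System.out.println("main window: " + main);
        System.out.println("side input window: " + previousWindow(main));
    }
}
```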

                            On Sat, Apr 6, 2024 at 7:09 AM Jan
                            Lukavský <je...@seznam.cz> wrote:

                                Immediate self-correction,
                                although setting the strategy
                                directly via
                                setWindowingStrategyInternal()
                                *seemed* to be working during
                                Pipeline
                                construction time, during
                                runtime it obviously does not
                                work, because
                                the PCollection was still
                                windowed using the old windowFn.
                                Makes sense to
                                me, but there remains the other
                                question if we can make flattening
                                PCollections with incompatible
                                windowFns more user-friendly.
                                The current
                                approach where we require the
                                same windowFn for all input
                                PCollections
                                creates some unnecessary
                                boilerplate code needed on user
                                side.

                                  Jan

                                On 4/6/24 15:45, Jan Lukavský wrote:
                                > Hi,
                                >
                                > I came across a case where using
                                >
                                PCollection#setWindowingStrategyInternal
                                seems legit in user code.
                                > The case is roughly as follows:
                                >
                                >  a) compute some streaming
                                statistics
                                >
                                >  b) apply the same transform
                                (say ComputeWindowedAggregation)
                                with
                                > different parameters on these
                                statistics yielding two windowed
                                > PCollections - first is global
                                with early trigger, the other is
                                > sliding window, the specific
                                parameters of the windowFns are
                                > encapsulated in the
                                ComputeWindowedAggregation transform
                                >
                                >  c) apply the same transform
                                on both of the above PCollections,
                                > yielding two PCollections with
                                the same types, but different
                                windowFns
                                >
                                >  d) flatten these PCollections
                                into single one (e.g. for
                                downstream
                                > processing - joining - or
                                flushing to sink)
                                >
                                > Now, the flatten will not
                                work, because these PCollections
                                have
                                > different windowFns. It would
                                be possible to restore the
                                windowing for
                                > either of them, but it
                                requires somewhat breaking the
                                encapsulation of
                                > the transforms that produce
                                the windowed outputs. A more
                                natural
                                > solution is to take the
                                WindowingStrategy from the
                                global aggregation
                                > and set it via
                                setWindowingStrategyInternal()
                                to the other
                                > PCollection. This works, but
                                it uses API that is marked as
                                @Internal
                                > (and obviously, the name as
                                well suggests it is not intended
                                for
                                > client-code usage).
                                >
                                > The question is, should we
                                make a legitimate version of
                                this call? Or
                                > should we introduce a way for
                                Flatten.pCollections() to
                                re-window the
                                > input PCollections
                                appropriately? In the case of
                                conflicting
                                > WindowFns, where one of them
                                is GlobalWindowing strategy, it
                                seems to
                                > me that the user's intention
                                is quite well-defined (this
                                might extend
                                > to some 'flatten windowFn
                                resolution strategy', maybe).
                                >
                                > WDYT?
                                >
                                >  Jan
                                >
