> To implement retraction (at least to do so efficiently) I think you'll want it integrated in the model. e.g. for combiners one would want to add the option to subtract values, otherwise you would end up having to store every element defeating the performance that combiners provide.

I think we use different words for the same. :) This is what I meant by "and define appropriate retraction functions to CombineFn and the like".

On the other hand, you _could_ derive CombineFn with retraction capabilities, if you have a DSL that provides "retraction accumulation" function along with "addition accumulation", this should be possible to be combined into a CombineFn as we have it today. This should only require that the operation being combined is associative, commutative and have a valid unary operator '-' (which will be the result of the "retraction combine").

On 4/23/24 18:08, Reuven Lax via dev wrote:


On Tue, Apr 23, 2024 at 7:52 AM Jan Lukavský <je...@seznam.cz> wrote:

    On 4/22/24 20:40, Kenneth Knowles wrote:

    I'll go ahead and advertise
    https://s.apache.org/beam-sink-triggers again for this thread.
    +1

    There are a couple of difficult technical problems in there. One
    of them is backwards-propagating triggering to minimize extra
    latency. We can probably solve this as well as we solve
    forward-propagating without too much trouble. We also can/should
    leave it abstract so runners can implement either through
    back-propagating local triggering rules or using run-time
    communication to trigger upstream. These actually could interact
    well with stateful ParDo by sending a "trigger now please"
    message or some such.
    Yes, this was what I was referring to as the more "functional"
    style for stateful ParDo. At minimum, it requires adding new
    callback independent of @ProcessElement and @OnTimer -  @OnTrigger?

    But we also probably need retractions that automatically flow
    through the pipeline and update aggregations. Why? Because
    currently triggers don't just control update frequency but
    actually create new elements each time, so they require
    user fix-up logic to do the right thing with the output. When we
    go to higher levels of abstraction we need this to "just work"
    without changing the pipeline. There have been two (nearly
    identical) propotypes of adding retractions to the DirectRunner
    as proof of concept. But there's also work in all the IOs since
    they are not retraction-aware. Also lots of work in many library
    transforms where a retraction should be computed by running the
    transform "like normal" but then negating the result, but that
    cannot be the default for ParDo because it is deliberately more
    flexible, we just have to annotate Map and the like.
    +1. I think retractions could be implemented as DSL on top of the
    current model. Retractions can be viewed as regular data elements
    with additional metadata (upsert, delete). For ParDo we could add
    something like @RetractElement (and define appropriate retraction
    functions to CombineFn and the like). We could introduce
    RetractedPCollection or similar for this purpose.


To implement retraction (at least to do so efficiently) I think you'll want it integrated in the model. e.g. for combiners one would want to add the option to subtract values, otherwise you would end up having to store every element defeating the performance that combiners provide.


    Getting all this right is a lot of work but would result in a
    system that is simpler to use out-of-the-box and a more robust
    SQL implementation (because you can't use triggers with SQL
    unless you have retractions or some other "just works" mode of
    computation). It would essentially change Beam into a
    delta-processing engine, which it arguably should be, with whole
    append-only elements being a simplest degenerate case of a delta
    (which would be highly optimized in batch/archival processing).
    +1

    Kenn

    On Tue, Apr 16, 2024 at 2:36 AM Reuven Lax via dev
    <dev@beam.apache.org> wrote:

        Yes, but that's inevitable as stateful ParDo in a sense live
        outside of most of the window/trigger semantics. Basically a
        stateful ParDo is the user executing low-level control over
        these semantics, and controlling output frequency themselves
        with timers. One could however still propagate the trigger
        upstream of the stateful ParDo, though I'm not sure if that's
        the best approach.

        On Mon, Apr 15, 2024 at 11:31 PM Jan Lukavský
        <je...@seznam.cz> wrote:

            On 4/11/24 18:20, Reuven Lax via dev wrote:
            I'm not sure it would require all that. A "basic"
            implementation could be done on top of our existing
            model. Essentially the user would specify triggers at
            the sink ParDos, then the runner would walk backwards up
            the graph, reverse-propagating these triggers (with some
            resolution rules aimed at keeping the minimum trigger
            latency). The runner could under the covers simply just
            apply the appropriate trigger into the Window, using the
            current mechanism. Of course building this all into the
            framework from scratch would be cleaner, but we could
            also build this on top of what we have.
            Any propagation from sink to source would be blocked by
            any stateful ParDo, because that does not adhere to the
            concept of trigger, no? Hence, we could get the required
            downstream 'cadence' of outputs, but these would change
            only when the upstream ParDo emits any data. Yes, one can
            argue that stateful ParDo is supposed to emit data at
            fast as possible, then this seems to work.

            On Thu, Apr 11, 2024 at 5:10 AM Jan Lukavský
            <je...@seznam.cz> wrote:

                I've probably heard about it, but I never read the
                proposal. Sounds great, but that would require to
                change our ParDos from the 'directive' style to
                something more functional, so that processing of
                elements, state updates and outputting results can
                be decoupled and managed by the runner
                independently. This goes exactly in the direction of
                unifying GBK and Combine with stateful ParDo. Sounds
                like something worth exploring for Beam 3. :)

                Anyway, thanks for this discussion, helped me
                clarify some more white spots.

                 Jan

                On 4/10/24 19:24, Reuven Lax via dev wrote:
                Are you familiar with the "sink triggers" proposal?

                Essentially while windowing is usually a property
                of the data, and therefore flows downwards through
                the graph, triggering is usually a property of
                output (i.e. sink) latency - how much are you
                willing to wait to see data, and what semantics do
                you want for this early data. Ideally triggers
                should be specified separately at the ParDo level
                (Beam has no real notion of Sinks as a special
                object, so to allow for output specification it has
                to be on the ParDo), and the triggers should
                propagate up the graph back to the source. This is
                in contrast to today where we attach triggering to
                the windowing information.

                This was a proposal some years back and there was
                some effort made to implement it, but the
                implementation never really got off the ground.

                On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský
                <je...@seznam.cz> wrote:

                    On 4/9/24 18:33, Kenneth Knowles wrote:
                    At a top level `setWindowingStrategyInternal`
                    exists to set up the metadata without actually
                    assigning windows. If we were more clever we
                    might have found a way for it to not be
                    public... it is something that can easily lead
                    to an invalid pipeline.
                    Yes, that was what hit me about one minute
                    after I started this thread. :)

                    I think "compatible windows" today in Beam
                    doesn't have very good uses anyhow. I do see
                    how when you are flattening PCollections you
                    might also want to explicitly have a function
                    that says "and here is how to reconcile their
                    different metadata". But is it not reasonable
                    to use Window.into(global window)? It doesn't
                    seem like boilerplate to me actually, but
                    something you really want to know is happening.

                    :)

                    Of course this was the way out, but I was
                    somewhat intuitively seeking something that
                    could go this autonomously.

                    Generally speaking, we might have some room for
                    improvement in the way we handle windows and
                    triggers - windows relate only to GBK and
                    stateful ParDo, triggers relate to GBK only.
                    They have no semantics if downstream processing
                    does not use any of these. There could be a
                    pipeline preprocessing stage that would discard
                    (replace with meaningful defaults) any of these
                    metadata that is unused, but can cause Pipeline
                    to fail at construction time. It is also (to
                    me) somewhat questionable if triggers are
                    really a property of a PCollection or a
                    property of a specific transform (GBK - ehm,
                    actually (stateless) 'key by' + 'reduce by
                    key', but that is completely different story
                    :)) because (non-default) triggers are likely
                    not preserved across multiple transforms. Maybe
                    the correct subject of this thread could be
                    "are we sure our windowing and triggering
                    semantics is 100% correct"? Probably the -
                    wrong - expectations at the beginning of this
                    thread were due to conflict in my mental model
                    of how things 'could' work as opposed to how
                    they actually work. :)

                     Jan


                    Kenn

                    On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský
                    <je...@seznam.cz> wrote:

                        On 4/6/24 21:23, Reuven Lax via dev wrote:
                        So the problem here is that windowFn is a
                        property of the PCollection, not the
                        element, and the result of Flatten is a
                        single PCollection.
                        Yes. That is the cause of why
                        Flatten.pCollections() needs the same
                        windowFn.

                        In various cases, there is a notion of
                        "compatible" windows. Basically given
                        window functions W1 and W2, provide a W3
                        that "works" with both.
                        Exactly this would be a nice feature for
                        Flatten, something like 'windowFn resolve
                        strategy', so that if use does not know
                        the windowFn of upstream PCollections this
                        can be somehow resolved at pipeline
                        construction time. Alternatively only as a
                        small syntactic sugar, something like:
                         
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

                        or anything similar. This can be done in
                        user code, so it is not something deeper,
                        but might help in some cases. It would be
                        cool if we could reuse concepts from other
                        cases where such mechanism is needed.


                        Note that Beam already has something
                        similar with side inputs, since the side
                        input often is in a different window than
                        the main input. However main input
                        elements are supposed to see side input
                        elements in the same window (and in fact
                        main inputs are blocked until the
                        side-input window is ready), so we must
                        do a mapping. If for example (and very
                        commonly!) the side input is in the
                        global window and the main input is in a
                        fixed window, by default we will remap
                        the global-window elements into the
                        main-input's fixed window.

                        This is a one-sided merge function, there
                        is a 'main' and 'side' input, but the
                        generic symmetric merge might be possible
                        as well. E.g. if one PCollection of
                        Flatten is in GlobalWindow, I wonder if
                        there are cases where users would actually
                        want to do anything else then apply the
                        same global windowing strategy to all
                        input PCollections.

                         Jan


                        In Side input we also allow the user to
                        control this mapping, so for example side
                        input elements could always map to the
                        previous fixed window (e.g. while
                        processing window 12-1, you want to see
                        summary data of all records in the
                        previous window 11-12). Users can do this
                        by providing a WindowMappingFunction to
                        the View - essentially a function from
                        window to window. Unfortunately this is
                        hard to use (one must create their own
                        PCollectionView class) and very poorly
                        documented, so I doubt many users know
                        about this!

                        Reuven

                        On Sat, Apr 6, 2024 at 7:09 AM Jan
                        Lukavský <je...@seznam.cz> wrote:

                            Immediate self-correction, although
                            setting the strategy directly via
                            setWindowingStrategyInternal()
                            *seemed* to be working during Pipeline
                            construction time, during runtime it
                            obviously does not work, because
                            the PCollection was still windowed
                            using the old windowFn. Make sense to
                            me, but there remains the other
                            question if we can make flattening
                            PCollections with incompatible
                            windowFns more user-friendly. The
                            current
                            approach where we require the same
                            windowFn for all input PCollections
                            creates some unnecessary boilerplate
                            code needed on user side.

                              Jan

                            On 4/6/24 15:45, Jan Lukavský wrote:
                            > Hi,
                            >
                            > I came across a case where using
                            >
                            PCollection#applyWindowingStrategyInternal
                            seems legit in user core.
                            > The case is roughly as follows:
                            >
                            >  a) compute some streaming statistics
                            >
                            >  b) apply the same transform (say
                            ComputeWindowedAggregation) with
                            > different parameters on these
                            statistics yielding two windowed
                            > PCollections - first is global with
                            early trigger, the other is
                            > sliding window, the specific
                            parameters of the windowFns are
                            > encapsulated in the
                            ComputeWindowedAggregation transform
                            >
                            >  c) apply the same transform on
                            both of the above PCollections,
                            > yielding two PCollections with the
                            same types, but different windowFns
                            >
                            >  d) flatten these PCollections into
                            single one (e.g. for downstream
                            > processing - joining - or flushing
                            to sink)
                            >
                            > Now, the flatten will not work,
                            because these PCollections have
                            > different windowFns. It would be
                            possible to restore the windowing for
                            > either of them, but it requires to
                            somewhat break the encapsulation of
                            > the transforms that produce the
                            windowed outputs. A more natural
                            > solution is to take the
                            WindowingStrategy from the global
                            aggregation
                            > and set it via
                            setWindowingStrategyInternal() to the
                            other
                            > PCollection. This works, but it
                            uses API that is marked as @Internal
                            > (and obviously, the name as well
                            suggests it is not intended for
                            > client-code usage).
                            >
                            > The question is, should we make a
                            legitimate version of this call? Or
                            > should we introduce a way for
                            Flatten.pCollections() to re-window the
                            > input PCollections appropriately?
                            In the case of conflicting
                            > WindowFns, where one of them is
                            GlobalWindowing strategy, it seems to
                            > me that the user's intention is
                            quite well-defined (this might extend
                            > to some 'flatten windowFn
                            resolution strategy', maybe).
                            >
                            > WDYT?
                            >
                            >  Jan
                            >

Reply via email to