On 4/11/24 18:20, Reuven Lax via dev wrote:
I'm not sure it would require all that. A "basic" implementation could be done on top of our existing model. Essentially the user would specify triggers at the sink ParDos, then the runner would walk backwards up the graph, reverse-propagating these triggers (with some resolution rules aimed at keeping the minimum trigger latency). Under the covers, the runner could simply apply the appropriate trigger to the Window, using the current mechanism. Of course building this all into the framework from scratch would be cleaner, but we could also build this on top of what we have.
Any propagation from sink to source would be blocked by any stateful ParDo, because that does not adhere to the concept of a trigger, no? Hence, we could get the required downstream 'cadence' of outputs, but these would change only when the upstream ParDo emits any data. Yes, one can argue that a stateful ParDo is supposed to emit data as fast as possible, in which case this seems to work.
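
A minimal sketch of the back-propagation idea discussed above, written as a plain-Java toy model rather than against Beam itself. Everything here (`SinkTriggerSketch`, `Node`, `propagate`) is hypothetical and does not exist in Beam. It assumes that a trigger can be reduced to a single latency, that the resolution rule is "keep the minimum", and that a stateful ParDo stops propagation, as suggested in the reply.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of sink-trigger propagation; none of these types exist in Beam. */
final class SinkTriggerSketch {

  /** A transform in the pipeline graph; inputs point upstream. */
  static final class Node {
    final String name;
    final boolean stateful; // true for a stateful ParDo
    final List<Node> inputs;

    Node(String name, boolean stateful, List<Node> inputs) {
      this.name = name;
      this.stateful = stateful;
      this.inputs = inputs;
    }
  }

  /**
   * Walks from each sink towards the sources and records, per node name, the
   * smallest latency requested by any downstream sink.
   */
  static Map<String, Duration> propagate(Map<Node, Duration> sinkLatencies) {
    Map<String, Duration> resolved = new HashMap<>();
    Deque<Map.Entry<Node, Duration>> work = new ArrayDeque<>(sinkLatencies.entrySet());
    while (!work.isEmpty()) {
      Map.Entry<Node, Duration> item = work.pop();
      Node node = item.getKey();
      Duration latency = item.getValue();
      Duration known = resolved.get(node.name);
      if (known != null && known.compareTo(latency) <= 0) {
        continue; // an equal or tighter requirement already reached this node
      }
      resolved.put(node.name, latency);
      if (node.stateful) {
        continue; // assumption: a stateful ParDo blocks further propagation
      }
      for (Node input : node.inputs) {
        work.push(Map.entry(input, latency));
      }
    }
    return resolved;
  }
}
```

With a 10-second sink and a 60-second sink sharing one upstream aggregation, the aggregation and its source resolve to 10 seconds, while anything upstream of a stateful node receives no trigger at all.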

On Thu, Apr 11, 2024 at 5:10 AM Jan Lukavský <je...@seznam.cz> wrote:

    I've probably heard about it, but I never read the proposal.
    Sounds great, but that would require changing our ParDos from the
    'directive' style to something more functional, so that processing
    of elements, state updates and outputting results can be decoupled
    and managed by the runner independently. This goes exactly in the
    direction of unifying GBK and Combine with stateful ParDo. Sounds
    like something worth exploring for Beam 3. :)

    Anyway, thanks for this discussion, it helped me fill in a few more
    gaps.

     Jan

    On 4/10/24 19:24, Reuven Lax via dev wrote:
    Are you familiar with the "sink triggers" proposal?

    Essentially while windowing is usually a property of the data,
    and therefore flows downwards through the graph, triggering is
    usually a property of output (i.e. sink) latency - how much are
    you willing to wait to see data, and what semantics do you want
    for this early data. Ideally triggers should be specified
    separately at the ParDo level (Beam has no real notion of Sinks
    as a special object, so to allow for output specification it has
    to be on the ParDo), and the triggers should propagate up the
    graph back to the source. This is in contrast to today where we
    attach triggering to the windowing information.

    This was a proposal some years back and there was some effort
    made to implement it, but the implementation never really got off
    the ground.

    On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský <je...@seznam.cz>
    wrote:

        On 4/9/24 18:33, Kenneth Knowles wrote:
        At a top level `setWindowingStrategyInternal` exists to set
        up the metadata without actually assigning windows. If we
        were more clever we might have found a way for it to not be
        public... it is something that can easily lead to an invalid
        pipeline.
        Yes, that was what hit me about one minute after I started
        this thread. :)

        I think "compatible windows" today in Beam doesn't have very
        good uses anyhow. I do see how when you are flattening
        PCollections you might also want to explicitly have a
        function that says "and here is how to reconcile their
        different metadata". But is it not reasonable to use
        Window.into(global window)? It doesn't seem like boilerplate
        to me actually, but something you really want to know is
        happening.

        :)

        Of course this was the way out, but I was somewhat
        intuitively seeking something that could do this automatically.

        Generally speaking, we might have some room for improvement
        in the way we handle windows and triggers - windows relate
        only to GBK and stateful ParDo, triggers relate to GBK only.
        They have no semantics if downstream processing does not use
        any of these. There could be a pipeline preprocessing stage
        that would discard (replace with meaningful defaults) any of
        these metadata that is unused, but can cause Pipeline to fail
        at construction time. It is also (to me) somewhat
        questionable if triggers are really a property of a
        PCollection or a property of a specific transform (GBK - ehm,
        actually (stateless) 'key by' + 'reduce by key', but that is
        completely different story :)) because (non-default) triggers
        are likely not preserved across multiple transforms. Maybe
        the correct subject of this thread could be "are we sure our
        windowing and triggering semantics is 100% correct"? Probably
        the - wrong - expectations at the beginning of this thread
        were due to conflict in my mental model of how things 'could'
        work as opposed to how they actually work. :)

         Jan


        Kenn

        On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský
        <je...@seznam.cz> wrote:

            On 4/6/24 21:23, Reuven Lax via dev wrote:
            So the problem here is that windowFn is a property of
            the PCollection, not the element, and the result of
            Flatten is a single PCollection.
            Yes. That is why Flatten.pCollections() needs the same
            windowFn.

            In various cases, there is a notion of "compatible"
            windows. Basically given window functions W1 and W2,
            provide a W3 that "works" with both.
            Exactly this would be a nice feature for Flatten,
            something like 'windowFn resolve strategy', so that if
            the user does not know the windowFn of upstream PCollections
            this can be somehow resolved at pipeline construction
            time. Alternatively only as a small syntactic sugar,
            something like:
             
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

            or anything similar. This can be done in user code, so
            it is not something deeper, but might help in some
            cases. It would be cool if we could reuse concepts from
            other cases where such mechanism is needed.


            Note that Beam already has something similar with side
            inputs, since the side input often is in a different
            window than the main input. However main input elements
            are supposed to see side input elements in the same
            window (and in fact main inputs are blocked until the
            side-input window is ready), so we must do a mapping.
            If for example (and very commonly!) the side input is
            in the global window and the main input is in a fixed
            window, by default we will remap the global-window
            elements into the main-input's fixed window.

            This is a one-sided merge function, there is a 'main'
            and 'side' input, but the generic symmetric merge might
            be possible as well. E.g. if one PCollection of Flatten
            is in GlobalWindow, I wonder if there are cases where
            users would actually want to do anything else than apply
            the same global windowing strategy to all input
            PCollections.
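
A minimal sketch of what such a symmetric 'windowFn resolve strategy' could look like, as a plain-Java toy model. The class `FlattenWindowResolutionSketch`, the method `resolve`, and the use of strings as stand-ins for windowFns are all hypothetical; nothing like this exists in Beam today. The only rule assumed is the one suggested above: identical windowFns pass through, and a global window on any input wins.

```java
import java.util.List;

/** Toy model of a Flatten windowFn resolution rule; not a Beam API. */
final class FlattenWindowResolutionSketch {

  /** Stand-in label for the global windowing strategy. */
  static final String GLOBAL = "GlobalWindows";

  /**
   * Picks the windowFn for the flattened output:
   * identical inputs keep their windowFn, and any global input forces global.
   * Anything else is rejected, as Flatten does at construction time today.
   */
  static String resolve(List<String> inputWindowFns) {
    if (inputWindowFns.isEmpty()) {
      throw new IllegalArgumentException("no inputs to flatten");
    }
    String first = inputWindowFns.get(0);
    if (inputWindowFns.stream().allMatch(first::equals)) {
      return first; // nothing to reconcile
    }
    if (inputWindowFns.contains(GLOBAL)) {
      return GLOBAL; // assumption: the user's intent is to re-window into global
    }
    throw new IllegalArgumentException("incompatible windowFns: " + inputWindowFns);
  }
}
```

The resolved value would then drive an explicit re-windowing of each input before the flatten, which is what user code has to spell out by hand at the moment.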

             Jan


            In Side input we also allow the user to control this
            mapping, so for example side input elements could
            always map to the previous fixed window (e.g. while
            processing window 12-1, you want to see summary data of
            all records in the previous window 11-12). Users can do
            this by providing a WindowMappingFunction to the View -
            essentially a function from window to window.
            Unfortunately this is hard to use (one must create
            their own PCollectionView class) and very poorly
            documented, so I doubt many users know about this!
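
A minimal sketch of the "previous fixed window" mapping described above, as a function from window to window. It uses a plain-Java stand-in (`Interval`, `previousWindow`, both hypothetical) rather than Beam's own window classes, and assumes fixed windows are half-open intervals of equal size.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of a side-input window mapping; not Beam's own classes. */
final class PreviousWindowMappingSketch {

  /** Half-open interval [start, end), standing in for a fixed window. */
  static final class Interval {
    final Instant start;
    final Instant end;

    Interval(Instant start, Instant end) {
      this.start = start;
      this.end = end;
    }
  }

  /**
   * Maps the main-input window to the side-input window that immediately
   * precedes it, so window 12-1 sees the summary computed for 11-12.
   */
  static Interval previousWindow(Interval mainWindow) {
    Duration size = Duration.between(mainWindow.start, mainWindow.end);
    return new Interval(mainWindow.start.minus(size), mainWindow.start);
  }
}
```

The default behaviour mentioned earlier corresponds to a different mapping, one that returns the side-input window containing the main-input window instead of the one before it.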

            Reuven

            On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský
            <je...@seznam.cz> wrote:

                Immediate self-correction: although setting the strategy
                directly via setWindowingStrategyInternal() *seemed* to be
                working at pipeline construction time, at runtime it
                obviously does not work, because the PCollection was still
                windowed using the old windowFn. Makes sense to me, but
                there remains the other question of whether we can make
                flattening PCollections with incompatible windowFns more
                user-friendly. The current approach, where we require the
                same windowFn for all input PCollections, creates some
                unnecessary boilerplate code on the user side.

                  Jan

                On 4/6/24 15:45, Jan Lukavský wrote:
                > Hi,
                >
                > I came across a case where using
                > PCollection#setWindowingStrategyInternal seems legit in
                > user code. The case is roughly as follows:
                >
                >  a) compute some streaming statistics
                >
                >  b) apply the same transform (say ComputeWindowedAggregation)
                > with different parameters on these statistics, yielding two
                > windowed PCollections - the first is global with an early
                > trigger, the other is a sliding window; the specific
                > parameters of the windowFns are encapsulated in the
                > ComputeWindowedAggregation transform
                >
                >  c) apply the same transform on both of the above
                > PCollections, yielding two PCollections with the same types,
                > but different windowFns
                >
                >  d) flatten these PCollections into a single one (e.g. for
                > downstream processing - joining - or flushing to sink)
                >
                > Now, the flatten will not work, because these PCollections
                > have different windowFns. It would be possible to restore the
                > windowing for either of them, but that requires somewhat
                > breaking the encapsulation of the transforms that produce the
                > windowed outputs. A more natural solution is to take the
                > WindowingStrategy from the global aggregation and set it via
                > setWindowingStrategyInternal() on the other PCollection. This
                > works, but it uses an API that is marked as @Internal (and
                > obviously, the name also suggests it is not intended for
                > client-code usage).
                >
                > The question is, should we make a legitimate version of this
                > call? Or should we introduce a way for Flatten.pCollections()
                > to re-window the input PCollections appropriately? In the
                > case of conflicting WindowFns, where one of them is the
                > GlobalWindowing strategy, it seems to me that the user's
                > intention is quite well-defined (this might extend to some
                > 'flatten windowFn resolution strategy', maybe).
                >
                > WDYT?
                >
                >  Jan
                >
