I've probably heard about it, but I never read the proposal. It sounds great, but it would require changing our ParDos from the 'directive' style to something more functional, so that processing of elements, state updates and outputting of results can be decoupled and managed by the runner independently. This goes exactly in the direction of unifying GBK and Combine with stateful ParDo. Sounds like something worth exploring for Beam 3. :)

Anyway, thanks for this discussion, it helped me clear up a few more blind spots.

 Jan

On 4/10/24 19:24, Reuven Lax via dev wrote:
Are you familiar with the "sink triggers" proposal?

Essentially while windowing is usually a property of the data, and therefore flows downwards through the graph, triggering is usually a property of output (i.e. sink) latency - how much are you willing to wait to see data, and what semantics do you want for this early data. Ideally triggers should be specified separately at the ParDo level (Beam has no real notion of Sinks as a special object, so to allow for output specification it has to be on the ParDo), and the triggers should propagate up the graph back to the source. This is in contrast to today where we attach triggering to the windowing information.

This was a proposal some years back and there was some effort made to implement it, but the implementation never really got off the ground.

On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský <je...@seznam.cz> wrote:

    On 4/9/24 18:33, Kenneth Knowles wrote:
    At a top level `setWindowingStrategyInternal` exists to set up
    the metadata without actually assigning windows. If we were more
    clever we might have found a way for it to not be public... it is
    something that can easily lead to an invalid pipeline.
    Yes, that was what hit me about one minute after I started this
    thread. :)

    I think "compatible windows" today in Beam doesn't have very good
    uses anyhow. I do see how when you are flattening PCollections
    you might also want to explicitly have a function that says "and
    here is how to reconcile their different metadata". But is it not
    reasonable to use Window.into(global window)? It doesn't seem
    like boilerplate to me actually, but something you really want to
    know is happening.

    :)

    Of course this was the way out, but I was somewhat intuitively
    seeking something that could do this automatically.

    Generally speaking, we might have some room for improvement in the
    way we handle windows and triggers - windows relate only to GBK
    and stateful ParDo, triggers relate to GBK only. They have no
    semantics if downstream processing does not use any of these.
    There could be a pipeline preprocessing stage that would discard
    (replace with meaningful defaults) any such metadata that is
    unused but could otherwise cause the Pipeline to fail at
    construction time. It is also (to me) somewhat questionable
    whether triggers are really a property of a PCollection or a
    property of a specific transform (GBK - ehm, actually (stateless)
    'key by' + 'reduce by key', but that is a completely different
    story :)), because (non-default) triggers are likely not preserved
    across multiple transforms. Maybe the correct subject of this
    thread could be "are we sure our windowing and triggering
    semantics are 100% correct"? Probably the - wrong - expectations
    at the beginning of this thread were due to a conflict in my
    mental model of how things 'could' work as opposed to how they
    actually work. :)

     Jan


    Kenn

    On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský <je...@seznam.cz> wrote:

        On 4/6/24 21:23, Reuven Lax via dev wrote:
        So the problem here is that windowFn is a property of the
        PCollection, not the element, and the result of Flatten is a
        single PCollection.
        Yes. That is why Flatten.pCollections() needs the same
        windowFn.

        In various cases, there is a notion of "compatible" windows.
        Basically given window functions W1 and W2, provide a W3
        that "works" with both.
        Exactly this would be a nice feature for Flatten, something
        like a 'windowFn resolve strategy', so that if the user does not
        know the windowFn of upstream PCollections this can be
        somehow resolved at pipeline construction time. Alternatively,
        it could be just a small piece of syntactic sugar, something like:
         
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

        or anything similar. This can be done in user code, so it is
        not something deeper, but it might help in some cases. It would
        be cool if we could reuse concepts from other cases where
        such a mechanism is needed.
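
To make the idea of a 'windowFn resolve strategy' a bit more concrete, here is a minimal sketch in plain Java. It is only a toy model and not Beam API: the class name, the `resolve` method and the use of strings as stand-ins for windowFns are all hypothetical. The rule it assumes is the one suggested in this thread: identical windowFns pass through, a conflict involving the global window resolves to global, and anything else still fails at construction time.

```java
import java.util.List;

/** Toy model of a flatten-time windowFn resolution rule; none of these names are Beam API. */
final class FlattenWindowResolution {

    /** Hypothetical descriptor for the global windowFn; other fns are strings like "sliding:10m/1m". */
    static final String GLOBAL = "global";

    /** Picks one windowFn descriptor for the flattened output, or fails if no rule applies. */
    static String resolve(List<String> inputWindowFns) {
        if (inputWindowFns.isEmpty()) {
            // Assumption of this sketch only: an empty input list is rejected.
            throw new IllegalArgumentException("at least one input is required");
        }
        String first = inputWindowFns.get(0);
        boolean allSame = true;
        boolean anyGlobal = false;
        for (String fn : inputWindowFns) {
            allSame &= fn.equals(first);
            anyGlobal |= fn.equals(GLOBAL);
        }
        if (allSame) {
            return first;   // nothing to reconcile
        }
        if (anyGlobal) {
            return GLOBAL;  // the 'well-defined intention' case: fall back to the global window
        }
        // No safe default: the user has to re-window explicitly.
        throw new IllegalStateException("cannot reconcile windowFns " + inputWindowFns);
    }
}
```

In this model, flattening a globally windowed input with a sliding-window input resolves to the global window, while two different non-global windowFns are still rejected.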


        Note that Beam already has something similar with side
        inputs, since the side input often is in a different window
        than the main input. However main input elements are
        supposed to see side input elements in the same window (and
        in fact main inputs are blocked until the side-input window
        is ready), so we must do a mapping. If for example (and very
        commonly!) the side input is in the global window and the
        main input is in a fixed window, by default we will remap
        the global-window elements into the main-input's fixed window.

        This is a one-sided merge function, there is a 'main' and a
        'side' input, but a generic symmetric merge might be
        possible as well. E.g. if one PCollection of Flatten is in
        GlobalWindow, I wonder if there are cases where users would
        actually want to do anything other than apply the same global
        windowing strategy to all input PCollections.

         Jan


        In Side input we also allow the user to control this
        mapping, so for example side input elements could always map
        to the previous fixed window (e.g. while processing window
        12-1, you want to see summary data of all records in the
        previous window 11-12). Users can do this by providing a
        WindowMappingFunction to the View - essentially a function
        from window to window. Unfortunately this is hard to use
        (one must create their own PCollectionView class) and very
        poorly documented, so I doubt many users know about this!
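
The "previous fixed window" mapping described above can be sketched as a plain function from window to window. This is a toy model in plain Java, not the Beam window-mapping API: the `Interval` class and the `previousFixedWindow` method are hypothetical names, and windows are assumed to be half-open millisecond intervals of a fixed size.

```java
/** Toy model of a main-window to side-input-window mapping; not Beam API. */
final class SideInputWindowMapping {

    /** Half-open time interval [startMillis, endMillis). */
    static final class Interval {
        final long startMillis;
        final long endMillis;

        Interval(long startMillis, long endMillis) {
            if (endMillis <= startMillis) {
                throw new IllegalArgumentException("window must have positive length");
            }
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }
    }

    /**
     * Maps a main-input fixed window to the fixed window of the same size
     * that immediately precedes it, e.g. 12-1 maps to 11-12.
     */
    static Interval previousFixedWindow(Interval mainWindow) {
        long size = mainWindow.endMillis - mainWindow.startMillis;
        return new Interval(mainWindow.startMillis - size, mainWindow.startMillis);
    }
}
```

With one-hour windows, a main-input window starting at hour 12 is mapped to the side-input window covering hours 11 to 12, which is the summary window the main input wants to read.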

        Reuven

        On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský
        <je...@seznam.cz> wrote:

            Immediate self-correction: although setting the strategy
            directly via setWindowingStrategyInternal() *seemed* to be
            working during Pipeline construction time, during runtime it
            obviously does not work, because the PCollection was still
            windowed using the old windowFn. Makes sense to me, but
            there remains the other question of whether we can make
            flattening PCollections with incompatible windowFns more
            user-friendly. The current approach, where we require the
            same windowFn for all input PCollections, creates some
            unnecessary boilerplate code on the user side.
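
The difference between changing only the declared windowing metadata and actually re-assigning windows can be illustrated with a small toy model. This is plain Java, not Beam code: `ToyCollection` and its methods are hypothetical, and windows are represented as strings whose prefix names the windowFn that produced them.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a collection with declared windowing metadata; not Beam API. */
final class ToyCollection {

    /** The windowFn the collection claims to use (metadata only). */
    String declaredWindowFn;

    /** The window each element was actually assigned to, e.g. "sliding[0,10)". */
    final List<String> elementWindows = new ArrayList<>();

    ToyCollection(String declaredWindowFn, List<String> elementWindows) {
        this.declaredWindowFn = declaredWindowFn;
        this.elementWindows.addAll(elementWindows);
    }

    /** Models a metadata-only setter: the elements keep their old windows. */
    void setDeclaredOnly(String windowFn) {
        this.declaredWindowFn = windowFn;
    }

    /** Models an explicit re-window into the global window: metadata and elements both change. */
    ToyCollection rewindowIntoGlobal() {
        List<String> reassigned = new ArrayList<>();
        for (int i = 0; i < elementWindows.size(); i++) {
            reassigned.add("global");
        }
        return new ToyCollection("global", reassigned);
    }

    /** True when every element's window was produced by the declared windowFn. */
    boolean isConsistent() {
        for (String window : elementWindows) {
            if (!window.startsWith(declaredWindowFn)) {
                return false;
            }
        }
        return true;
    }
}
```

In this model the metadata-only setter leaves the collection inconsistent (declared "global", elements still in sliding windows), whereas the explicit re-window keeps the metadata and the elements in agreement.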

              Jan

            On 4/6/24 15:45, Jan Lukavský wrote:
            > Hi,
            >
            > I came across a case where using
            > PCollection#setWindowingStrategyInternal seems legit in user code.
            > The case is roughly as follows:
            >
            >  a) compute some streaming statistics
            >
            >  b) apply the same transform (say ComputeWindowedAggregation) with
            > different parameters on these statistics, yielding two windowed
            > PCollections - the first is global with an early trigger, the other is
            > a sliding window; the specific parameters of the windowFns are
            > encapsulated in the ComputeWindowedAggregation transform
            >
            >  c) apply the same transform on both of the above PCollections,
            > yielding two PCollections with the same types, but different windowFns
            >
            >  d) flatten these PCollections into a single one (e.g. for downstream
            > processing - joining - or flushing to sink)
            >
            > Now, the flatten will not work, because these PCollections have
            > different windowFns. It would be possible to restore the windowing for
            > either of them, but that requires somewhat breaking the encapsulation of
            > the transforms that produce the windowed outputs. A more natural
            > solution is to take the WindowingStrategy from the global aggregation
            > and set it via setWindowingStrategyInternal() on the other
            > PCollection. This works, but it uses an API that is marked as @Internal
            > (and obviously, the name also suggests it is not intended for
            > client-code usage).
            >
            > The question is, should we make a legitimate version of this call? Or
            > should we introduce a way for Flatten.pCollections() to re-window the
            > input PCollections appropriately? In the case of conflicting
            > WindowFns, where one of them is the global windowing strategy, it seems to
            > me that the user's intention is quite well-defined (this might extend
            > to some 'flatten windowFn resolution strategy', maybe).
            >
            > WDYT?
            >
            >  Jan
            >
