> Are you asking for a way to ignore early triggers on side input mapping, and only map to on-time triggered values for the window?

No, that could certainly be done before applying the View transform. I'd like to know whether it would be possible to create a mode of matching that is deterministic. One way to make it deterministic seems to be that main input elements would be pushed back until the side input watermark 'catches up' with the main input. Whenever the side input watermark fell behind the main input watermark, elements would again start being pushed back. I'm not sure I'm explaining it with the right words. The side input watermark can be controlled using a timer in an upstream transform, so this defines which main input elements are matched to which pane of the side input.

On 4/27/21 8:03 PM, Reuven Lax wrote:


On Tue, Apr 27, 2021 at 10:53 AM Jan Lukavský <[email protected]> wrote:

    > If using early triggers, then the side input loads the latest
    trigger for that window.

    Doesn't the word 'latest' imply processing-time matching? The
    watermark of the main input might be arbitrarily delayed relative
    to the watermark of the side input. If we consider GlobalWindows on
    both sides, then the "latest" trigger in the side input looks
    exactly like processing-time matching. Yes, between different
    windows, the matching is in event time. But within the same window
    (ignoring the window mapping for now), the matching looks like
    processing time, right?


Not really. Think of each trigger of a window as a refinement, so the latest trigger for that window is our best approximation of the "correct" value for that window. For this reason, window panes are indexed by an integer (pane_index), not by a timestamp. The idea here is that the main input element maps to the side input value that reflects the best, most recent knowledge of the window.
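The refinement view described above can be made concrete with a small plain-Java model. This is a sketch, not Beam's implementation: the `Pane` and `LatestPaneView` classes are hypothetical, and windows are identified by their start timestamp. For each window the view keeps only the pane with the highest pane index, so a lookup returns the most refined value seen so far regardless of the order in which panes were delivered.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical: one side-input pane, i.e. its window, its pane index, and its value. */
final class Pane {
  final long windowStart;
  final long paneIndex;
  final String value;

  Pane(long windowStart, long paneIndex, String value) {
    this.windowStart = windowStart;
    this.paneIndex = paneIndex;
    this.value = value;
  }
}

/** Hypothetical model of a triggered side input that keeps only the latest pane per window. */
final class LatestPaneView {
  private final Map<Long, Pane> latestByWindow = new HashMap<>();

  /** Keeps the incoming pane unless a pane with a higher index is already held for its window. */
  void onPane(Pane pane) {
    latestByWindow.merge(
        pane.windowStart,
        pane,
        (held, incoming) -> incoming.paneIndex > held.paneIndex ? incoming : held);
  }

  /** Returns the most refined value for the window, or null if no pane has arrived yet. */
  String lookup(long windowStart) {
    Pane pane = latestByWindow.get(windowStart);
    return pane == null ? null : pane.value;
  }
}
```

For example, delivering panes with indexes 0, 2 and then 1 for the same window leaves the value of pane 2 in place. Comparing pane indexes rather than arrival order is what makes "latest" mean "most refined"; which pane a particular main-input element observes still depends on when that element is processed.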

Are you asking for a way to ignore early triggers on side input mapping, and only map to on-time triggered values for the window?

    If we look at the SimplePushbackSideInputDoFnRunner, which runners
    use precisely for side input matching, there is no check of the
    side input watermark to determine whether an element should be
    'pushed back' or processed. An element is processed if, and only
    if, all side inputs for the particular window are ready.
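    The rule described above can be modeled in a few lines of plain
    Java. This is a hypothetical sketch, not the runner's actual code:
    `ReadinessPushback` and its methods are invented names, and "ready"
    is taken to mean that a side input has delivered at least one pane
    for the element's window. What it illustrates is that the decision
    looks only at readiness, never at the side input watermark.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the pushback rule: process iff every side input is ready for the window. */
final class ReadinessPushback {
  private static final class Pending {
    final String element;
    final long window;

    Pending(String element, long window) {
      this.element = element;
      this.window = window;
    }
  }

  // For each side input, the windows (by start timestamp) that have received at least one pane.
  private final List<Set<Long>> readyWindows = new ArrayList<>();
  private final List<Pending> pushedBack = new ArrayList<>();
  final List<String> processed = new ArrayList<>();

  ReadinessPushback(int numSideInputs) {
    for (int i = 0; i < numSideInputs; i++) {
      readyWindows.add(new HashSet<>());
    }
  }

  private boolean allReady(long window) {
    for (Set<Long> ready : readyWindows) {
      if (!ready.contains(window)) {
        return false;
      }
    }
    return true;
  }

  /** Processes the element immediately if all side inputs are ready; no watermark is consulted. */
  void onMainElement(String element, long window) {
    if (allReady(window)) {
      processed.add(element);
    } else {
      pushedBack.add(new Pending(element, window));
    }
  }

  /** Marks one side input ready for a window, then retries every pushed-back element. */
  void onSidePane(int sideInput, long window) {
    readyWindows.get(sideInput).add(window);
    for (Iterator<Pending> it = pushedBack.iterator(); it.hasNext(); ) {
      Pending pending = it.next();
      if (allReady(pending.window)) {
        processed.add(pending.element);
        it.remove();
      }
    }
  }

  int pushedBackCount() {
    return pushedBack.size();
  }
}
```

    With two side inputs, an element for window 0 stays pushed back
    after the first side input delivers a pane for window 0, and is
    processed as soon as the second one does; later elements for that
    window are processed immediately.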

    On 4/27/21 7:24 PM, Reuven Lax wrote:
    The windows shouldn't need to match, at least if the FlinkRunner
    implementation is correct. By default, the side input's WindowFn
    should be used to map the main input's timestamp into a window,
    and that window is used to determine which version of the side
    input to load. A custom WindowFn can be used to do even more,
    e.g. if you want the main input element to map to the _previous_
    window in the side input (you would do this by overriding
    getDefaultWindowMappingFn).
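    For fixed windows, both the default mapping and the "previous
    window" variant reduce to arithmetic on timestamps. The sketch
    below models only that arithmetic in plain Java; `FixedWindowMapping`
    and its methods are hypothetical names. In Beam itself the mapping
    is supplied by the `WindowMappingFn` returned from
    `WindowFn.getDefaultWindowMappingFn()`, which receives the main
    input window rather than a raw timestamp, so treat this as an
    approximation of the idea.

```java
/** Hypothetical arithmetic model of side input window mapping for fixed windows (millis). */
final class FixedWindowMapping {
  private FixedWindowMapping() {}

  /** Start of the fixed window of the given size that contains the timestamp. */
  static long windowStart(long timestampMillis, long sizeMillis) {
    // floorMod keeps the result correct for negative timestamps as well.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** Default-style mapping: the side input window that contains the main input timestamp. */
  static long sameWindow(long mainTimestampMillis, long sideSizeMillis) {
    return windowStart(mainTimestampMillis, sideSizeMillis);
  }

  /** Custom mapping: the side input window immediately before the one containing the timestamp. */
  static long previousWindow(long mainTimestampMillis, long sideSizeMillis) {
    return windowStart(mainTimestampMillis, sideSizeMillis) - sideSizeMillis;
  }
}
```

    With 60-second side input windows, a main input timestamp of 125 s
    maps to the window starting at 120 s by default, and to the one
    starting at 60 s under the "previous window" mapping. Because only
    the timestamp is used, the main input may be windowed differently
    from the side input.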

    If using early triggers, then the side input loads the latest
    trigger for that window. This is still an event-time mapping:
    for example, two main-input events in two different windows will
    still map to the side input for the matching window. However, if
    the side input PCollection is triggered, then the latest trigger
    for each window's side input will be loaded.

    It's possible that the FlinkRunner implementation is incomplete,
    in which case it should be fixed.

    On Tue, Apr 27, 2021 at 9:36 AM Jan Lukavský <[email protected]> wrote:

        It seems to me that this is true only with matching windows
        on both sides and the default trigger on the side input. Then
        it works (due to condition a)) as if the matching happened
        in event time. But with any early triggers, it works in
        processing time. At least, that is my understanding from
        studying the code in FlinkRunner.

        On 4/27/21 4:05 PM, Robert Burke wrote:
        I thought the matching happened with elements in the
        matching window, in Event time, not in Processing time.

        Granted, I'm not that familiar with this area myself, but
        one key part of Beam is that nearly everything is in event time
        by default, not processing time.

        On Tue, Apr 27, 2021, 12:43 AM Jan Lukavský <[email protected]> wrote:

            Hi,

            I have a question about matching side inputs to the main
            input. First, a recap, to make sure I understand the current
            state correctly:

              a) elements from the main input are pushed back (stored in
            state) until the first side input pane arrives (which might
            be on time or early)

              b) after that, elements from the main input are matched to
            the current side input view; the view is updated as new data
            arrives, but is matched to the main input elements in
            processing time
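            Points a) and b) can be illustrated with a small plain-Java
            simulation. The names are hypothetical and events are encoded
            as strings for brevity: "S:x" is a side input pane that
            replaces the current view with x, and "M:y" is a main input
            element y, all in one window and processed in list order.
            The same element ends up matched to a different pane
            depending only on how it interleaves with side input updates.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a) and b): pushback until the first pane, then match the current view. */
final class ProcessingTimeMatcher {
  private ProcessingTimeMatcher() {}

  /** Processes events in list order and returns "element=pane" for every main input element. */
  static List<String> run(List<String> events) {
    List<String> output = new ArrayList<>();
    List<String> pushedBack = new ArrayList<>();
    String view = null; // no side input pane has arrived yet
    for (String event : events) {
      String payload = event.substring(2);
      if (event.startsWith("S:")) {
        view = payload;
        // a) release everything held while waiting for the first pane
        for (String element : pushedBack) {
          output.add(element + "=" + view);
        }
        pushedBack.clear();
      } else if (view == null) {
        pushedBack.add(payload);
      } else {
        // b) matched against whatever the view holds at this moment
        output.add(payload + "=" + view);
      }
    }
    return output;
  }
}
```

            For example, run(List.of("S:early", "M:e1", "S:on-time"))
            yields [e1=early], while run(List.of("S:early", "S:on-time",
            "M:e1")) yields [e1=on-time], although e1 and both panes
            belong to the same window.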

            If this is the current state, my question is: would it be
            possible to add a new mode of matching side inputs? Something
            like

              ParDo.of(new MyDoFn()).withSideInput("name", myView, TimeDomain.EVENT_TIME)

            The semantics would be that elements from the main
            PCollection would be stored in state as pairs with the value
            of the current main input watermark, and on each update of
            the side input watermark, only main input elements whose
            associated main input watermark is less than the side input
            watermark would be matched with the side input and sent
            downstream. Although this approach is necessarily more
            expensive and introduces additional latency compared to
            processing-time matching, there are situations where
            processing-time matching is inappropriate for correctness
            reasons.

            WDYT?

              Jan
