On Tue, Apr 27, 2021 at 10:53 AM Jan Lukavský <[email protected]> wrote:

> > If using early triggers, then the side input loads the latest trigger
> for that window.
>
> Does not the the word 'latest' imply processing time matching? The
> watermark of main input might be arbitrarily delayed from the watermark of
> side input. If we consider GlobalWindows on both sides, than "latest"
> trigger in side input looks exactly as processing time matching. Yes,
> between different windows, the matching is in event time. But within the
> same window (ignoring the window mapping for now), the matching looks like
> processing time, right?
>

Not really. Think of each trigger of a window as a refinement - so the
latest trigger for that window is our best approximation of the "correct"
value for that window.  For this reason, window panes are indexed by an
integer (pane_index), not by a timestamp. The idea here is that the main
input element maps to the side input for the best, most-recent knowledge of
the window.

Are you asking for a way to ignore early triggers on side input mapping,
and only map to on-time triggered values for the window?


> If we look at the SimplePushbackSideInputDoFnRunner used by runners
> exactly for the side input matching, there is no testing of side input
> watermark to determine if an element should be 'pushed back' or processed.
> Element is processed if, and only if, all side inputs for particular window
> are ready.
>
On 4/27/21 7:24 PM, Reuven Lax wrote:
>
> The windows shouldn't need to match - at least if the FlinkRunner
> implementation is correct. By default, the side-input's WindowFn should be
> used to map the main input's timestamp into a window, and that window is
> used to determine which version of the side input to load. A custom
> WindowFn can be used to to even more - e.g. if you want the main input
> element to map to the _previous_ window in the side input (you would do
> this by overloading getDefaultWindowMappingFn).
>
> If using early triggers, then the side input loads the latest trigger for
> that window. This is still an event-time mapping - for example two
> main-input events in two different windows will still map the the side
> input for the matching window. However if the side input PCollection is
> triggered, than the latest trigger for each window's side input will be
> loaded.
>
> It's possible that the FlinkRunner implementation is incomplete, in which
> case it should be fixed.
>
> On Tue, Apr 27, 2021 at 9:36 AM Jan Lukavský <[email protected]> wrote:
>
>> It seems to me, that this is true only with matching windows on both
>> sides and default trigger of the side input. Then it will (due to condition
>> a)) work as if the matching happenned in event time. But when using any
>> early triggers then it will work in processing time. At least, that is my
>> understanding from studying the code in FlinkRunner.
>> On 4/27/21 4:05 PM, Robert Burke wrote:
>>
>> I thought the matching happened with elements in the matching window, in
>> Event time, not in Processing time.
>>
>> Granted, I'm not that familiar with this area myself, but one key part of
>> Beam is nearly everything is Event time by default, not Processing time.
>>
>> On Tue, Apr 27, 2021, 12:43 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I have a question about matching side inputs to main input. First a
>>> recap, to make sure I understand the current state correctly:
>>>
>>>   a) elements from main input are pushed back (stored in state) until a
>>> first side input pane arrives (that might be on time, or early)
>>>
>>>   b) after that, elements from the main input are matched to the current
>>> side input view - the view is updated as new data arrives, but is
>>> matched to the main input elements in processing time
>>>
>>> If this is the current state, my question is, would it be possible to
>>> add a new mode of matching of side inputs? Something like
>>>
>>>   ParDo.of(new MyDoFn()).withSideInput("name", myView,
>>> TimeDomain.EVENT_TIME)
>>>
>>> the semantics would be that the elements from the main PCollection would
>>> be stored into state as pairs with the value of the current main input
>>> watermark and on update of side-input watermark only main input elements
>>> with associated input watermark less than that of the side input would
>>> be matched with the side input and sent downstream. Although this
>>> approach is necessarily more expensive and introducing additional
>>> latency than processing time matching, there are situations when
>>> processing time matching is inapropriate for correctness reasons.
>>>
>>> WDYT?
>>>
>>>   Jan
>>>
>>>

Reply via email to