On Tue, Apr 27, 2021 at 10:53 AM Jan Lukavský <[email protected]> wrote:
> > If using early triggers, then the side input loads the latest trigger
> > for that window.
>
> Does not the word 'latest' imply processing time matching? The
> watermark of main input might be arbitrarily delayed from the watermark of
> side input. If we consider GlobalWindows on both sides, then the "latest"
> trigger in side input looks exactly like processing time matching. Yes,
> between different windows, the matching is in event time. But within the
> same window (ignoring the window mapping for now), the matching looks like
> processing time, right?

Not really. Think of each trigger of a window as a refinement - so the latest trigger for that window is our best approximation of the "correct" value for that window. For this reason, window panes are indexed by an integer (pane_index), not by a timestamp. The idea here is that the main input element maps to the side input for the best, most-recent knowledge of the window.

Are you asking for a way to ignore early triggers on side input mapping, and only map to on-time triggered values for the window?

> If we look at the SimplePushbackSideInputDoFnRunner used by runners
> exactly for the side input matching, there is no testing of side input
> watermark to determine if an element should be 'pushed back' or processed.
> An element is processed if, and only if, all side inputs for the particular
> window are ready.
>
> On 4/27/21 7:24 PM, Reuven Lax wrote:
>
> The windows shouldn't need to match - at least if the FlinkRunner
> implementation is correct. By default, the side-input's WindowFn should be
> used to map the main input's timestamp into a window, and that window is
> used to determine which version of the side input to load. A custom
> WindowFn can be used to do even more - e.g. if you want the main input
> element to map to the _previous_ window in the side input (you would do
> this by overriding getDefaultWindowMappingFn).
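
To make the mapping described above concrete, here is a minimal plain-Java model. It is not Beam code and does not claim to mirror any runner's implementation. It assumes fixed 10-second windows on the side input, and the class `SideInputLookupSketch` with all of its methods is a hypothetical name chosen for illustration. The main-input timestamp is mapped to a side-input window in event time, and the lookup returns whichever pane has the highest index for that window at the moment of the lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongUnaryOperator;

/** Plain-Java model of side-input lookup. Hypothetical names, not Beam API. */
class SideInputLookupSketch {
  // Assumption: the side input uses fixed windows of this size.
  static final long WINDOW_SIZE_MS = 10_000L;

  /** One firing (pane) of a side-input window. */
  static final class Pane {
    final long paneIndex;
    final String value;

    Pane(long paneIndex, String value) {
      this.paneIndex = paneIndex;
      this.value = value;
    }
  }

  // Side-input window start -> pane with the highest index seen so far.
  private final Map<Long, Pane> latestPaneByWindow = new HashMap<>();

  /** Default mapping: the side-input window that contains the timestamp. */
  static long defaultMapping(long timestampMs) {
    return Math.floorDiv(timestampMs, WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
  }

  /** Custom mapping: the window before the one that contains the timestamp. */
  static long previousWindowMapping(long timestampMs) {
    return defaultMapping(timestampMs) - WINDOW_SIZE_MS;
  }

  /** Each firing refines the window's value; keep only the highest pane index. */
  void onSideInputPane(long windowStartMs, long paneIndex, String value) {
    latestPaneByWindow.merge(
        windowStartMs,
        new Pane(paneIndex, value),
        (old, fresh) -> fresh.paneIndex > old.paneIndex ? fresh : old);
  }

  /** Empty result: no pane yet for that window, so the element would be pushed back. */
  Optional<String> lookup(long elementTimestampMs, LongUnaryOperator mapping) {
    Pane pane = latestPaneByWindow.get(mapping.applyAsLong(elementTimestampMs));
    return Optional.ofNullable(pane).map(p -> p.value);
  }

  public static void main(String[] args) {
    SideInputLookupSketch view = new SideInputLookupSketch();
    view.onSideInputPane(0L, 0L, "early");
    view.onSideInputPane(0L, 1L, "on-time");
    System.out.println(view.lookup(3_000L, SideInputLookupSketch::defaultMapping));
    System.out.println(view.lookup(12_000L, SideInputLookupSketch::previousWindowMapping));
  }
}
```

In this model the choice of window depends only on the element's timestamp, while the choice of pane within that window depends on which firings have arrived so far, which is the distinction the two replies above are arguing about.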
>
> If using early triggers, then the side input loads the latest trigger for
> that window. This is still an event-time mapping - for example two
> main-input events in two different windows will still map to the side
> input for the matching window. However if the side input PCollection is
> triggered, then the latest trigger for each window's side input will be
> loaded.
>
> It's possible that the FlinkRunner implementation is incomplete, in which
> case it should be fixed.
>
> On Tue, Apr 27, 2021 at 9:36 AM Jan Lukavský <[email protected]> wrote:
>
>> It seems to me that this is true only with matching windows on both
>> sides and default trigger of the side input. Then it will (due to condition
>> a)) work as if the matching happened in event time. But when using any
>> early triggers then it will work in processing time. At least, that is my
>> understanding from studying the code in FlinkRunner.
>>
>> On 4/27/21 4:05 PM, Robert Burke wrote:
>>
>> I thought the matching happened with elements in the matching window, in
>> Event time, not in Processing time.
>>
>> Granted, I'm not that familiar with this area myself, but one key part of
>> Beam is nearly everything is Event time by default, not Processing time.
>>
>> On Tue, Apr 27, 2021, 12:43 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I have a question about matching side inputs to main input. First a
>>> recap, to make sure I understand the current state correctly:
>>>
>>> a) elements from main input are pushed back (stored in state) until a
>>> first side input pane arrives (that might be on time, or early)
>>>
>>> b) after that, elements from the main input are matched to the current
>>> side input view - the view is updated as new data arrives, but is
>>> matched to the main input elements in processing time
>>>
>>> If this is the current state, my question is, would it be possible to
>>> add a new mode of matching of side inputs?
>>> Something like
>>>
>>> ParDo.of(new MyDoFn()).withSideInput("name", myView,
>>> TimeDomain.EVENT_TIME)
>>>
>>> the semantics would be that the elements from the main PCollection would
>>> be stored into state as pairs with the value of the current main input
>>> watermark and on update of side-input watermark only main input elements
>>> with associated input watermark less than that of the side input would
>>> be matched with the side input and sent downstream. Although this
>>> approach is necessarily more expensive and introduces more latency
>>> than processing time matching, there are situations when
>>> processing time matching is inappropriate for correctness reasons.
>>>
>>> WDYT?
>>>
>>> Jan
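
The semantics proposed in the original message can be sketched in plain Java as follows. This is only an illustrative model of the buffering rule, not Beam code and not an existing API. The class `EventTimeSideInputMatcherSketch`, its method names, and the string join used as "matching" are all hypothetical. It also assumes that a released element is matched against whatever side-input value is current when the side-input watermark advances, which the proposal does not spell out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Plain-Java model of the proposed event-time matching. Hypothetical names. */
class EventTimeSideInputMatcherSketch {

  /** A main-input element paired with the main watermark at arrival. */
  static final class Buffered {
    final String element;
    final long mainWatermarkMs;

    Buffered(String element, long mainWatermarkMs) {
      this.element = element;
      this.mainWatermarkMs = mainWatermarkMs;
    }
  }

  private final List<Buffered> buffer = new ArrayList<>();
  private long mainWatermarkMs = Long.MIN_VALUE;
  private long sideWatermarkMs = Long.MIN_VALUE;
  private String sideValue; // current contents of the side-input view

  /** Watermarks only move forward. */
  void onMainWatermark(long watermarkMs) {
    mainWatermarkMs = Math.max(mainWatermarkMs, watermarkMs);
  }

  /**
   * Store the element together with the current main-input watermark.
   * Simplification: elements are released only on the next side-input
   * watermark update, even if the side watermark is already ahead.
   */
  void onMainElement(String element) {
    buffer.add(new Buffered(element, mainWatermarkMs));
  }

  /** The view is updated as new side-input data arrives. */
  void onSideInputValue(String value) {
    sideValue = value;
  }

  /**
   * On a side-input watermark update, release every buffered element whose
   * stored main watermark is strictly less than the side-input watermark.
   */
  List<String> onSideWatermark(long watermarkMs) {
    sideWatermarkMs = Math.max(sideWatermarkMs, watermarkMs);
    List<String> emitted = new ArrayList<>();
    Iterator<Buffered> it = buffer.iterator();
    while (it.hasNext()) {
      Buffered b = it.next();
      if (b.mainWatermarkMs < sideWatermarkMs) {
        emitted.add(b.element + "+" + sideValue); // stand-in for the real match
        it.remove();
      }
    }
    return emitted;
  }
}
```

The sketch makes the cost mentioned in the proposal visible: every main-input element sits in the buffer until the side-input watermark passes the main watermark recorded for it, so both state size and latency grow with the lag between the two watermarks.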
