> In your proposed solution, it probably could be expressed as a new merging WindowFn. You would assign each Green element to two tagged windows that were GreenFromOrange and GreenToBlue type, and have a separate window tag for OrangeWindow and BlueWindow. Then GreenFromOrange merges with OrangeWindow only, etc.
I'm finally returning to this implementation and, getting down to brass tacks, I'm realizing I didn't 100% understand your proposal ^^

With a merging-windows solution, I am assuming that the final merge will produce one and only one window (so that my aggregation is the single produced value I need). As a reminder, my end goal is to emit the aggregation with an event timestamp equal to that of the GREEN event.

I am very curious about your proposal, if you could flesh it out more (how the merges would proceed until the final single window).

However, wouldn't the following very simple custom window strategy do the trick?

1) Assign windows just like Beam's out-of-the-box Sessions (using a configured gap duration), but record the GREEN timestamp within the custom window object when we see it.
2) When the terminal BLUE event is seen, merge/collapse all windows such that the final window's right bound (i.e., `maxTimestamp()`) falls back to equal the GREEN timestamp. (This implies, of course, that the window's right bound could fall back substantially.)

The default TimestampCombiner/END_OF_WINDOW would then ensure that the emitted output timestamp of the aggregation is this GREEN timestamp. The input watermark would likely be far ahead of GREEN, so this final merged window would presumably fire immediately. The output watermark (I think) is held to the minimum of elements in live windows, so we should not be regressing the watermark by moving our window's right bound far back to GREEN in this way.

Any notes on this would be appreciated. In particular, confirmation of how I'm thinking about the watermark would help me get a handle on implementing correct custom windows.

Thanks!

On Fri, Nov 15, 2019 at 3:51 PM Kenneth Knowles <[email protected]> wrote:

> On Wed, Nov 13, 2019 at 7:39 PM Aaron Dixon <[email protected]> wrote:
>
>> This is a great help. Thank you.
>> I like the custom window solution
>> pattern as a way to hold the watermark and merge down to keep the watermark
>> where it is needed. Perhaps there is some interesting generalized session
>> window here.. I'll have to digest the stateful DoFn approach. Avoiding
>> unnecessary shuffles is a good note.
>>
>> As a side note, there is MIN, MAX and END_OF_WINDOW TimestampCombiner.
>> Has it been discussed to ever allow more customization here? Seems like
>> customizing the combiner with element-awareness would have solved this
>> problem, as well.
>>
>
> Good question :-). In fact, the original design was OutputTimeFn that was
> a custom user-defined function.
>
> - Discussed a bit here:
> https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#heading=h.45gyyckqhg4c
> - Removed here: https://github.com/apache/beam/pull/2683
>
> Users found the name and choices confusing. Also, the runner needs to grok
> the behavior of it in order to implement many optimizations. Before
> portability there were a bunch of instanceof checks and anything not on the
> happy path was slower. With portability, it needs to be represented in
> protobuf. Reinstating this would mean adding a new CUSTOM enum to
> TimestampCombiner and it would have performance costs.
>
> Kenn
>
>
>>
>>
>> On Wed, Nov 13, 2019 at 7:56 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> You've done a very good analysis* and I think your solution is pretty
>>> clever. The simple fact is this: the watermark has to be held to the
>>> minimum of any output you intend to produce. So for your use case, the hold
>>> has to be the timestamp of the Green element. Your solution does hold the
>>> watermark to the right time. I have a couple thoughts that may be helpful.
>>>
>>> 0. If you partition by user does the stream contain a bunch of Orange,
>>> Green, Blue elements? Is it possible that a session contains multiple
>>> [Orange, Green, Blue] sequences?
>>> Is it possible that an [Orange, Green,
>>> Blue] sequence is split across multiple sessions?
>>>
>>> 1. In your proposed solution, it probably could be expressed as a new
>>> merging WindowFn. You would assign each Green element to two tagged windows
>>> that were GreenFromOrange and GreenToBlue type, and have a separate window
>>> tag for OrangeWindow and BlueWindow. Then GreenFromOrange merges with
>>> OrangeWindow only, etc.
>>>
>>> 2. This might also turn out simply as a stateful DoFn, where you
>>> manually manage what state the funnel is in. When you set a timer to wait
>>> for the Orange element, you may need an upcoming feature where you set a
>>> timer for a future event time but the watermark is held to the Green
>>> element's timestamp. CC Reuven on that use case.
>>>
>>> What I would like to avoid is you having to do two shuffles (on whatever
>>> runner). This should be doable with one.
>>>
>>> *SessionWindow plus EARLIEST holding up the watermark/pipeline was an
>>> early complaint. That is part of why we switched the default to
>>> end-of-window (also it is easier to understand and more efficient to
>>> compute)
>>>
>>> Kenn
>>>
>>> On Wed, Nov 13, 2019 at 3:25 PM Aaron Dixon <[email protected]> wrote:
>>>
>>>> This is a real use case we have, but simplified:
>>>>
>>>> My user session look like this: user visits a page, and clicks three
>>>> buttons: Orange then Green then Blue.
>>>>
>>>> I need to compute the average time between Orange & Blue clicks but I
>>>> need to window on the timestamp of the green button click.
>>>>
>>>> In requirements terms: Compute average time between Orange and Blue for
>>>> all Green clicks that occur on Monday. (So User could click Orange on
>>>> Sunday, Green on Monday and Blue on Tuesday.)
>>>>
>>>> One strategy is to try to use a single SessionWindow to capture the
>>>> entire user session; then calculate the *span* (time between Orange
>>>> and Blue clicks) and *then* compute average of all spans.
>>>>
>>>> To do this the *span*/counts would have to all "land" in a window
>>>> representing Monday.
>>>>
>>>> If I use a SessionWindow w/ TimestampCombiner/EARLIEST then I can make
>>>> sure they land in this window using .outputWithTimestamp without worrying
>>>> that I'll be regressing the event timestamp.
>>>>
>>>> Except when I use this Combiner/EARLIEST strategy my watermark is held
>>>> up substantially (and incidentally seems to drag the pipeline).
>>>>
>>>> But if I use Beam's default TimestampCombiner/END_OF_WINDOW then I
>>>> won't be able to output the *span* result at a timestamp representing
>>>> the Green click.
>>>>
>>>> So a single SessionWindow seems out. (Unless I'm missing something.)
>>>>
>>>> The only other strategy I can conceive of at the moment is to capture
>>>> *two* sessions, representing each "leg" of the overall session. One
>>>> windows on the [Orange,Green] (using END_OF_WINDOW); the other [Green,Blue]
>>>> (using EARLIEST). Then I can "join" these two to get both legs together and
>>>> compute the overall span. This seems like a quite complicated way to solve
>>>> this (simple?) problem.
>>>>
>>>> Thoughts? What am I missing?
>>>>
>>>
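
P.S. To make steps 1) and 2) at the top of this message concrete, here is a minimal plain-Java sketch of the merge bookkeeping I have in mind. It is only a model, not a Beam WindowFn: `FunnelWindow`, `assign`, and `merge` are hypothetical names, no Beam APIs are used, timestamps are plain millis, and it assumes a single Orange/Green/Blue sequence per key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FunnelWindowSketch {
    enum Color { ORANGE, GREEN, BLUE }

    /** Hypothetical session-like window [start, end) carrying funnel bookkeeping. */
    static final class FunnelWindow {
        final long start;       // inclusive, millis
        final long end;         // exclusive, millis
        final Long greenTs;     // timestamp of the GREEN event, or null if not seen yet
        final boolean sawBlue;  // true once the terminal BLUE event is covered

        FunnelWindow(long start, long end, Long greenTs, boolean sawBlue) {
            this.start = start;
            this.end = end;
            this.greenTs = greenTs;
            this.sawBlue = sawBlue;
        }

        /** Right bound: falls back to GREEN once BLUE has been seen (step 2). */
        long maxTimestamp() {
            return (sawBlue && greenTs != null) ? greenTs : end - 1;
        }

        boolean intersects(FunnelWindow other) {
            return start < other.end && other.start < end;
        }

        /** Smallest window covering both, keeping the GREEN timestamp and BLUE flag. */
        FunnelWindow span(FunnelWindow other) {
            Long green = (greenTs != null) ? greenTs : other.greenTs;
            return new FunnelWindow(
                Math.min(start, other.start),
                Math.max(end, other.end),
                green,
                sawBlue || other.sawBlue);
        }
    }

    /** Step 1: one proto-session per element, recording GREEN when we see it. */
    static FunnelWindow assign(Color color, long timestamp, long gap) {
        Long green = (color == Color.GREEN) ? Long.valueOf(timestamp) : null;
        return new FunnelWindow(timestamp, timestamp + gap, green, color == Color.BLUE);
    }

    /** Session-style merge: collapse every run of intersecting windows into one. */
    static List<FunnelWindow> merge(List<FunnelWindow> windows) {
        List<FunnelWindow> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((FunnelWindow w) -> w.start));
        List<FunnelWindow> merged = new ArrayList<>();
        for (FunnelWindow w : sorted) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).span(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long gap = 150L;
        List<FunnelWindow> perElement = new ArrayList<>();
        perElement.add(assign(Color.ORANGE, 0L, gap));
        perElement.add(assign(Color.GREEN, 100L, gap));
        perElement.add(assign(Color.BLUE, 200L, gap));
        for (FunnelWindow w : merge(perElement)) {
            System.out.println("[" + w.start + ", " + w.end + ") maxTimestamp=" + w.maxTimestamp());
        }
    }
}
```

In a real pipeline the same bookkeeping would presumably live in a custom window class plus a merging WindowFn. Whether a runner tolerates a window whose `maxTimestamp()` moves backwards on merge is exactly the open question above; this sketch does not answer it.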
