Correct. This API has been merged into Beam, so it should be included in the next Beam release.
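
As a rough illustration of the watermark hold discussed in this thread, here is a minimal Java sketch. It is a toy model, not Beam's API: it assumes the output watermark is simply the minimum of the input watermark and the login time of every open session that is still being held. The class WatermarkHoldSketch, its methods, and the maxHold threshold are hypothetical names introduced only for illustration; in Beam itself the hold would come from setting a timer with withOutputTimestamp(t), as described in the quoted messages.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Toy model of watermark holds. NOT Beam's API; all names here are hypothetical. */
final class WatermarkHoldSketch {
    // Assumed threshold: a session open longer than this stops holding the watermark.
    private final Duration maxHold;
    // Login time of each open session, keyed by user id.
    private final Map<String, Instant> openSessions = new HashMap<>();

    WatermarkHoldSketch(Duration maxHold) {
        this.maxHold = maxHold;
    }

    void login(String user, Instant loginTime) {
        openSessions.put(user, loginTime);
    }

    void logout(String user) {
        openSessions.remove(user);
    }

    /** Output watermark = min(input watermark, earliest login that is still held). */
    Instant outputWatermark(Instant inputWatermark) {
        Instant result = inputWatermark;
        for (Instant login : openSessions.values()) {
            // Overly long sessions are excluded from the watermark calculation.
            boolean tooLong =
                Duration.between(login, inputWatermark).compareTo(maxHold) > 0;
            if (!tooLong && login.isBefore(result)) {
                result = login;
            }
        }
        return result;
    }
}
```

With a 24-hour maxHold, a session opened at 10:00 holds the output watermark at 10:00 while the input watermark advances to 13:00; once a session has been open for more than 24 hours it no longer contributes a hold, which is the "handle long sessions differently" idea in solution #2 of the quoted reply.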
On Mon, Jan 13, 2020 at 4:00 AM Aaron Dixon <[email protected]> wrote:

> Reuven, thank you very much for your help and the clarity here; it's very
> helpful.
>
> Per your solution #2 -- this approach makes sense, seems semantically
> right, and is something I'll explore when timer.withOutputTimestamp(t)
> is released. Just for clarity, there is no other way in Beam
> (mid-pipeline/post-Source) for me to hold the watermark today
> until this API is released, correct?
>
> On Mon, Jan 13, 2020 at 1:22 AM Reuven Lax <[email protected]> wrote:
>
>> Semantically though, since you want the CalendarWindow aggregation to be
>> based on login timestamps, the watermark should be tracking the login
>> timestamps. The watermark is a way for the CalendarWindow to know that as
>> far as the system knows, there will be no more events that fall into that
>> window. You say that long sessions are holding back the watermark, but
>> that's exactly because those long sessions mean that there is still data
>> pending for that CalendarWindow, so it is still incomplete! The above
>> techniques might appear to solve this, but do so at the expense of somewhat
>> randomly causing data to be late or, worse, dropped.
>>
>> There are a couple of ways I would address this:
>>
>> 1. The simplest would be to allow the watermark to track the login
>> window, but put a trigger on the CalendarWindow (e.g. trigger every 10
>> seconds). That way whenever the trigger fires you can update the results so
>> far for that window. This means that the majority of sessions that are
>> complete can be output without needing to wait for the long sessions, yet
>> the window will remain open waiting for those long sessions to complete.
>>
>> 2. Another possibility is to explicitly identify those extra-long
>> sessions, and handle them differently. This I think is a better solution
>> than the above timestampSkew solution, because it's deterministic: you know
>> exactly which sessions you are handling differently.
>> I would do this by using the state+timers API to calculate the sessions,
>> instead of the sessions WindowFn. When a session is overly long, then you
>> can stop setting the watermark hold for the login time, essentially
>> removing that long session from the watermark calculation.
>>
>> One possibility for how to handle the long sessions "differently" would
>> still involve using withAllowedTimestampSkew. This still risks losing some
>> of these (if the skew ever happens to be larger than the static value you
>> set, you'll not be able to output the session). However now you know
>> you're limiting the skewed output to only those specific long sessions
>> you've chosen, which is much better than emitting all records with skew and
>> hoping that things work out.
>>
>> Reuven
>>
>> On Sun, Jan 12, 2020 at 12:07 PM Aaron Dixon <[email protected]> wrote:
>>
>>> Reuven thanks -- I understand each point although I'm trying to grapple
>>> with your concerns expressed in #3; they don't seem avoidable even w/o the
>>> allowedSkew feature.
>>>
>>> Considering your response I see a revision to my solution that omits
>>> using the allowed skew configuration but as far as I can tell still has the
>>> concerns from #3 (i.e., difficulty in reasoning about which events may be
>>> dropped.)
>>>
>>> My pipeline using the skew config looks like this:
>>>
>>> (1) CustomSessionWindow
>>>     emits -> (user, login, logout) @ <logout-time>
>>> (2) ParDo
>>>     -> re-emits same tuple but w/ *login* timestamp
>>>     (requires custom allowed-skew)
>>> (3) CalendarWindow
>>>     -> <places in window based on **event** timestamp, which is the *login*
>>>     timestamp>
>>>
>>> Instead, I can write a CustomCalendarWindow that places the tuple
>>> element in the right window based on the *login* timestamp, avoiding the
>>> need for the middle/skewing ParDo:
>>>
>>> (1) CustomSessionWindow
>>>     -> (user, login, logout) @ <logout-time>
>>> (2) CustomCalendarWindow
>>>     -> <*explicitly* places element in window based on the **login**
>>>     timestamp>
>>>
>>> So the use of the ParDo was simply a way to avoid having to write a
>>> custom window; it essentially ensures the CalendarWindow windows based on
>>> login time.
>>>
>>> But I don't see how your concerns in #3 are obviated by this revision.
>>> Elements going in to the calendar window may be already late...this is
>>> something that any (multi-stage) Beam pipeline has to contend with, even
>>> without the deprecated allowedSkew facility, no?
>>>
>>> In other words both of these pipelines are semantically, behaviorally
>>> identical. The former just had the benefit of not requiring a custom window
>>> implementation.
>>>
>>>
>>> On Sun, Jan 12, 2020 at 12:12 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> A few comments:
>>>>
>>>> 1. Yes, this already works on Dataflow (at Beam head). Flink support is
>>>> pending at pr/10534.
>>>>
>>>> 2. Just to make sure we're on the same page: getAllowedTimestampSkew is
>>>> _not_ about outputting behind the watermark. Rather it's about outputting a
>>>> timestamp that's less than the current input timestamp.
>>>> If for example the watermark is 12:00 and the current input element has a
>>>> timestamp of 11:00 (because it's late), then you can output an element at
>>>> 11:00 with no need to set this parameter. It appears that the JavaDoc is
>>>> somewhat confusing on this method.
>>>>
>>>> 3. The reason for this parameter is that the watermark only correctly
>>>> tracks timestamps internal to the pipeline if your code doesn't make
>>>> timestamps travel back in time - i.e. a ParDo taking an element with a
>>>> timestamp of 12:00 and outputting another element with an earlier
>>>> timestamp. If you use getAllowedTimestampSkew your elements produced might
>>>> not be tracked by the watermark and will show up late (even if the source
>>>> element is on time). What's worse, there's a chance that the elements will
>>>> be older than allowedLateness and will get dropped altogether (this can
>>>> happen even if allowedTimestampSkew < maxAllowedLateness, because the input
>>>> element might already be late and you'll then output an element that has an
>>>> even earlier timestamp).
>>>>
>>>> 4. It sounds like you both want and don't want a watermark. You want
>>>> the watermark to not be held up by your input (so that your aggregations
>>>> keep triggering), but you then want to output old data which might prevent
>>>> the watermark from working properly, and might cause data to be dropped.
>>>> Have you considered instead using either triggers or timers to trigger your
>>>> aggregations? That way you don't need to wait for the watermark to advance
>>>> to the end of the window to trigger the aggregation, but the end-of-window
>>>> aggregation will still be correct.
>>>>
>>>> Reuven
>>>>
>>>> On Sun, Jan 12, 2020 at 8:23 AM Aaron Dixon <[email protected]> wrote:
>>>>
>>>>> Reuven thanks for your insights so far.
>>>>> Just wanted to press a little more on the deprecation question as I'm
>>>>> still (so far) convinced that my use case is quite a straightforward
>>>>> justification (I'm looking for confirmation or correction to my thinking
>>>>> here.) I've simplified my use case a bit if it helps things:
>>>>>
>>>>> Use case: "For users that login on a given calendar day, what is the
>>>>> average login time?"
>>>>>
>>>>> So I have two event types LOGIN and LOGOUT. I capture a user login
>>>>> session (using custom windowing or state api, doesn't matter) and I use
>>>>> the default TimestampCombiner/END_OF_WINDOW because I want my aggregations
>>>>> to not be delayed.
>>>>>
>>>>> However per my use case requirements I must window using the LOGIN
>>>>> time. So I use outputWithTimestamp plus skew configuration to this end.
>>>>>
>>>>> Since most of my users login and logout within the same calendar day,
>>>>> I get my per-day aggregations right on time in real-time.
>>>>>
>>>>> Only for the few users that logout after the day that they login will
>>>>> I see actual late aggregations produced, in which case I can leverage
>>>>> Beam's various lateness configuration levers to trade completeness for
>>>>> storage, etc.
>>>>>
>>>>> This to me seems a *very* straightforward justification for my use of
>>>>> DoFn#getAllowedTimestampSkew. Should this justify not deprecating that
>>>>> facility?
>>>>>
>>>>> I realize there are other various solutions, now and coming soon, that
>>>>> involve holding the watermark -- but any solution that requires holding
>>>>> the watermark means that I have to give up getting on-time aggregations at
>>>>> the very end of the calendar day (window). I would much rather (and
>>>>> reasonably so?) get on-time aggregations covering the majority of my users
>>>>> and be happy to refine these averages when my few latent users logout in a
>>>>> later day.
>>>>>
>>>>> In some Beam documentation [1] there is the idea of "unobservably late
>>>>> data". That is, I have specific elements that are output late (behind the
>>>>> watermark) but because they are guaranteed to land *within the window*
>>>>> they are therefore promoted to be on-time. This conceptualization of
>>>>> things seems very well-suited to my simple use case but I'm definitely
>>>>> open to a different way of thinking in my approach.
>>>>>
>>>>> My main concern is that my pipeline will be leveraging a Deprecated
>>>>> facility (DoFn#getAllowedTimestampSkew) but I don't see other viable
>>>>> options (within Beam) yet.
>>>>>
>>>>> (Hope I'm not pressing too hard on this question here. I think this
>>>>> use case is interesting because it ...seems... to be a rather
>>>>> simple/distilled justification for being able to output data behind the
>>>>> watermark mid-stream.)
>>>>>
>>>>> [1] https://beam.apache.org/blog/2016/10/20/test-stream.html
>>>>>
>>>>>
>>>>> On Sat, Jan 11, 2020 at 10:10 PM Aaron Dixon <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Oh nice -- that will be great -- will look forward to this one! Any
>>>>>> idea if Dataflow will support it?
>>>>>>
>>>>>> On Sat, Jan 11, 2020 at 9:07 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> There is now (as of last week) a way to hold back the watermark with
>>>>>>> the state API (though not yet in a released version of Beam). If you
>>>>>>> set a timer using withOutputTimestamp(t), the watermark will be held
>>>>>>> to t.
>>>>>>>
>>>>>>> On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Reuven thanks for your quick reply
>>>>>>>>
>>>>>>>> I've tried that but the drag it puts on the watermark was too
>>>>>>>> intrusive. For example, even if just a single user among many decided
>>>>>>>> to remain logged-in for a few days then the watermark holds everything
>>>>>>>> else back.
>>>>>>>>
>>>>>>>> This was when using a custom session window. I've recently been
>>>>>>>> using the State API to do my custom session tracking to avoid issues
>>>>>>>> with downward merging of windows (see earlier mailing list thread) ...
>>>>>>>> with the State API .. I'm not able to hold the watermark back (I
>>>>>>>> think) ... but in any case, I prefer the behavior where the watermark
>>>>>>>> moves forward with the upstream events and to deal with the very few
>>>>>>>> straggler users by a lateness configuration.
>>>>>>>>
>>>>>>>> Does that make sense? So far to me this seems very reasonable (to
>>>>>>>> want to keep the watermark moving and deal w/ the late events, the few
>>>>>>>> of which actually fall out of the window, using explicit lateness
>>>>>>>> configuration.)
>>>>>>>>
>>>>>>>> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Have you looked at using
>>>>>>>>> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
>>>>>>>>> downstream watermark back to the beginning of the window (presumably
>>>>>>>>> the timestamp of the LOGIN event), so you can call
>>>>>>>>> .outputWithTimestamp using the CLICK GREEN timestamp without needing
>>>>>>>>> to set the allowed-lateness skew.
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I've just built a pipeline in Beam and after exploring several
>>>>>>>>>> options for my use case, I've ended up relying on the deprecated
>>>>>>>>>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems
>>>>>>>>>> to me a quite valid use case. So I suppose this is a vote for
>>>>>>>>>> un-deprecating this API (or a teachable moment in which I could be
>>>>>>>>>> pointed to a more suitable non-deprecated approach.)
>>>>>>>>>>
>>>>>>>>>> I'll stick with a previous simplification of my use case:
>>>>>>>>>>
>>>>>>>>>> I get these events from my users:
>>>>>>>>>> LOGIN
>>>>>>>>>> CLICK GREEN BUTTON
>>>>>>>>>> LOGOUT
>>>>>>>>>>
>>>>>>>>>> I capture user session duration (logout time *minus* login time)
>>>>>>>>>> and I want to perform a PER DAY average (i.e., my window is on
>>>>>>>>>> CalendarDays) BUT where the aggregation's timestamp is the time of
>>>>>>>>>> the CLICK GREEN event.
>>>>>>>>>>
>>>>>>>>>> So once I calculate and emit a single user's session duration I
>>>>>>>>>> need to .outputWithTimestamp using the CLICK GREEN event's
>>>>>>>>>> timestamp. This involves, of course, outputting with a timestamp
>>>>>>>>>> *before* the watermark.
>>>>>>>>>>
>>>>>>>>>> In most cases my users LOGOUT in the same day as the CLICK GREEN
>>>>>>>>>> BUTTON event, so even though I'm typically outputting a timestamp
>>>>>>>>>> before the watermark the CalendarDay window is not yet closed and so
>>>>>>>>>> most user session durations do not cause a late aggregation for that
>>>>>>>>>> CalendarDay.
>>>>>>>>>>
>>>>>>>>>> Only when a LOGOUT occurs on a day later than the CLICK GREEN
>>>>>>>>>> event do I have to contend with potentially late data contributing
>>>>>>>>>> back to a prior CalendarDay.
>>>>>>>>>>
>>>>>>>>>> In any case, I have .withAllowedLateness to allow me to make a
>>>>>>>>>> call here about what I'm willing to trade off (keeping windows open
>>>>>>>>>> vs. dropping data for users with overly long sessions), etc.
>>>>>>>>>>
>>>>>>>>>> This here seems to be a simple scenario (it is effectively my
>>>>>>>>>> real-world scenario) and the
>>>>>>>>>> .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem to cover it
>>>>>>>>>> in a straightforward, effective way.
>>>>>>>>>>
>>>>>>>>>> However of course I don't like building production code on
>>>>>>>>>> deprecated capabilities -- so advice on alternatives (or perhaps a
>>>>>>>>>> reconsideration of this deprecation :) ) would be appreciated.
>>>>>>>>>>
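
To make point 3 of the Jan 12 reply quoted above concrete, here is a small worked example in Java. It is only a sketch under a simplifying assumption: an element is treated as dropped when it trails the watermark by more than the allowed lateness (Beam's real check is made per window, not per element). The class SkewLatenessSketch and both of its methods are hypothetical names, not part of Beam.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy arithmetic for timestamp skew vs. allowed lateness. NOT Beam's API. */
final class SkewLatenessSketch {
    /** Timestamp of an output element moved back in time by the given skew. */
    static Instant applySkew(Instant inputTimestamp, Duration skew) {
        return inputTimestamp.minus(skew);
    }

    /** Simplified rule (an assumption): dropped if behind the watermark by more than allowedLateness. */
    static boolean isDropped(Instant watermark, Instant elementTimestamp,
                             Duration allowedLateness) {
        Duration behind = Duration.between(elementTimestamp, watermark);
        return behind.compareTo(allowedLateness) > 0;
    }
}
```

Take a watermark of 12:00, an input element at 11:00 (already one hour late), a skew of 30 minutes, and an allowed lateness of 75 minutes. The input element is within the allowed lateness, but the output at 10:30 is 90 minutes behind the watermark and would be dropped, even though the skew is smaller than the allowed lateness.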
