Reuven, thank you so much for your help and the clarity here; it's very helpful.
Per your solution #2: this approach makes sense, seems semantically right, and is something I'll explore when timer.withOutputTimestamp(t) is released. Just for clarity, there is no other way in Beam today (mid-pipeline, i.e. after the Source) for me to hold the watermark until this API is released, correct?

On Mon, Jan 13, 2020 at 1:22 AM Reuven Lax <re...@google.com> wrote:

> Semantically, though, since you want the CalendarWindow aggregation to be based on login timestamps, the watermark should be tracking the login timestamps. The watermark is a way for the CalendarWindow to know that, as far as the system knows, there will be no more events that fall into that window. You say that long sessions are holding back the watermark, but that's exactly because those long sessions mean that there is still data pending for that CalendarWindow, so it is still incomplete! The above techniques might appear to solve this, but they do so at the expense of somewhat randomly causing data to be late or, worse, dropped.
>
> There are a couple of ways I would address this:
>
> 1. The simplest would be to allow the watermark to track the login window, but put a trigger on the CalendarWindow (e.g. trigger every 10 seconds). That way, whenever the trigger fires, you can update the results so far for that window. This means that the majority of sessions that are complete can be output without needing to wait for the long sessions, yet the window will remain open waiting for those long sessions to complete.
>
> 2. Another possibility is to explicitly identify those extra-long sessions and handle them differently. I think this is a better solution than the above timestampSkew solution, because it's deterministic: you know exactly which sessions you are handling differently. I would do this by using the state+timers API to calculate the sessions, instead of the sessions WindowFn. When a session is overly long, you can stop setting the watermark hold for the login time, essentially removing that long session from the watermark calculation.
>
> One possibility for how to handle the long sessions "differently" would still involve using withAllowedTimestampSkew. This still risks losing some of these (if the skew ever happens to be larger than the static value you set, you won't be able to output the session). However, now you know you're limiting the skewed output to only those specific long sessions you've chosen, which is much better than emitting all records with skew and hoping that things work out.
>
> Reuven
>
> On Sun, Jan 12, 2020 at 12:07 PM Aaron Dixon <atdi...@gmail.com> wrote:
>
>> Reuven, thanks. I understand each point, although I'm trying to grapple with your concerns expressed in #3; they don't seem avoidable even without the allowedSkew feature.
>>
>> Considering your response, I see a revision to my solution that omits the allowed-skew configuration but, as far as I can tell, still has the concerns from #3 (i.e., difficulty in reasoning about which events may be dropped).
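Option 2 can be made concrete with a minimal plain-Java sketch of how the watermark hold might be computed once extra-long sessions are excluded. It has no Beam dependency, so it is not the state+timers code itself, only the arithmetic behind it. It assumes each open session would hold the watermark at its login time, and that "overly long" means open longer than a fixed threshold. The class name, method name, and 12-hour threshold are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

/**
 * Plain-Java model (not Beam code) of excluding extra-long sessions from the
 * watermark hold. In Beam this would be expressed with state and timers; here
 * we only compute which login time would still hold the watermark.
 */
public class LongSessionHold {

  /** Earliest login among sessions open no longer than the threshold; empty if none hold. */
  static Optional<Instant> watermarkHold(
      List<Instant> openSessionLogins, Instant now, Duration maxHeldSession) {
    return openSessionLogins.stream()
        // Sessions open longer than the threshold are deterministically treated
        // as "extra long" and stop contributing a hold.
        .filter(login -> Duration.between(login, now).compareTo(maxHeldSession) <= 0)
        .min(Instant::compareTo);
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2020-01-13T12:00:00Z");
    List<Instant> logins = List.of(
        Instant.parse("2020-01-10T08:00:00Z"), // open for 3+ days: excluded
        Instant.parse("2020-01-13T09:30:00Z"),
        Instant.parse("2020-01-13T11:00:00Z"));
    // Prints Optional[2020-01-13T09:30:00Z]
    System.out.println(watermarkHold(logins, now, Duration.ofHours(12)));
  }
}
```

Because the threshold is explicit, you know exactly which sessions were removed from the hold. Those sessions would still need the separate handling described above.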
>>
>> My pipeline using the skew config looks like this:
>>
>> (1) CustomSessionWindow
>>     emits -> (user, login, logout) @ <logout-time>
>> (2) ParDo
>>     -> re-emits same tuple but w/ *login* timestamp (requires custom allowed-skew)
>> (3) CalendarWindow
>>     -> <places in window based on **event** timestamp, which is the *login* timestamp>
>>
>> Instead, I can write a CustomCalendarWindow that places the tuple element in the right window based on the *login* timestamp, avoiding the need for the middle/skewing ParDo:
>>
>> (1) CustomSessionWindow
>>     -> (user, login, logout) @ <logout-time>
>> (2) CustomCalendarWindow
>>     -> <*explicitly* places element in window based on the **login** timestamp>
>>
>> So the ParDo was simply a way to avoid having to write a custom window; it essentially ensures that the CalendarWindow windows based on login time.
>>
>> But I don't see how your concerns in #3 are obviated by this revision. Elements going into the calendar window may already be late; this is something that any (multi-stage) Beam pipeline has to contend with, even without the deprecated allowedSkew facility, no?
>>
>> In other words, both of these pipelines are semantically and behaviorally identical. The former just had the benefit of not requiring a custom window implementation.
>>
>> On Sun, Jan 12, 2020 at 12:12 PM Reuven Lax <re...@google.com> wrote:
>>
>>> A few comments:
>>>
>>> 1. Yes, this already works on Dataflow (at Beam head). Flink support is pending at pr/10534.
>>>
>>> 2. Just to make sure we're on the same page: getAllowedTimestampSkew is _not_ about outputting behind the watermark. Rather, it's about outputting a timestamp that's less than the current input timestamp. If, for example, the watermark is 12:00 and the current input element has a timestamp of 11:00 (because it's late), then you can output an element at 11:00 with no need to set this parameter. The JavaDoc for this method appears to be somewhat confusing.
>>>
>>> 3. The reason for this parameter is that the watermark only correctly tracks timestamps internal to the pipeline if your code doesn't make timestamps travel back in time, i.e. a ParDo taking an element with a timestamp of 12:00 and outputting another element with an earlier timestamp. If you use getAllowedTimestampSkew, the elements you produce might not be tracked by the watermark and will show up late (even if the source element is on time). What's worse, there's a chance that the elements will be older than allowedLateness and will get dropped altogether (this can happen even if allowedTimestampSkew < maxAllowedLateness, because the input element might already be late and you'll then output an element that has an even earlier timestamp).
>>>
>>> 4. It sounds like you both want and don't want a watermark. You want the watermark to not be held up by your input (so that your aggregations keep triggering), but you then want to output old data, which might prevent the watermark from working properly and might cause data to be dropped. Have you considered instead using either triggers or timers to trigger your aggregations? That way you don't need to wait for the watermark to advance to the end of the window to trigger the aggregation, but the end-of-window aggregation will still be correct.
>>>
>>> Reuven
>>>
>>> On Sun, Jan 12, 2020 at 8:23 AM Aaron Dixon <atdi...@gmail.com> wrote:
>>>
>>>> Reuven, thanks for your insights so far. I just wanted to press a little more on the deprecation question, as I'm still (so far) convinced that my use case is quite a straightforward justification (I'm looking for confirmation or correction of my thinking here). I've simplified my use case a bit, in case it helps:
>>>>
>>>> Use case: "For users that log in on a given calendar day, what is the average login time?"
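Points 2 and 3 of Reuven's "A few comments" reply can be illustrated with a small plain-Java model. This is a simplification, not Beam's implementation. It measures lateness per element against the watermark, as point 3 phrases it, whereas Beam compares the watermark with the end of the element's window plus the allowed lateness. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model (not Beam code) of timestamp skew versus lateness. */
public class SkewVsLateness {

  /** Point 2: skew is needed only when the output timestamp precedes the INPUT element's timestamp. */
  static boolean needsSkew(Instant inputTimestamp, Instant outputTimestamp) {
    return outputTimestamp.isBefore(inputTimestamp);
  }

  /** Point 3 (simplified): true if the element trails the watermark by more than allowedLateness. */
  static boolean beyondAllowedLateness(Instant timestamp, Instant watermark, Duration allowedLateness) {
    return Duration.between(timestamp, watermark).compareTo(allowedLateness) > 0;
  }

  public static void main(String[] args) {
    Instant watermark = Instant.parse("2020-01-12T12:00:00Z");
    Instant lateInput = Instant.parse("2020-01-12T11:00:00Z"); // already 1h behind the watermark
    Duration allowedLateness = Duration.ofMinutes(90);

    // Re-emitting at the input's own (late) timestamp needs no skew.
    System.out.println(needsSkew(lateInput, lateInput)); // false
    System.out.println(beyondAllowedLateness(lateInput, watermark, allowedLateness)); // false

    // Shifting back only 45 minutes (less than the 90-minute lateness) needs skew,
    // and the combined 1h45m behind the watermark now exceeds allowedLateness.
    Instant shifted = lateInput.minus(Duration.ofMinutes(45));
    System.out.println(needsSkew(lateInput, shifted)); // true
    System.out.println(beyondAllowedLateness(shifted, watermark, allowedLateness)); // true
  }
}
```

The last line shows the case Reuven warns about: the skew (45 minutes) is smaller than the allowed lateness (90 minutes), yet the output still falls outside it because the input was already late.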
>>>>
>>>> So I have two event types, LOGIN and LOGOUT. I capture a user login session (using custom windowing or the State API, doesn't matter) and I use the default TimestampCombiner/END_OF_WINDOW because I want my aggregations to not be delayed.
>>>>
>>>> However, per my use case requirements, I must window using the LOGIN time. So I use outputWithTimestamp plus the skew configuration to this end.
>>>>
>>>> Since most of my users log in and log out within the same calendar day, I get many per-day aggregations right on time, in real time.
>>>>
>>>> Only for the few users that log out after the day that they logged in will I see actual late aggregations produced, in which case I can leverage Beam's various lateness configuration levers to trade completeness for storage, etc.
>>>>
>>>> This to me seems a *very* straightforward justification for my use of DoFn#getAllowedTimestampSkew. Shouldn't this justify not deprecating that facility?
>>>>
>>>> I realize there are various other solutions, now and coming soon, that involve holding the watermark; but any solution that requires holding the watermark means that I have to give up getting on-time aggregations at the very end of the calendar day (window). I would much rather (and reasonably so?) get on-time aggregations covering the majority of my users and be happy to refine these averages when my few latent users log out on a later day.
>>>>
>>>> In some Beam documentation [1] there is the idea of "unobservably late data". That is, I have specific elements that are output late (behind the watermark), but because they are guaranteed to land *within the window*, they are promoted to on-time. This conceptualization seems very well suited to my simple use case, but I'm definitely open to a different way of thinking in my approach.
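The timing in this use case can be sketched in plain Java. The sketch makes two assumptions that are not Beam behavior: calendar days are UTC, and the watermark at the moment a session is emitted is approximated by its logout time. Each session belongs to the window of its login day. A same-day logout lands in a window that is still open, a next-day logout produces a late refinement, and a logout past the allowed lateness would be dropped. The names are hypothetical, and Beam's exact boundary checks differ slightly.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

/**
 * Plain-Java sketch (not Beam code): a session is emitted at logout but belongs
 * to the calendar-day window of its LOGIN time. Assumes UTC days and a
 * watermark approximately equal to the logout time when the session is emitted.
 */
public class LoginDayLateness {

  enum Status { ON_TIME, LATE, DROPPED }

  /** Exclusive end of the UTC calendar day containing the login. */
  static Instant loginDayEnd(Instant login) {
    LocalDate day = login.atZone(ZoneOffset.UTC).toLocalDate();
    return day.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant();
  }

  static Status classify(Instant login, Instant logout, Duration allowedLateness) {
    Instant windowEnd = loginDayEnd(login);
    if (logout.isBefore(windowEnd)) {
      // Behind the watermark, but the window is still open, so it counts as on time.
      return Status.ON_TIME;
    }
    return logout.isBefore(windowEnd.plus(allowedLateness)) ? Status.LATE : Status.DROPPED;
  }

  public static void main(String[] args) {
    Duration lateness = Duration.ofDays(2);
    // Same-day logout.
    System.out.println(classify(Instant.parse("2020-01-11T09:00:00Z"),
        Instant.parse("2020-01-11T17:00:00Z"), lateness)); // ON_TIME
    // Logout shortly after midnight: refines the previous day's result.
    System.out.println(classify(Instant.parse("2020-01-11T22:00:00Z"),
        Instant.parse("2020-01-12T01:00:00Z"), lateness)); // LATE
    // Logout days later, beyond the allowed lateness.
    System.out.println(classify(Instant.parse("2020-01-11T08:00:00Z"),
        Instant.parse("2020-01-15T08:00:00Z"), lateness)); // DROPPED
  }
}
```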
>>>>
>>>> My main concern is that my pipeline will be leveraging a deprecated facility (DoFn#getAllowedTimestampSkew), but I don't see other viable options (within Beam) yet.
>>>>
>>>> (Hope I'm not pressing too hard on this question. I think this use case is interesting because it ...seems... to be a rather simple, distilled justification for being able to output data behind the watermark mid-stream.)
>>>>
>>>> [1] https://beam.apache.org/blog/2016/10/20/test-stream.html
>>>>
>>>> On Sat, Jan 11, 2020 at 10:10 PM Aaron Dixon <atdi...@gmail.com> wrote:
>>>>
>>>>> Oh nice, that will be great. Looking forward to this one! Any idea if Dataflow will support it?
>>>>>
>>>>> On Sat, Jan 11, 2020 at 9:07 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> There is now (as of last week) a way to hold back the watermark with the state API (though not yet in a released version of Beam). If you set a timer using withOutputTimestamp(t), the watermark will be held to t.
>>>>>>
>>>>>> On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon <atdi...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Reuven, thanks for your quick reply.
>>>>>>>
>>>>>>> I've tried that, but the drag it puts on the watermark was too intrusive. For example, even if just a single user among many decided to remain logged in for a few days, the watermark would hold everything else back.
>>>>>>>
>>>>>>> This was when using a custom session window. I've recently been using the State API to do my custom session tracking, to avoid issues with downward merging of windows (see earlier mailing list thread). With the State API I'm not able to hold the watermark back (I think), but in any case I prefer the behavior where the watermark moves forward with the upstream events, and I deal with the very few straggler users through a lateness configuration.
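The following is a minimal plain-Java model of the hold Reuven describes and of the straggler problem Aaron reports. It assumes that a stage's output watermark is bounded by both its input watermark and the earliest outstanding hold, for example the t passed to withOutputTimestamp(t). This is a simplification rather than Beam's watermark code, and the names are hypothetical.

```java
import java.time.Instant;
import java.util.List;

/** Simplified model (not Beam code): the output watermark cannot pass any outstanding hold. */
public class WatermarkHold {

  static Instant outputWatermark(Instant inputWatermark, List<Instant> holds) {
    Instant result = inputWatermark;
    for (Instant hold : holds) {
      if (hold.isBefore(result)) {
        result = hold; // the earliest hold wins
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Instant input = Instant.parse("2020-01-11T16:00:00Z");
    // Most sessions hold close to "now"; one user has been logged in since Jan 8.
    List<Instant> loginHolds = List.of(
        Instant.parse("2020-01-11T15:30:00Z"),
        Instant.parse("2020-01-11T15:45:00Z"),
        Instant.parse("2020-01-08T10:00:00Z"));
    // Prints 2020-01-08T10:00:00Z: the single straggler holds everyone back.
    System.out.println(outputWatermark(input, loginHolds));
  }
}
```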
>>>>>>>
>>>>>>> Does that make sense? So far this seems very reasonable to me (to keep the watermark moving and deal with the few late events that actually fall out of the window using an explicit lateness configuration).
>>>>>>>
>>>>>>> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Have you looked at using withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the downstream watermark back to the beginning of the window (presumably the timestamp of the LOGIN event), so you can call outputWithTimestamp using the CLICK GREEN timestamp without needing to set the allowed timestamp skew.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon <atdi...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I've just built a pipeline in Beam, and after exploring several options for my use case, I've ended up relying on the deprecated .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a quite valid use case. So I suppose this is a vote for un-deprecating this API (or a teachable moment in which I could be pointed to a more suitable, non-deprecated approach).
>>>>>>>>>
>>>>>>>>> I'll stick with a previous simplification of my use case:
>>>>>>>>>
>>>>>>>>> I get these events from my users:
>>>>>>>>> LOGIN
>>>>>>>>> CLICK GREEN BUTTON
>>>>>>>>> LOGOUT
>>>>>>>>>
>>>>>>>>> I capture user session duration (logout time *minus* login time) and I want to perform a PER DAY average (i.e., my window is on CalendarDays) BUT where the aggregation's timestamp is the time of the CLICK GREEN event.
>>>>>>>>>
>>>>>>>>> So once I calculate and emit a single user's session duration, I need to .outputWithTimestamp using the CLICK GREEN event's timestamp. This involves, of course, outputting with a timestamp *before* the watermark.
>>>>>>>>>
>>>>>>>>> In most cases my users LOGOUT on the same day as the CLICK GREEN BUTTON event, so even though I'm typically outputting a timestamp before the watermark, the CalendarDay window is not yet closed, and so most user session durations do not produce a late aggregation for that CalendarDay.
>>>>>>>>>
>>>>>>>>> Only when a LOGOUT occurs on a day later than the CLICK GREEN event do I have to contend with potentially late data contributing back to a prior CalendarDay.
>>>>>>>>>
>>>>>>>>> In any case, I have .withAllowedLateness to let me make a call here about what I'm willing to trade off (keeping windows open vs. dropping data for users with overly long sessions), etc.
>>>>>>>>>
>>>>>>>>> This seems to be a simple scenario (it is effectively my real-world scenario), and .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem to cover it in a straightforward, effective way.
>>>>>>>>>
>>>>>>>>> However, of course I don't like building production code on deprecated capabilities, so advice on alternatives (or perhaps a reconsideration of this deprecation :) ) would be appreciated.
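The aggregation described in the original message can be sketched without Beam. It computes the average of logout minus login per calendar day, with each session assigned to the day of its CLICK GREEN event. The sketch assumes UTC days and ignores windowing, triggering, and lateness entirely. Session and averageByClickDay are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch (not Beam code) of a per-day average keyed by the CLICK GREEN day. */
public class PerDayAverage {

  /** One completed user session (hypothetical type). */
  static final class Session {
    final Instant login;
    final Instant clickGreen;
    final Instant logout;

    Session(String login, String clickGreen, String logout) {
      this.login = Instant.parse(login);
      this.clickGreen = Instant.parse(clickGreen);
      this.logout = Instant.parse(logout);
    }
  }

  /** Average (logout - login) per UTC calendar day of the CLICK GREEN event. */
  static Map<LocalDate, Duration> averageByClickDay(List<Session> sessions) {
    Map<LocalDate, List<Duration>> byDay = new TreeMap<>();
    for (Session s : sessions) {
      LocalDate day = s.clickGreen.atZone(ZoneOffset.UTC).toLocalDate();
      byDay.computeIfAbsent(day, d -> new ArrayList<>())
          .add(Duration.between(s.login, s.logout));
    }
    Map<LocalDate, Duration> averages = new TreeMap<>();
    for (Map.Entry<LocalDate, List<Duration>> e : byDay.entrySet()) {
      Duration total = Duration.ZERO;
      for (Duration d : e.getValue()) {
        total = total.plus(d);
      }
      averages.put(e.getKey(), total.dividedBy(e.getValue().size()));
    }
    return averages;
  }

  public static void main(String[] args) {
    List<Session> sessions = List.of(
        new Session("2020-01-11T09:00:00Z", "2020-01-11T09:10:00Z", "2020-01-11T10:00:00Z"),
        new Session("2020-01-11T13:00:00Z", "2020-01-11T13:30:00Z", "2020-01-11T16:00:00Z"),
        // Logs out the next day but still counts toward Jan 11 (the CLICK GREEN day).
        new Session("2020-01-11T23:00:00Z", "2020-01-11T23:30:00Z", "2020-01-12T02:00:00Z"),
        new Session("2020-01-12T08:00:00Z", "2020-01-12T08:05:00Z", "2020-01-12T08:30:00Z"));
    // Prints {2020-01-11=PT2H20M, 2020-01-12=PT30M}
    System.out.println(averageByClickDay(sessions));
  }
}
```

In a streaming pipeline, the third session is the kind that would arrive after Jan 11's window has closed and feed back into it as late data.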