Reuven, thank you very much for your help and the clarity here; it's very
helpful.

Per your solution #2 -- This approach makes sense, seems semantically
right, and is something I'll explore when Timer.withOutputTimestamp(t) is
released. Just for clarity: there is no other way in Beam
(mid-pipeline/post-Source) for me to hold back the watermark today until
this API is released, correct?

On Mon, Jan 13, 2020 at 1:22 AM Reuven Lax <re...@google.com> wrote:

> Semantically though, since you want the CalendarWindow aggregation to be
> based on login timestamps, the watermark should be tracking the login
> timestamps. The watermark is a way for the CalendarWindow to know that as
> far as the system knows, there will be no more events that fall into that
> window. You say that long sessions are holding back the watermark, but
> that's exactly because those long sessions mean that there is still data
> pending for that CalendarWindow, so it is still incomplete! The above
> techniques might appear to solve this, but do so at the expense of somewhat
> randomly causing data to be late or, worse, dropped.
>
> There are a couple of ways I would address this:
>
> 1. The simplest would be to allow the watermark to track the login
> timestamps, but put a trigger on the CalendarWindow (e.g. trigger every 10
> seconds). That way whenever the trigger fires you can update the results so
> far for that window. This means that the majority of sessions that are
> complete can be output without needing to wait for the long sessions, yet
> the window will remain open waiting for those long sessions to complete.
>
> 2. Another possibility is to explicitly identify those extra-long
> sessions and handle them differently. I think this is a better solution
> than the above timestampSkew solution, because it's deterministic: you know
> exactly which sessions you are handling differently. I would do this by
> using the state+timers API to calculate the sessions, instead of the
> Sessions WindowFn. When a session is overly long, you can stop setting
> the watermark hold at the login time, essentially removing that long
> session from the watermark calculation.
>
> One possibility for how to handle the long sessions "differently" would
> still involve using withAllowedTimestampSkew. This still risks losing some
> of these sessions (if the skew ever happens to be larger than the static
> value you set, you'll not be able to output the session). However, now you
> know you're limiting the skewed output to only those specific long sessions
> you've chosen, which is much better than emitting all records with skew and
> hoping that things work out.
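>
> As a rough, untested sketch of what I mean (imports omitted; the element
> types, the session gap, and the "overly long" cutoff are all placeholders),
> using the new Timer.withOutputTimestamp from the state/timers API:
>
> class SessionFn extends DoFn<KV<String, Event>, KV<String, Session>> {
>   private static final Duration GAP = Duration.standardMinutes(30);    // placeholder gap
>   private static final Duration MAX_HOLD = Duration.standardHours(12); // "overly long" cutoff
>
>   @StateId("loginTime")
>   private final StateSpec<ValueState<Instant>> loginTimeSpec =
>       StateSpecs.value(InstantCoder.of());
>
>   @TimerId("endOfSession")
>   private final TimerSpec endOfSessionSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(
>       ProcessContext c,
>       @StateId("loginTime") ValueState<Instant> loginTime,
>       @TimerId("endOfSession") Timer endOfSession) {
>     Instant login = loginTime.read();
>     if (login == null) {
>       login = c.timestamp();
>       loginTime.write(login);
>     }
>     // Hold the output watermark at the login time, unless the session has
>     // become overly long, in which case release the hold.
>     Instant hold = c.timestamp().isAfter(login.plus(MAX_HOLD)) ? c.timestamp() : login;
>     endOfSession.withOutputTimestamp(hold).set(c.timestamp().plus(GAP));
>   }
>
>   @OnTimer("endOfSession")
>   public void onEndOfSession(
>       OnTimerContext c, @StateId("loginTime") ValueState<Instant> loginTime) {
>     // Emit the finished session here (e.g. via c.output(...)), then clear state.
>     loginTime.clear();
>   }
> }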
>
> Reuven
>
> On Sun, Jan 12, 2020 at 12:07 PM Aaron Dixon <atdi...@gmail.com> wrote:
>
>> Reuven, thanks -- I understand each point, although I'm still trying to
>> grapple with your concerns expressed in #3; they don't seem avoidable even
>> without the allowedSkew feature.
>>
>> Considering your response, I see a revision to my solution that omits
>> using the allowed-skew configuration but, as far as I can tell, still has
>> the concerns from #3 (i.e., difficulty in reasoning about which events may
>> be dropped).
>>
>> My pipeline using the skew config looks like this:
>>
>> (1) CustomSessionWindow
>> emits -> (user, login, logout) @ <logout-time>
>> (2) ParDo
>> -> re-emits same tuple but w/ *login* timestamp
>>     (requires custom allowed-skew)
>> (3) CalendarWindow
>> -> <places in window based on **event** timestamp, which is the *login*
>> timestamp>
>>
>> Instead, I can write a CustomCalendarWindow that places the tuple element
>> in the right window based on the *login* timestamp, avoiding the need for
>> the middle/skewing ParDo:
>>
>> (1) CustomSessionWindow
>> -> (user, login, logout) @ <logout-time>
>> (2) CustomCalendarWindow
>> -> <*explicitly* places element in window based on the **login**
>> timestamp>
>>
>> So the use of the ParDo was simply a way to avoid having to write a
>> custom window; it essentially ensures the CalendarWindow windows based on
>> login time.
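>>
>> For concreteness, the CustomCalendarWindow would be roughly this (a
>> non-merging WindowFn that windows on the element's login field; imports
>> omitted, and the SessionTuple type, its getLoginTime() accessor, and the
>> UTC day boundary are stand-ins for my actual details):
>>
>> class LoginDayWindowFn extends NonMergingWindowFn<SessionTuple, IntervalWindow> {
>>   @Override
>>   public Collection<IntervalWindow> assignWindows(AssignContext c) {
>>     // Window on the element's *login* time, not on its (logout) timestamp.
>>     Instant login = c.element().getLoginTime();
>>     Instant dayStart =
>>         login.toDateTime(DateTimeZone.UTC).withTimeAtStartOfDay().toInstant();
>>     return Collections.singletonList(
>>         new IntervalWindow(dayStart, dayStart.plus(Duration.standardDays(1))));
>>   }
>>
>>   @Override
>>   public boolean isCompatible(WindowFn<?, ?> other) {
>>     return other instanceof LoginDayWindowFn;
>>   }
>>
>>   @Override
>>   public Coder<IntervalWindow> windowCoder() {
>>     return IntervalWindow.getCoder();
>>   }
>>
>>   @Override
>>   public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
>>     throw new UnsupportedOperationException("Not usable as a side-input window");
>>   }
>> }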
>>
>> But I don't see how your concerns in #3 are obviated by this revision.
>> Elements going into the calendar window may already be late... this is
>> something that any (multi-stage) Beam pipeline has to contend with, even
>> without the deprecated allowedSkew facility, no?
>>
>> In other words, both of these pipelines are semantically and behaviorally
>> identical. The former just had the benefit of not requiring a custom
>> window implementation.
>>
>> On Sun, Jan 12, 2020 at 12:12 PM Reuven Lax <re...@google.com> wrote:
>>
>>> A few comments:
>>>
>>> 1. Yes, this already works on Dataflow (at Beam head). Flink support is
>>> pending at pr/10534.
>>>
>>> 2. Just to make sure we're on the same page: getAllowedTimestampSkew is
>>> _not_ about outputting behind the watermark. Rather, it's about outputting
>>> a timestamp that's earlier than the current input timestamp. If for
>>> example the watermark is 12:00 and the current input element has a
>>> timestamp of 11:00 (because it's late), then you can output an element at
>>> 11:00 with no need to set this parameter. It appears that the JavaDoc is
>>> somewhat confusing on this method.
>>>
>>> 3. The reason for this parameter is that the watermark only correctly
>>> tracks timestamps internal to the pipeline if your code doesn't make
>>> timestamps travel back in time - i.e. a ParDo taking an element with a
>>> timestamp of 12:00 and outputting another element with an earlier
>>> timestamp. If you use getAllowedTimestampSkew, the elements you produce
>>> might not be tracked by the watermark and will show up late (even if the
>>> source element is on time). What's worse, there's a chance that the
>>> elements will be older than allowedLateness and will get dropped
>>> altogether (this can happen even if allowedTimestampSkew <
>>> maxAllowedLateness, because the input element might already be late and
>>> you'll then output an element that has an even earlier timestamp).
>>>
>>> 4. It sounds like you both want and don't want a watermark. You want the
>>> watermark to not be held up by your input (so that your aggregations keep
>>> triggering), but you then want to output old data which might prevent the
>>> watermark from working properly, and might cause data to be dropped. Have
>>> you considered instead using either triggers or timers to trigger your
>>> aggregations? That way you don't need to wait for the watermark to advance
>>> to the end of the window to trigger the aggregation, but the end-of-window
>>> aggregation will still be correct.
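>>>
>>> For the trigger route, a rough sketch (not your exact types -- assume the
>>> per-user session durations arrive as KV<userId, durationMillis>, and the
>>> 10-second early-firing delay and 7-day lateness are just placeholders):
>>>
>>> PCollection<KV<String, Double>> dailyAverages = sessionDurations
>>>     .apply(Window.<KV<String, Long>>into(CalendarWindows.days(1))
>>>         // Speculative early firings so already-complete sessions show up
>>>         // quickly, plus the on-time firing at the end of the window.
>>>         .triggering(AfterWatermark.pastEndOfWindow()
>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>                 .plusDelayOf(Duration.standardSeconds(10))))
>>>         .withAllowedLateness(Duration.standardDays(7))
>>>         .accumulatingFiredPanes())
>>>     .apply(Mean.<String, Long>perKey());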
>>>
>>> Reuven
>>>
>>> On Sun, Jan 12, 2020 at 8:23 AM Aaron Dixon <atdi...@gmail.com> wrote:
>>>
>>>> Reuven, thanks for your insights so far. Just wanted to press a little
>>>> more on the deprecation question as I'm still (so far) convinced that my
>>>> use case is quite a straightforward justification (I'm looking for
>>>> confirmation or correction to my thinking here.) I've simplified my use
>>>> case a bit if it helps things:
>>>>
>>>> Use case: "For users that log in on a given calendar day, what is the
>>>> average logged-in time (session duration)?"
>>>>
>>>> So I have two event types LOGIN and LOGOUT. I capture a user login
>>>> session (using custom windowing or state api, doesn't matter) and I use the
>>>> default TimestampCombiner/END_OF_WINDOW because I want my aggregations to
>>>> not be delayed.
>>>>
>>>> However, per my use case requirements I must window using the LOGIN
>>>> time, so I use outputWithTimestamp plus the skew configuration to this
>>>> end.
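>>>>
>>>> Concretely, the relevant DoFn looks roughly like this (SessionTuple, its
>>>> getLoginTime() accessor, and the 3-day skew are placeholders for my
>>>> actual types and settings):
>>>>
>>>> class EmitAtLoginTime extends DoFn<SessionTuple, SessionTuple> {
>>>>   @ProcessElement
>>>>   public void process(@Element SessionTuple session,
>>>>       OutputReceiver<SessionTuple> out) {
>>>>     // Re-emit the session at the LOGIN time so the CalendarDay window
>>>>     // aggregates it under the day the user logged in.
>>>>     out.outputWithTimestamp(session, session.getLoginTime());
>>>>   }
>>>>
>>>>   @Override
>>>>   public Duration getAllowedTimestampSkew() {
>>>>     // How far behind the element's (logout) timestamp we may output.
>>>>     return Duration.standardDays(3);
>>>>   }
>>>> }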
>>>>
>>>> Since most of my users log in and log out within the same calendar day,
>>>> I get my per-day aggregations right on time, in real time.
>>>>
>>>> Only for the few users that log out after the day they logged in will I
>>>> see actual late aggregations produced, in which case I can leverage
>>>> Beam's various lateness configuration levers to trade completeness for
>>>> storage, etc.
>>>>
>>>> This to me seems a *very* straightforward justification for my use of
>>>> DoFn#getAllowedTimestampSkew. Shouldn't this justify not deprecating that
>>>> facility?
>>>>
>>>> I realize there are various other solutions, now and coming soon, that
>>>> involve holding the watermark -- but any solution that requires holding
>>>> the watermark means that I have to give up getting on-time aggregations
>>>> at the very end of the calendar day (window). I would much rather (and
>>>> reasonably so?) get on-time aggregations covering the majority of my
>>>> users and be happy to refine these averages when my few straggler users
>>>> log out on a later day.
>>>>
>>>> In some Beam documentation [1] there is the idea of "unobservably late
>>>> data": specific elements that are output late (behind the watermark) but,
>>>> because they are guaranteed to land *within the window*, are effectively
>>>> treated as on-time. This conceptualization seems very well suited to my
>>>> simple use case, but I'm definitely open to a different way of thinking
>>>> about my approach.
>>>>
>>>> My main concern is that my pipeline will be leveraging a Deprecated
>>>> facility (DoFn#getAllowedTimestampSkew) but I don't see other viable
>>>> options (within Beam) yet.
>>>>
>>>> (Hope I'm not pressing too hard on this question here. I think this use
>>>> case is interesting because it ...seems... to be a rather simple/distilled
>>>> justification for being able to output data behind the watermark
>>>> mid-stream.)
>>>>
>>>> [1] https://beam.apache.org/blog/2016/10/20/test-stream.html
>>>>
>>>>
>>>> On Sat, Jan 11, 2020 at 10:10 PM Aaron Dixon <atdi...@gmail.com> wrote:
>>>>
>>>>> Oh nice -- that will be great -- will look forward to this one! Any
>>>>> idea if Dataflow will support it?
>>>>>
>>>>> On Sat, Jan 11, 2020 at 9:07 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> There is now (as of last week) a way to hold back the watermark with
>>>>>> the state API (though not yet in a released version of Beam). If you
>>>>>> set a timer using withOutputTimestamp(t), the watermark will be held
>>>>>> to t.
>>>>>>
>>>>>> On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon <atdi...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Reuven, thanks for your quick reply.
>>>>>>>
>>>>>>> I've tried that, but the drag it puts on the watermark was too
>>>>>>> intrusive. For example, even if just a single user among many decided
>>>>>>> to remain logged in for a few days, the watermark holds everything
>>>>>>> else back.
>>>>>>>
>>>>>>> This was when using a custom session window. I've recently been
>>>>>>> using the State API to do my custom session tracking, to avoid issues
>>>>>>> with downward merging of windows (see earlier mailing list thread).
>>>>>>> With the State API I'm not able to hold the watermark back (I think),
>>>>>>> but in any case I prefer the behavior where the watermark moves
>>>>>>> forward with the upstream events and to deal with the very few
>>>>>>> straggler users via a lateness configuration.
>>>>>>>
>>>>>>> Does that make sense? So far this seems very reasonable to me (to
>>>>>>> want to keep the watermark moving, and to deal with the few late
>>>>>>> events that actually fall out of the window using explicit lateness
>>>>>>> configuration).
>>>>>>>
>>>>>>> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Have you looked at using
>>>>>>>> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold
>>>>>>>> the downstream watermark back to the beginning of the window
>>>>>>>> (presumably the timestamp of the LOGIN event), so you can call
>>>>>>>> outputWithTimestamp using the CLICK GREEN timestamp without needing
>>>>>>>> to set the allowed timestamp skew.
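>>>>>>>>
>>>>>>>> That is, roughly (the 30-minute session gap and the Event type here
>>>>>>>> are just placeholders):
>>>>>>>>
>>>>>>>> Window.<KV<String, Event>>into(
>>>>>>>>         Sessions.withGapDuration(Duration.standardMinutes(30)))
>>>>>>>>     .withTimestampCombiner(TimestampCombiner.EARLIEST)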
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon <atdi...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I've just built a pipeline in Beam and, after exploring several
>>>>>>>>> options for my use case, I've ended up relying on the deprecated
>>>>>>>>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems
>>>>>>>>> to me quite a valid use case. So I suppose this is a vote for
>>>>>>>>> un-deprecating this API (or a teachable moment in which I could be
>>>>>>>>> pointed to a more suitable non-deprecated approach).
>>>>>>>>>
>>>>>>>>> I'll stick with a previous simplification of my use case:
>>>>>>>>>
>>>>>>>>> I get these events from my users:
>>>>>>>>>     LOGIN
>>>>>>>>>     CLICK GREEN BUTTON
>>>>>>>>>     LOGOUT
>>>>>>>>>
>>>>>>>>> I capture user session duration (logout time *minus* login time)
>>>>>>>>> and I want to perform a PER DAY average (i.e., my window is on
>>>>>>>>> CalendarDays) BUT where the aggregation's timestamp is the time of the
>>>>>>>>> CLICK GREEN event.
>>>>>>>>>
>>>>>>>>> So once I calculate and emit a single user's session duration, I
>>>>>>>>> need to .outputWithTimestamp using the CLICK GREEN event's
>>>>>>>>> timestamp. This involves, of course, outputting with a timestamp
>>>>>>>>> *before* the watermark.
>>>>>>>>>
>>>>>>>>> In most cases my users LOGOUT on the same day as the CLICK GREEN
>>>>>>>>> BUTTON event, so even though I'm typically outputting a timestamp
>>>>>>>>> before the watermark, the CalendarDay window is not yet closed, and
>>>>>>>>> so most user session durations do not contribute late data to that
>>>>>>>>> CalendarDay's aggregation.
>>>>>>>>>
>>>>>>>>> Only when a LOGOUT occurs on a day later than the CLICK GREEN
>>>>>>>>> event do I have to contend with potentially late data contributing
>>>>>>>>> back to a prior CalendarDay.
>>>>>>>>>
>>>>>>>>> In any case, I have .withAllowedLateness to allow me to make a
>>>>>>>>> call here about what I'm willing to trade off (keeping windows open
>>>>>>>>> vs. dropping data for users with overly long sessions), etc.
>>>>>>>>>
>>>>>>>>> This seems to be a simple scenario (it is effectively my real-world
>>>>>>>>> scenario), and .outputWithTimestamp + DoFn#getAllowedTimestampSkew
>>>>>>>>> seem to cover it in a straightforward, effective way.
>>>>>>>>>
>>>>>>>>> However of course I don't like building production code on
>>>>>>>>> deprecated capabilities -- so advice on alternatives (or perhaps a
>>>>>>>>> reconsideration of this deprecation :) ) would be appreciated.
>>>>>>>>>
>>>>>>>>>
