The driving use case in Flume is a stateful DoFn that sets many timers, many of which fire immediately. Knowing the watermark lets the DoFn short-circuit those timers, which improves pipeline efficiency by about 20%. One could argue that this is an optimization the runner should handle by executing expired timers eagerly, but there are other use cases where reading the watermark is simply a convenience.
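A minimal sketch of that short-circuit, written in plain Java so it runs without Beam. The class WatermarkTimers, its methods, and the use of java.time.Instant (Beam's Java SDK uses Joda-Time) are all illustrative assumptions, not Beam or Flume API. A timer counts as expired when its timestamp is strictly less than the current watermark, matching the "eligible to fire" guarantee in the proposal quoted below, and an expired timer's callback runs inline instead of being registered.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

/** Hypothetical toy model (not Beam API): short-circuit event-time timers already behind the watermark. */
public class WatermarkTimers {
    private final PriorityQueue<Instant> pending = new PriorityQueue<>();
    private final Consumer<Instant> onTimer;
    private Instant watermark;
    private int inlineFires = 0;

    public WatermarkTimers(Instant initialWatermark, Consumer<Instant> onTimer) {
        this.watermark = initialWatermark;
        this.onTimer = onTimer;
    }

    /** A timer strictly below the watermark is already eligible to fire. */
    public static boolean isExpired(Instant timerTime, Instant watermark) {
        return timerTime.isBefore(watermark);
    }

    /** Runs the callback immediately if the timer is expired; otherwise queues it. */
    public void setOrFire(Instant timerTime) {
        if (isExpired(timerTime, watermark)) {
            inlineFires++;
            onTimer.accept(timerTime);
        } else {
            pending.add(timerTime);
        }
    }

    /** Moves the watermark forward (never backward) and fires newly expired timers in timestamp order. */
    public void advanceWatermark(Instant newWatermark) {
        if (newWatermark.isAfter(watermark)) {
            watermark = newWatermark;
        }
        while (!pending.isEmpty() && isExpired(pending.peek(), watermark)) {
            onTimer.accept(pending.poll());
        }
    }

    public int inlineFires() { return inlineFires; }

    public int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        List<Instant> fired = new ArrayList<>();
        WatermarkTimers timers = new WatermarkTimers(Instant.ofEpochMilli(100), fired::add);
        timers.setOrFire(Instant.ofEpochMilli(50));         // behind the watermark: fires inline
        timers.setOrFire(Instant.ofEpochMilli(150));        // ahead of the watermark: queued
        timers.advanceWatermark(Instant.ofEpochMilli(200)); // now fires the queued timer
        System.out.println(fired);
    }
}
```

The saving comes from skipping the register-then-fire round trip for timers that would fire immediately anyway. A runner that eagerly executed expired timers would get the same effect without user code.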
An important one can be summarized as "understanding pipeline semantics by looking at the watermark". Access to timers and state allows quite complex DoFns to be written, and being able to read the watermark makes debugging them much easier.

Another class of use cases is treating late data differently when windowing doesn't fit the use case very well. For example, users might have several categories of "late", such as data to divert to a purgatory and data to drop. Allowed lateness (Window.withAllowedLateness) can't capture that.

I'd also argue that reading a watermark doesn't expose users to most of the dangers of "manually handling watermarks": there's no read-write access, no propagation to think about, and only one kind of watermark. It just gives users direct access to something they already have indirect access to (via timers and PaneInfo.Timing.LATE), so they have more flexibility when they step outside our easy-to-use, high-level API. That's a matter of opinion, of course.

I do think it would be valuable to brainstorm the sharp edges such an API could expose. I wasn't able to think of any that weren't just obvious misuse. That's also opinion, but specifics are easier to agree on than generalities :)

On Fri, Jan 13, 2017 at 4:30 PM, Robert Bradshaw <rober...@google.com.invalid> wrote:

> I would like to understand use cases better too; generally, manually
> handling watermarks is notoriously difficult to get right.
>
> On Fri, Jan 13, 2017 at 4:24 PM, Lukasz Cwik <lc...@google.com.invalid>
> wrote:
> > Can you mention some use cases as to why reading the watermark has been
> > useful?
> >
> > On Fri, Jan 13, 2017 at 4:21 PM, Dan Sotolongo <
> > dsotolo...@google.com.invalid> wrote:
> >
> >> Hi folks,
> >>
> >> I work on Flume, and we've found users who need to read the
> >> watermark from a DoFn. We're going to add this to the Flume API. I wanted
> >> to propose doing the same thing in the Beam API, and to describe some
> >> subtleties we encountered when discussing it internally.
> >>
> >> First, the proposal in more detail (I'll use Java in my examples; I'm not
> >> familiar with the Python API): the watermark can be passed into the
> >> @ProcessElement method (and @OnTimer if desired) as an argument identified
> >> by a new annotation (name TBD). It would contain an Instant (or whichever
> >> time class is preferred) specifying the watermark time that's used to fire
> >> timers/triggers in the DoFn. The watermark read provides the following
> >> guarantees:
> >>
> >> 1. All non-late elements with timestamp less than the watermark read
> >>    have been durably processed by the DoFn.
> >> 2. All timers with timestamp less than the watermark are eligible to
> >>    fire.
> >>
> >> It sounds pretty straightforward, but it can be complicated by a number
> >> of things:
> >>
> >> - Optimizations to the pipeline topology.
> >>   Specifically, any transformation that splits a DoFn onto two branches
> >>   with independent watermark propagation would violate the guarantees.
> >>   This isn't a concern for GBKs or DoFns with state, because these
> >>   require a shuffle to group their elements, whereas reading a watermark
> >>   does not.
> >> - Watermark holds.
> >>   If a transform (e.g. a GBK with AfterWatermark) is holding the
> >>   watermark, and a stateless DoFn tries to inspect it downstream, the
> >>   watermark-propagation mechanism has to ensure that it operates at a
> >>   fine-enough granularity to present distinct watermarks to each
> >>   transform.
> >>
> >> I hope that's helpful. If people get on board, I'd be happy to contribute
> >> to the change if someone can file the JIRA and point me to the right
> >> places in the code.
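To make the late-data triage use case from the reply concrete, here is a minimal plain-Java sketch of what a DoFn could do with the proposed watermark argument. The class LateDataTriage, the three categories, and the single purgatoryLimit threshold are hypothetical illustrations, not Beam or Flume API, and java.time stands in for Beam's Joda-Time types. Following guarantee 1 of the proposal, an element counts as late when its timestamp is less than the watermark read. Its lateness is how far the watermark has moved past that timestamp.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical toy model (not Beam API): route elements by how far behind the watermark they arrive. */
public class LateDataTriage {
    public enum Category { ON_TIME, PURGATORY, DROP }

    // Assumed single threshold: late by at most this much goes to purgatory, later data is dropped.
    private final Duration purgatoryLimit;

    public LateDataTriage(Duration purgatoryLimit) {
        if (purgatoryLimit.isNegative()) {
            throw new IllegalArgumentException("purgatoryLimit must be non-negative");
        }
        this.purgatoryLimit = purgatoryLimit;
    }

    /**
     * @param elementTime the element's event timestamp
     * @param watermark   the watermark as it would be read in @ProcessElement (hypothetical argument)
     */
    public Category classify(Instant elementTime, Instant watermark) {
        // Positive lateness means the watermark has already passed the element's timestamp.
        Duration lateness = Duration.between(elementTime, watermark);
        if (lateness.isNegative() || lateness.isZero()) {
            return Category.ON_TIME;
        }
        if (lateness.compareTo(purgatoryLimit) <= 0) {
            return Category.PURGATORY;
        }
        return Category.DROP;
    }

    public static void main(String[] args) {
        LateDataTriage triage = new LateDataTriage(Duration.ofMinutes(10));
        Instant wm = Instant.parse("2017-01-13T16:30:00Z");
        System.out.println(triage.classify(wm.plusSeconds(5), wm));              // ON_TIME
        System.out.println(triage.classify(wm.minus(Duration.ofMinutes(3)), wm)); // PURGATORY
        System.out.println(triage.classify(wm.minus(Duration.ofHours(1)), wm));   // DROP
    }
}
```

In a real pipeline, each category would presumably map to a separate output (for example, a tagged side output for purgatory). That multi-way split is the part a single allowed-lateness horizon doesn't express.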