On Fri, Jan 13, 2017 at 5:18 PM, Dan Sotolongo <dsotolo...@google.com.invalid> wrote:
> The driving use case in Flume is a stateful DoFn that sets many timers,
> many of which fire immediately. Knowing the timestamp lets them
> short-circuit the timer, which improves pipeline efficiency by about 20%.
> An argument could be made against this use case as an optimization that
> should be tackled in the runner by executing expired timers eagerly, but
> there are other convenience use cases.

We should strongly avoid adding high-level APIs to enable performance optimizations like these. Ideally we should fix runners. I don't think the watermark itself is even really part of the Beam model, and other runners may implement things differently.

> An important one can be summarized as "understanding pipeline semantics by
> looking at the watermark". Having access to timers and state allows quite
> complex DoFns to be written, and having access to that information makes
> debugging much easier.

This is a case for attaching watermark data (or skew) to log entries.

> Another class of use cases is treating late data differently when windows
> don't fit their use case very well. For example, they might have several
> categories of "late": divert to purgatory and drop. AllowLateness can't
> capture that.

This is one of the pitfalls: it's not safe to compare an element's timestamp to a watermark value to determine lateness.

> I'd also argue that reading a watermark doesn't expose users to most of the
> dangers of "manually handling watermarks": there's no read-write access, no
> propagation to think about, and only one kind of watermark. It's just
> giving users direct access to something they already have indirect access
> to (via timers, PaneInfo.Timing.LATE) so they have more flexibility if
> they're stepping outside our easy-to-use, high-level API. That's a matter
> of opinion, of course.
>
> I do think it would be valuable to brainstorm the sharp edges such an API
> could expose. I wasn't able to think of any that weren't just obvious
> misuse. That's also opinion, but specifics are easier to agree on than
> generalities :)
>
> On Fri, Jan 13, 2017 at 4:30 PM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
>
>> I would like to understand use cases better too; generally, manually
>> handling watermarks is notoriously difficult to get right.
>>
>> On Fri, Jan 13, 2017 at 4:24 PM, Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>> > Can you mention some use cases as to why reading the watermark has been
>> > useful?
>> >
>> > On Fri, Jan 13, 2017 at 4:21 PM, Dan Sotolongo <
>> > dsotolo...@google.com.invalid> wrote:
>> >
>> >> Hi folks,
>> >>
>> >> I work on Flume, and we've found users who have the need to read the
>> >> watermark from a DoFn. We're going to add this in the Flume API. I wanted
>> >> to propose doing the same thing in the Beam API, and include some
>> >> subtleties we encountered when discussing it internally.
>> >>
>> >> First, the proposal in more detail (I'll use Java in my examples... not
>> >> familiar with the Python API): the watermark can be passed into the
>> >> @ProcessElement method (and @OnTimer if desired) as an argument identified
>> >> by a new annotation, name TBD. It would contain an Instant (or whichever
>> >> time class is preferred) specifying the watermark time that's used to fire
>> >> timers/triggers in the DoFn. The watermark read provides these guarantees:
>> >>
>> >>    1. All non-late elements with timestamp less than the watermark read
>> >>       have been durably processed by the DoFn.
>> >>    2. All timers with timestamp less than the watermark are eligible to
>> >>       fire.
>> >>
>> >> It sounds pretty straightforward, but can be complicated by a number of
>> >> things:
>> >>
>> >>    - Optimizations to the pipeline topology.
>> >>      Specifically, any transformation that splits a DoFn onto two branches
>> >>      with independent watermark propagation would violate the guarantees.
>> >>      This isn't a concern for GBKs or DoFns with state because these
>> >>      require a shuffle to group their elements, whereas reading a
>> >>      watermark does not.
>> >>    - Watermark holds.
>> >>      If a transform (e.g. GBK with AfterWatermark) is holding the
>> >>      watermark, and a stateless DoFn tries to inspect it downstream, the
>> >>      watermark-propagation mechanism has to ensure that it operates at a
>> >>      fine-enough granularity to present distinct watermarks to each
>> >>      transform.
>> >>
>> >> I hope that's helpful. If people get on board, I'd be happy to contribute
>> >> to the change if someone can file the JIRA and point me to the right
>> >> places in the code.
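
To make guarantee 2 and the timer short-circuit use case from the thread concrete, here is a minimal, self-contained Java sketch. Everything in it is an assumption made for illustration: the class WatermarkShortCircuitSketch, the TimerSetter stand-in, and the isEligibleToFire helper are hypothetical names and are not part of the Beam or Flume API, and java.time.Instant stands in for whichever time class would be preferred. It only shows the comparison a DoFn could make if it were handed the watermark: a timer whose timestamp is strictly less than the watermark is treated as eligible to fire and is handled inline instead of being registered.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; none of these names are Beam or Flume API. */
public class WatermarkShortCircuitSketch {

    /** Guarantee 2 from the proposal: a timer strictly before the watermark is eligible to fire. */
    public static boolean isEligibleToFire(Instant timerTimestamp, Instant watermark) {
        return timerTimestamp.isBefore(watermark);
    }

    /** Stand-in for the timer handling of a stateful DoFn (assumed, not a real Beam class). */
    public static class TimerSetter {
        public final List<Instant> firedInline = new ArrayList<>();
        public final List<Instant> registered = new ArrayList<>();

        /** Handles an already-expired timer inline; otherwise records it as registered. */
        public void setTimer(Instant timerTimestamp, Instant watermark) {
            if (isEligibleToFire(timerTimestamp, watermark)) {
                // Short-circuit: the timer would fire immediately, so skip registration.
                firedInline.add(timerTimestamp);
            } else {
                // Not yet eligible: a real pipeline would hand this to the runner.
                registered.add(timerTimestamp);
            }
        }
    }

    public static void main(String[] args) {
        Instant watermark = Instant.parse("2017-01-13T17:00:00Z");
        TimerSetter setter = new TimerSetter();

        setter.setTimer(Instant.parse("2017-01-13T16:59:00Z"), watermark); // before the watermark
        setter.setTimer(Instant.parse("2017-01-13T17:05:00Z"), watermark); // after the watermark

        System.out.println("fired inline: " + setter.firedInline);
        System.out.println("registered:   " + setter.registered);
    }
}
```

Note that a timer whose timestamp equals the watermark is registered rather than short-circuited, since the guarantee is stated for timestamps "less than" the watermark. This is also the kind of check a runner could apply on the user's behalf by executing expired timers eagerly, which is the counter-argument raised in the thread.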