I would like to understand the use cases better too; in general, handling watermarks manually is notoriously difficult to get right.
On Fri, Jan 13, 2017 at 4:24 PM, Lukasz Cwik <lc...@google.com.invalid> wrote:

> Can you mention some use cases as to why reading the watermark has been
> useful?
>
> On Fri, Jan 13, 2017 at 4:21 PM, Dan Sotolongo <dsotolo...@google.com.invalid> wrote:
>
>> Hi folks,
>>
>> I work on Flume, and we've found users who need to read the
>> watermark from a DoFn. We're going to add this to the Flume API. I wanted
>> to propose doing the same in the Beam API, and to share some
>> subtleties we encountered when discussing it internally.
>>
>> First, the proposal in more detail (I'll use Java in my examples; I'm not
>> familiar with the Python API): the watermark can be passed into the
>> @ProcessElement method (and @OnTimer if desired) as an argument identified
>> by a new annotation, name TBD. It would contain an Instant (or whichever
>> time class is preferred) specifying the watermark time that's used to fire
>> timers/triggers in the DoFn. Reading the watermark provides these
>> guarantees:
>>
>> 1. All non-late elements with a timestamp less than the watermark read
>>    have been durably processed by the DoFn.
>> 2. All timers with a timestamp less than the watermark are eligible to
>>    fire.
>>
>> It sounds pretty straightforward, but it can be complicated by a number of
>> things:
>>
>> - Optimizations to the pipeline topology.
>>   Specifically, any transformation that splits a DoFn onto two branches
>>   with independent watermark propagation would violate the guarantees.
>>   This isn't a concern for GBKs or DoFns with state, because these require
>>   a shuffle to group their elements, whereas reading a watermark does not.
>> - Watermark holds.
>>   If a transform (e.g. a GBK with AfterWatermark) is holding the
>>   watermark, and a stateless DoFn tries to inspect it downstream, the
>>   watermark-propagation mechanism has to operate at a fine-enough
>>   granularity to present distinct watermarks to each transform.
>>
>> I hope that's helpful. If people get on board, I'd be happy to contribute
>> to the change if someone can file the JIRA and point me to the right places
>> in the code.
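A minimal, self-contained sketch of the shape of this proposal, assuming nothing about the eventual Beam API. `@ProcessElement` below is a toy stand-in (not Beam's `DoFn.ProcessElement`), and `@InputWatermark`, `RecordingFn`, and `ToyRunner` are hypothetical names, since the real annotation name was left TBD. The toy runner finds the annotated method by reflection, injects the current input watermark into the annotated parameter, treats timers strictly before the watermark as eligible to fire (guarantee 2), and shows how a watermark hold makes the watermark seen downstream lag the input watermark. It processes every element synchronously, so it does not model durability, topology optimizations, or distributed propagation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for Beam's @ProcessElement; NOT the real Beam annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ProcessElement {}

// Hypothetical parameter annotation; the proposal leaves the real name TBD.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface InputWatermark {}

// Example user function: records the watermark it observed for each element.
class RecordingFn {
  final List<String> seen = new ArrayList<>();

  @ProcessElement
  public void process(String element, @InputWatermark Instant watermark) {
    seen.add(element + " wm=" + watermark);
  }
}

// Toy single-stage runner (hypothetical): injects the watermark, tracks timers and a hold.
class ToyRunner {
  private final Object fn;
  private final Method processMethod;
  private Instant inputWatermark = Instant.MIN;
  private Instant hold; // null means no hold; e.g. set by an upstream GBK with AfterWatermark
  private final List<Instant> timers = new ArrayList<>();

  ToyRunner(Object fn) {
    this.fn = fn;
    Method found = null;
    for (Method m : fn.getClass().getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)) {
        found = m;
      }
    }
    if (found == null) {
      throw new IllegalArgumentException("no @ProcessElement method");
    }
    found.setAccessible(true);
    this.processMethod = found;
  }

  // Builds the argument list by inspecting each parameter's annotations and type.
  void process(String element) {
    Parameter[] params = processMethod.getParameters();
    Object[] args = new Object[params.length];
    for (int i = 0; i < params.length; i++) {
      if (params[i].isAnnotationPresent(InputWatermark.class)) {
        args[i] = inputWatermark;
      } else if (params[i].getType() == String.class) {
        args[i] = element;
      } else {
        throw new IllegalStateException("unsupported parameter: " + params[i]);
      }
    }
    try {
      processMethod.invoke(fn, args);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  // Watermarks only move forward.
  void advanceInputWatermark(Instant wm) {
    if (wm.isBefore(inputWatermark)) {
      throw new IllegalArgumentException("watermark moved backwards");
    }
    inputWatermark = wm;
  }

  void setTimer(Instant t) { timers.add(t); }

  // Guarantee 2: timers strictly before the watermark are eligible to fire.
  List<Instant> eligibleTimers() {
    List<Instant> out = new ArrayList<>();
    for (Instant t : timers) {
      if (t.isBefore(inputWatermark)) out.add(t);
    }
    return out;
  }

  void setHold(Instant h) { hold = h; }

  // What a downstream stage would see: the input watermark, held back by any hold.
  Instant outputWatermark() {
    return (hold != null && hold.isBefore(inputWatermark)) ? hold : inputWatermark;
  }
}
```

For example, after `advanceInputWatermark(Instant.ofEpochSecond(10))`, a timer set for second 5 is eligible and one set for second 15 is not; after `setHold(Instant.ofEpochSecond(7))`, `outputWatermark()` reports second 7 until the hold is cleared. Tracking that held value per stage is the granularity concern raised under "Watermark holds": a stateless DoFn downstream of the hold should observe second 7, not second 10.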