Can you mention some use cases showing why reading the watermark has been useful?
On Fri, Jan 13, 2017 at 4:21 PM, Dan Sotolongo <dsotolo...@google.com.invalid> wrote:

> Hi folks,
>
> I work on Flume, and we've found users who need to read the
> watermark from a DoFn. We're going to add this to the Flume API. I wanted
> to propose doing the same thing in the Beam API, and to describe some
> subtleties we encountered when discussing it internally.
>
> First, the proposal in more detail (I'll use Java in my examples; I'm not
> familiar with the Python API): the watermark can be passed into the
> @ProcessElement method (and @OnTimer, if desired) as an argument identified
> by a new annotation, name TBD. It would contain an Instant (or whichever time
> class is preferred) specifying the watermark time that's used to fire
> timers/triggers in the DoFn. Reading the watermark provides these guarantees:
>
> 1. All non-late elements with a timestamp less than the watermark read
>    have been durably processed by the DoFn.
> 2. All timers with a timestamp less than the watermark are eligible to
>    fire.
>
> It sounds pretty straightforward, but it can be complicated by a number of
> things:
>
> - Optimizations to the pipeline topology.
>   Specifically, any transformation that splits a DoFn onto two branches
>   with independent watermark propagation would violate the guarantees.
>   This isn't a concern for GBKs or DoFns with state, because these require
>   a shuffle to group their elements, whereas reading a watermark does not.
> - Watermark holds.
>   If a transform (e.g. GBK with AfterWatermark) is holding the watermark,
>   and a stateless DoFn tries to inspect it downstream, the
>   watermark-propagation mechanism has to ensure that it operates at a
>   fine-enough granularity to present distinct watermarks to each
>   transform.
>
> I hope that's helpful. If people get on board, I'd be happy to contribute
> to the change if someone can file the JIRA and point me to the right places
> in the code.
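To make the two guarantees and the watermark-hold subtlety concrete, here is a toy Java model. It is not Beam or Flume code, and the class `HeldWatermark` and all of its methods are hypothetical. It assumes that the watermark a downstream DoFn reads is the upstream input watermark held back by any holds the upstream transform (e.g. a GBK with AfterWatermark) still has outstanding. Following the proposal's wording, it treats a timer as eligible to fire only when its timestamp is strictly less than that watermark.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical toy model (not a real Beam/Flume API) of the watermark a
 * downstream DoFn would read when an upstream transform holds the watermark.
 */
final class HeldWatermark {
    // Upstream input watermark; starts at the earliest representable time.
    private Instant inputWatermark = Instant.MIN;
    // Outstanding watermark holds set by the upstream transform.
    private final List<Instant> holds = new ArrayList<>();
    // Pending event-time timers in the downstream DoFn, earliest first.
    private final PriorityQueue<Instant> timers = new PriorityQueue<>();

    /** Advances the input watermark; watermarks never move backwards. */
    void advanceInput(Instant wm) {
        if (wm.isAfter(inputWatermark)) {
            inputWatermark = wm;
        }
    }

    void addHold(Instant hold) { holds.add(hold); }

    void releaseHold(Instant hold) { holds.remove(hold); }

    void setTimer(Instant timestamp) { timers.add(timestamp); }

    /** Watermark the downstream DoFn observes: min(input watermark, all holds). */
    Instant observedWatermark() {
        Instant wm = inputWatermark;
        for (Instant hold : holds) {
            if (hold.isBefore(wm)) {
                wm = hold;
            }
        }
        return wm;
    }

    /** Removes and returns every timer strictly before the observed watermark. */
    List<Instant> fireEligibleTimers() {
        Instant wm = observedWatermark();
        List<Instant> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().isBefore(wm)) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

For example, with an input watermark of 100 s and an upstream hold at 50 s, the DoFn reads 50 s, so a timer at 40 s fires while one at 60 s waits until the hold is released. This is exactly why the propagation mechanism has to track watermarks per transform: a DoFn that read the unheld input watermark (100 s) would see a value that contradicts the first guarantee for elements the holding transform has not yet emitted.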