Hi folks, I work on Flume, and we've found that some users need to read the watermark from a DoFn. We're going to add this to the Flume API. I'd like to propose doing the same in the Beam API, and to share some subtleties we encountered when discussing it internally.
First, the proposal in more detail (I'll use Java in my examples; I'm not familiar with the Python API): the watermark can be passed into the @ProcessElement method (and @OnTimer, if desired) as an argument identified by a new annotation, name TBD. It would contain an Instant (or whichever time class is preferred) specifying the watermark time that's used to fire timers/triggers in the DoFn. The watermark value read provides the following guarantees:

1. All non-late elements with timestamps less than the watermark read have been durably processed by the DoFn.
2. All timers with timestamps less than the watermark are eligible to fire.

It sounds pretty straightforward, but it can be complicated by a number of things:

- Optimizations to the pipeline topology. Specifically, any transformation that splits a DoFn onto two branches with independent watermark propagation would violate the guarantees. This isn't a concern for GBKs or DoFns with state, because they require a shuffle to group their elements, whereas reading a watermark does not.
- Watermark holds. If a transform (e.g. a GBK with AfterWatermark) is holding the watermark and a stateless DoFn downstream tries to inspect it, the watermark-propagation mechanism has to operate at a fine-enough granularity to present a distinct watermark to each transform.

I hope that's helpful. If people are on board, I'd be happy to contribute to the change if someone can file the JIRA and point me to the right places in the code.
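For concreteness, here's a rough, self-contained sketch of the proposed parameter in plain Java. It deliberately doesn't depend on Beam: the @Watermark and @ProcessElement annotations below are hypothetical stand-ins (the real annotation name is TBD), invoke() is a toy runner that injects the value via reflection, and java.time.Instant stands in for whichever time class is chosen. observedWatermark() models the hold case under the assumption that a transform downstream of a hold sees the minimum of its input watermark and that hold, so it can differ from what the GBK itself sees. timerEligible() just mirrors guarantee 2.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class WatermarkParamSketch {

  /** Hypothetical parameter annotation (the real name is TBD). */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.PARAMETER)
  public @interface Watermark {}

  /** Hypothetical stand-in for Beam's @ProcessElement so this runs without Beam. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface ProcessElement {}

  /** A user "DoFn" that asks for the current watermark alongside each element. */
  public static class LogWatermarkFn {
    public final List<String> seen = new ArrayList<>();

    @ProcessElement
    public void process(String element, @Watermark Instant watermark) {
      seen.add(element + "@" + watermark);
    }
  }

  /**
   * Watermark observed by a transform downstream of a hold (e.g. a GBK with
   * AfterWatermark): the minimum of its input watermark and the hold.
   */
  public static Instant observedWatermark(Instant inputWatermark, Instant hold) {
    return hold.isBefore(inputWatermark) ? hold : inputWatermark;
  }

  /** Guarantee 2 from the proposal: timers strictly before the watermark may fire. */
  public static boolean timerEligible(Instant timerTimestamp, Instant watermark) {
    return timerTimestamp.isBefore(watermark);
  }

  /**
   * Toy runner: finds the @ProcessElement method, passes the watermark to any
   * @Watermark parameter and the element to every other parameter.
   */
  public static void invoke(Object fn, String element, Instant watermark) {
    for (Method m : fn.getClass().getDeclaredMethods()) {
      if (!m.isAnnotationPresent(ProcessElement.class)) {
        continue;
      }
      Parameter[] params = m.getParameters();
      Object[] args = new Object[params.length];
      for (int i = 0; i < params.length; i++) {
        args[i] = params[i].isAnnotationPresent(Watermark.class) ? watermark : element;
      }
      try {
        m.invoke(fn, args);
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  public static void main(String[] args) {
    Instant gbkInput = Instant.parse("2020-01-01T00:10:00Z"); // what the GBK itself sees
    Instant hold = Instant.parse("2020-01-01T00:05:00Z");     // hold set by the GBK
    LogWatermarkFn fn = new LogWatermarkFn();
    // The stateless DoFn downstream of the GBK must see the held-back watermark.
    invoke(fn, "a", observedWatermark(gbkInput, hold));
    System.out.println(fn.seen);
  }
}
```

Running main prints [a@2020-01-01T00:05:00Z]: the GBK's input watermark is 00:10, but the stateless DoFn downstream of it observes the held-back 00:05, which is exactly the per-transform granularity the watermark-holds point asks for.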