Hi folks,

I work on Flume, and we have users who need to read the watermark from
a DoFn. We're going to add this to the Flume API. I'd like to propose
doing the same thing in the Beam API, and to describe some subtleties
we encountered when discussing it internally.

First, the proposal in more detail (I'll use Java in my examples, since
I'm not familiar with the Python API): the watermark can be passed into
the @ProcessElement method (and @OnTimer if desired) as an argument
identified by a new annotation, name TBD. The argument would be an
Instant (or whichever time class is preferred) holding the watermark
that is used to fire timers/triggers in the DoFn. A watermark read
provides two guarantees:

   1. All non-late elements with timestamp less than the watermark read
   have been durably processed by the DoFn.
   2. All timers with timestamp less than the watermark are eligible to
   fire.
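
To make the shape of the proposal concrete, here is a rough sketch in
plain Java. It is not the Beam API: the annotation name WatermarkRead,
the class, and both methods are hypothetical placeholders (the real
annotation name is TBD), and it uses java.time.Instant only so that it
compiles without Beam on the classpath.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class WatermarkReadSketch {

    /** Hypothetical marker for the watermark argument; the real name is TBD. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    public @interface WatermarkRead {}

    /**
     * Stand-in for a method that receives the watermark as an annotated
     * argument. Returns true if the element's timestamp is strictly less
     * than the watermark that was read.
     */
    public static boolean isBehindWatermark(
            Instant elementTimestamp, @WatermarkRead Instant watermark) {
        return elementTimestamp.isBefore(watermark);
    }

    /**
     * Models guarantee 2: every timer whose timestamp is strictly less
     * than the watermark is eligible to fire.
     */
    public static List<Instant> eligibleTimers(
            List<Instant> timers, @WatermarkRead Instant watermark) {
        List<Instant> eligible = new ArrayList<>();
        for (Instant timer : timers) {
            if (timer.isBefore(watermark)) {
                eligible.add(timer);
            }
        }
        return eligible;
    }
}
```

In this toy model a timer set exactly at the watermark is not yet
eligible, which matches the "less than" wording of the guarantees.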

It sounds pretty straightforward, but can be complicated by a number of
things:

   - Optimizations to the pipeline topology.
   Specifically, any transformation that splits a DoFn onto two branches
   with independent watermark propagation would violate the guarantees. This
   isn't a concern for GBKs or DoFns with state because these require a
   shuffle to group their elements, whereas reading a watermark does not.
   - Watermark holds.
   If a transform (e.g. GBK with AfterWatermark) is holding the watermark,
   and a stateless DoFn tries to inspect it downstream, the
   watermark-propagation mechanism has to ensure that it operates at a
   fine-enough granularity to present distinct watermarks to each transform.
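
The watermark-hold case can be illustrated with a small model. This is
a sketch under the usual assumption that a transform's output watermark
is its input watermark capped by any hold it has set; the class and
method names are hypothetical and do not correspond to runner code.

```java
import java.time.Instant;
import java.util.Optional;

public class WatermarkHoldSketch {

    /**
     * Output watermark of a transform in this simplified model: the
     * input watermark, unless the transform holds an earlier instant.
     */
    public static Instant outputWatermark(
            Instant inputWatermark, Optional<Instant> hold) {
        if (hold.isPresent() && hold.get().isBefore(inputWatermark)) {
            return hold.get();
        }
        return inputWatermark;
    }
}
```

If the upstream GBK has an input watermark of 12:00:10 and a hold at
12:00:05, a stateless DoFn downstream should read 12:00:05. A runner
that tracked only one watermark for both transforms would hand the DoFn
12:00:10 and break guarantee 1.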

I hope that's helpful. If people get on board, I'd be happy to contribute
to the change if someone can file the JIRA and point me to the right places
in the code.
