The driving use case in Flume is a stateful DoFn that sets many timers,
many of which fire immediately. Knowing the watermark timestamp lets the
DoFn short-circuit those timers, which improves pipeline efficiency by about 20%.
One could argue that this is just an optimization the runner should
handle by executing expired timers eagerly, but there are other
convenience use cases as well.
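A minimal plain-Java sketch of that short-circuit, not Beam code. It assumes the DoFn can see the current input watermark as epoch milliseconds, and the class and method names (`TimerShortCircuit`, `setTimerOrFire`) are hypothetical. A timer whose timestamp is already behind the watermark would be eligible to fire right away, so the callback runs inline instead of being queued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: models the short-circuit only, not the Beam DoFn API.
public class TimerShortCircuit {
    // Timers that still have to wait for the watermark, ordered by fire time.
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    // Timestamps of timers that were handled inline, kept for inspection.
    private final List<Long> firedInline = new ArrayList<>();

    /**
     * Sets a timer for fireAtMillis unless the watermark has already passed it,
     * in which case the timer would be eligible immediately, so it is handled
     * inline. Returns true if the timer was short-circuited.
     */
    public boolean setTimerOrFire(long fireAtMillis, long watermarkMillis) {
        if (fireAtMillis < watermarkMillis) {
            onTimer(fireAtMillis);
            return true;
        }
        pending.add(fireAtMillis);
        return false;
    }

    // Stand-in for the user's @OnTimer logic.
    private void onTimer(long timestampMillis) {
        firedInline.add(timestampMillis);
    }

    public int pendingCount() { return pending.size(); }

    public List<Long> firedInline() { return firedInline; }

    public static void main(String[] args) {
        TimerShortCircuit fn = new TimerShortCircuit();
        long watermark = 1_000L;
        fn.setTimerOrFire(500L, watermark);   // behind the watermark: fired inline
        fn.setTimerOrFire(2_000L, watermark); // ahead of the watermark: queued
        System.out.println(fn.firedInline() + " pending=" + fn.pendingCount());
    }
}
```

Running `main` prints `[500] pending=1`. The comparison is strict to match the proposal's guarantee that timers with timestamp *less than* the watermark are eligible to fire.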

An important one can be summarized as "understanding pipeline semantics by
looking at the watermark". Access to timers and state lets users write
quite complex DoFns, and being able to see the watermark makes debugging
them much easier.

Another class of use cases is treating late data differently when windows
don't fit the user's needs very well. For example, a user might have
several categories of "late", such as "divert to purgatory" and "drop".
AllowLateness can't capture that, since it draws a single line between
late data that is kept and late data that is dropped.
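A minimal plain-Java sketch of multi-category lateness, assuming the DoFn can read the watermark in epoch milliseconds. The `LateDataRouter` class, the `Route` enum, and the single `purgatoryHorizonMillis` threshold are all hypothetical illustrations of the "purgatory vs. drop" split, not an existing API. An element counts as late when its timestamp is behind the watermark.

```java
// Hypothetical sketch: routes elements by how far they lag the watermark.
public class LateDataRouter {
    // Categories taken from the example above: on time, divert to purgatory, drop.
    public enum Route { ON_TIME, PURGATORY, DROP }

    // Assumed policy knob: how far behind the watermark still goes to purgatory.
    private final long purgatoryHorizonMillis;

    public LateDataRouter(long purgatoryHorizonMillis) {
        this.purgatoryHorizonMillis = purgatoryHorizonMillis;
    }

    /** Classifies an element by how far its timestamp lags the current watermark. */
    public Route route(long elementTimestampMillis, long watermarkMillis) {
        long lag = watermarkMillis - elementTimestampMillis;
        if (lag <= 0) {
            return Route.ON_TIME;   // not behind the watermark
        } else if (lag <= purgatoryHorizonMillis) {
            return Route.PURGATORY; // late, but recent enough to set aside
        } else {
            return Route.DROP;      // too late to keep
        }
    }

    public static void main(String[] args) {
        LateDataRouter router = new LateDataRouter(60_000L);
        long watermark = 100_000L;
        System.out.println(router.route(100_000L, watermark)); // ON_TIME
        System.out.println(router.route(70_000L, watermark));  // PURGATORY
        System.out.println(router.route(10_000L, watermark));  // DROP
    }
}
```

The point of reading the watermark directly is that the number of categories and their thresholds are up to the DoFn, rather than being fixed to one allowed-lateness cutoff.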

I'd also argue that reading the watermark doesn't expose users to most of the
dangers of "manually handling watermarks": access is read-only, there's no
propagation to think about, and there's only one kind of watermark. It's just
giving users direct access to something they already have indirect access
to (via timers, PaneInfo.Timing.LATE) so they have more flexibility if
they're stepping outside our easy-to-use, high-level API. That's a matter
of opinion, of course.
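To make the read-only point concrete, here is a toy single-key model in plain Java, not Beam code. The runner side advances the watermark and fires every timer whose timestamp is less than it (the proposal's second guarantee), while user code only receives a `WatermarkView` with no setter. All names (`WatermarkTimerModel`, `WatermarkView`, `advanceTo`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: the runner owns the watermark, user code only reads it.
public class WatermarkTimerModel {
    /** Read-only view handed to user code; there is no way to set the watermark. */
    public interface WatermarkView {
        long currentWatermark();
    }

    private long watermark = Long.MIN_VALUE;
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    // The lambda reads the field on every call, so it always sees the latest value.
    public WatermarkView view() {
        return () -> watermark;
    }

    public void setTimer(long timestampMillis) {
        timers.add(timestampMillis);
    }

    /**
     * Runner side: advances the watermark (never backwards) and returns, in
     * timestamp order, every timer whose timestamp is now less than the watermark.
     */
    public List<Long> advanceTo(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> eligible = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() < watermark) {
            eligible.add(timers.poll());
        }
        return eligible;
    }

    public static void main(String[] args) {
        WatermarkTimerModel model = new WatermarkTimerModel();
        WatermarkView view = model.view();
        model.setTimer(300L);
        model.setTimer(100L);
        model.setTimer(500L);
        System.out.println(model.advanceTo(400L) + " at watermark " + view.currentWatermark());
    }
}
```

Running `main` prints `[100, 300] at watermark 400`. User code could already infer this value from which timers have fired; the view just exposes it directly.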

I do think it would be valuable to brainstorm the sharp edges such an API
could expose. I wasn't able to think of any that weren't just obvious
misuse. That's also opinion, but specifics are easier to agree on than
generalities :)

On Fri, Jan 13, 2017 at 4:30 PM, Robert Bradshaw <rober...@google.com.invalid> wrote:

> I would like to understand the use cases better too; in general, manually
> handling watermarks is notoriously difficult to get right.
>
> On Fri, Jan 13, 2017 at 4:24 PM, Lukasz Cwik <lc...@google.com.invalid> wrote:
> > Can you describe some use cases where reading the watermark has been
> > useful?
> >
> > On Fri, Jan 13, 2017 at 4:21 PM, Dan Sotolongo <dsotolo...@google.com.invalid> wrote:
> >
> >> Hi folks,
> >>
> >> I work on Flume, and we've found users who need to read the
> >> watermark from a DoFn. We're going to add this to the Flume API. I wanted
> >> to propose doing the same in the Beam API, and to share some
> >> subtleties we encountered when discussing it internally.
> >>
> >> First, the proposal in more detail (I'll use Java in my examples; I'm not
> >> familiar with the Python API): the watermark can be passed into the
> >> @ProcessElement method (and @OnTimer if desired) as an argument identified
> >> by a new annotation, name TBD. It would contain an Instant (or whichever time
> >> class is preferred) specifying the watermark time that's used to fire
> >> timers/triggers in the DoFn. The watermark read provides these guarantees:
> >>
> >>    1. All non-late elements with timestamp less than the watermark read
> >>    have been durably processed by the DoFn.
> >>    2. All timers with timestamp less than the watermark are eligible to
> >>    fire.
> >>
> >> It sounds pretty straightforward, but it can be complicated by a number of
> >> things:
> >>
> >>    - Optimizations to the pipeline topology.
> >>    Specifically, any transformation that splits a DoFn onto two branches
> >>    with independent watermark propagation would violate the guarantees.
> >>    This isn't a concern for GBKs or DoFns with state, because these require
> >>    a shuffle to group their elements, whereas reading a watermark does not.
> >>    - Watermark holds.
> >>    If a transform (e.g. GBK with AfterWatermark) is holding the watermark,
> >>    and a stateless DoFn tries to inspect it downstream, the
> >>    watermark-propagation mechanism has to operate at a fine-enough
> >>    granularity to present distinct watermarks to each transform.
> >>
> >> I hope that's helpful. If people get on board, I'd be happy to contribute
> >> to the change if someone can file the JIRA and point me to the right places
> >> in the code.
> >>
>
