On Fri, Jan 13, 2017 at 5:18 PM, Dan Sotolongo
<dsotolo...@google.com.invalid> wrote:
> The driving use case in Flume is a stateful DoFn that sets many timers,
> many of which fire immediately. Knowing the timestamp lets them
> short-circuit the timer, which improves pipeline efficiency by about 20%.
> An argument could be made against this use case as an optimization that
> should be tackled in the runner by executing expired timers eagerly, but
> there are other convenience use cases.

We should strongly avoid adding high-level APIs to enable performance
optimizations like these. Ideally we should fix runners.

I don't think the watermark itself is even really part of the Beam
model, and other runners may implement things differently.
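For concreteness, the short-circuit from the quoted Flume use case can be modeled outside any SDK. This is a hypothetical, Beam-free sketch: `TimerShortCircuit` and `setOrFire` are illustrative names, not Beam API. Given a watermark read, a timer whose timestamp is already before it is handled inline instead of being set. It deliberately ignores the ordering and output-timestamp effects a runner would have to respect, which is part of why the optimization arguably belongs in the runner.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model, not Beam API: skip setting timers that would fire immediately. */
public class TimerShortCircuit {
  /** Timers deferred to the runner (stand-in for actually setting a timer). */
  final List<Instant> scheduled = new ArrayList<>();
  /** Timers whose work ran inline because they were already eligible to fire. */
  final List<Instant> handledInline = new ArrayList<>();

  /**
   * If the timer timestamp is before the watermark read, the timer is already
   * eligible to fire, so the work is done inline; otherwise it is scheduled.
   * Returns true when the timer was handled inline.
   */
  boolean setOrFire(Instant timerTimestamp, Instant watermark) {
    if (timerTimestamp.isBefore(watermark)) {
      handledInline.add(timerTimestamp);
      return true;
    }
    scheduled.add(timerTimestamp);
    return false;
  }
}
```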

> An important one can be summarized as "understanding pipeline semantics by
> looking at the watermark". Having access to timers and state allows quite
> complex DoFns to be written, and having access to that information makes
> debugging much easier.

This is a case for attaching watermark data (or skew) to log entries.
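A minimal sketch of what such a log entry might carry, assuming the runner or a logging wrapper can supply the current watermark (that hook is hypothetical). Here "skew" is taken to mean the element timestamp minus the watermark; that definition is also an assumption.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: annotate a log message with watermark context. */
public class WatermarkLogContext {
  /**
   * Appends the element timestamp, the watermark, and the skew
   * (element timestamp minus watermark; negative means behind the watermark).
   */
  static String format(String message, Instant elementTimestamp, Instant watermark) {
    Duration skew = Duration.between(watermark, elementTimestamp);
    return String.format(
        "%s [elementTs=%s watermark=%s skew=%s]",
        message, elementTimestamp, watermark, skew);
  }
}
```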

> Another class of use cases is treating late data differently when windows
> don't fit their use case very well. For example, they might have several
> categories of "late": divert to purgatory and drop. withAllowedLateness
> can't capture that.

This is one of the pitfalls: it's not safe to compare an element's
timestamp to a watermark value to determine lateness.
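One way the naive comparison goes wrong, sketched under simplifying assumptions (epoch-aligned fixed windows, allowed lateness ignored, names hypothetical): in Beam, lateness is judged per window, by whether the watermark has passed the end of the element's window. An element whose timestamp is behind the watermark can therefore still belong to an open window and be on time.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical illustration; not Beam's implementation. */
public class LatenessCheck {
  /** The naive check: element timestamp compared directly to the watermark. */
  static boolean naiveIsLate(Instant elementTs, Instant watermark) {
    return elementTs.isBefore(watermark);
  }

  /** Start of the fixed window [start, start + size) containing ts, aligned to the epoch. */
  static Instant windowStart(Instant ts, Duration size) {
    long sizeMs = size.toMillis();
    return Instant.ofEpochMilli(Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs);
  }

  /** Window-based check: late once the watermark has reached the end of the element's window. */
  static boolean windowIsLate(Instant elementTs, Duration windowSize, Instant watermark) {
    Instant windowEnd = windowStart(elementTs, windowSize).plus(windowSize);
    return !watermark.isBefore(windowEnd);
  }
}
```

With one-hour windows, an element stamped 10:05 seen at watermark 10:30 is "late" by the naive check but on time by the window check, because its window [10:00, 11:00) is still open.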

> I'd also argue that reading a watermark doesn't expose users to most of the
> dangers of "manually handling watermarks": there's no read-write access, no
> propagation to think about, and only one kind of watermark. It's just
> giving users direct access to something they already have indirect access
> to (via timers, PaneInfo.Timing.LATE) so they have more flexibility if
> they're stepping outside our easy-to-use, high-level API. That's a matter
> of opinion, of course.
>
> I do think it would be valuable to brainstorm the sharp edges such an API
> could expose. I wasn't able to think of any that weren't just obvious
> misuse. That's also opinion, but specifics are easier to agree on than
> generalities :)
>
> On Fri, Jan 13, 2017 at 4:30 PM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
>
>> I would like to understand use cases better too. Generally, manually
>> handling watermarks is notoriously difficult to get right.
>>
>> On Fri, Jan 13, 2017 at 4:24 PM, Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>> > Can you mention some use cases as to why reading the watermark has been
>> > useful?
>> >
>> > On Fri, Jan 13, 2017 at 4:21 PM, Dan Sotolongo <
>> > dsotolo...@google.com.invalid> wrote:
>> >
>> >> Hi folks,
>> >>
>> >> I work on Flume, and we've found users who need to read the
>> >> watermark from a DoFn. We're going to add this in the Flume API. I
>> >> wanted to propose doing the same thing in the Beam API, and include
>> >> some subtleties we encountered when discussing it internally.
>> >>
>> >> First, the proposal in more detail (I'll use Java in my examples; I'm not
>> >> familiar with the Python API): the watermark can be passed into the
>> >> @ProcessElement method (and @OnTimer if desired) as an argument
>> >> identified by a new annotation, name TBD. It would contain an Instant (or
>> >> whichever time class is preferred) specifying the watermark time that's
>> >> used to fire timers/triggers in the DoFn. The watermark read provides the
>> >> following guarantees:
>> >>
>> >>    1. All non-late elements with timestamp less than the watermark read
>> >>    have been durably processed by the DoFn.
>> >>    2. All timers with timestamp less than the watermark are eligible to
>> >>    fire.
>> >>
>> >> It sounds pretty straightforward, but it can be complicated by a number
>> >> of things:
>> >>
>> >>    - Optimizations to the pipeline topology.
>> >>    Specifically, any transformation that splits a DoFn onto two branches
>> >>    with independent watermark propagation would violate the guarantees.
>> >>    This isn't a concern for GBKs or DoFns with state because these
>> >>    require a shuffle to group their elements, whereas reading a
>> >>    watermark does not.
>> >>    - Watermark holds.
>> >>    If a transform (e.g. GBK with AfterWatermark) is holding the
>> >>    watermark, and a stateless DoFn tries to inspect it downstream, the
>> >>    watermark-propagation mechanism has to ensure that it operates at a
>> >>    fine-enough granularity to present distinct watermarks to each
>> >>    transform.
>> >>
>> >> I hope that's helpful. If people get on board, I'd be happy to
>> >> contribute to the change if someone can file the JIRA and point me to
>> >> the right places in the code.
>> >>
>>
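A toy, single-DoFn model of the proposed read, to pin down guarantee 2 and the usual assumption that the value never moves backwards. Everything here (`WatermarkReadModel`, `readWatermark`, `advanceTo`) is hypothetical and unrelated to Beam's actual classes; it does not model guarantee 1 or the topology and hold complications listed in the proposal.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of a per-DoFn watermark read; not Beam API. */
public class WatermarkReadModel {
  private Instant watermark = Instant.MIN;
  private final PriorityQueue<Instant> pendingTimers = new PriorityQueue<>();

  /** The value a @ProcessElement or @OnTimer method would receive under the proposal. */
  Instant readWatermark() {
    return watermark;
  }

  void setTimer(Instant timestamp) {
    pendingTimers.add(timestamp);
  }

  /**
   * Advances the watermark (ignoring attempts to move it backwards) and returns,
   * in timestamp order, every timer with a timestamp strictly before the watermark,
   * i.e. the timers guarantee 2 makes eligible to fire.
   */
  List<Instant> advanceTo(Instant newWatermark) {
    if (newWatermark.isAfter(watermark)) {
      watermark = newWatermark;
    }
    List<Instant> eligible = new ArrayList<>();
    while (!pendingTimers.isEmpty() && pendingTimers.peek().isBefore(watermark)) {
      eligible.add(pendingTimers.poll());
    }
    return eligible;
  }
}
```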
