This seems extremely useful.

I assume you mean `@OnTimer("timers")` in your example. I would suggest
that the parameter annotation be something other than @TimerId since that
annotation is already used for a very similar but different purpose; they
are close enough that it is tempting to pun them, but it is clearer to keep
them distinct IMO. Perhaps @TimerName or @TimerKey or some such.
Alternatively, keep @TimerId in the parameter list and change the
declaration to @TimerFamily("timers"). I think "family" or "group" may be
more clear naming than "map".

At the portability level, this API does seem to be pretty close to a noop
in terms of the messages that need to be sent over the Fn API, so it makes
sense to loosen the protos. By the time the Fn API is in play, all of our
desires to catch errors prior to execution are irrelevant anyhow.

On the other hand, I think DSLs have a different & bigger problem than
this, in that they want to programmatically adjust all the capabilities of
a DoFn. Same goes for wrapping one DoFn in another. Certainly some limited
DSL use cases are addressed by this, but I wouldn't take that as a primary
use case for this feature. Ultimately they are probably better served by
being able to explicitly author a DoFnInvoker and provide it to a variant
of beam:transforms:ParDo where the do_fn field is a serialized DoFnInvoker.
Now that I think about this, I cannot recall why we don't already ship a
DoFnSignature & DoFnInvoker as the payload. That would allow maximum
flexibility in utilizing the portability framework.

Kenn

On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com> wrote:

> BEAM-6857 documents the need for dynamic timer support in the Beam API. I
> wanted to make a proposal for what this API would look like, and how to
> express it in the portability protos.
>
> Background: Today Beam (especially Beam Java) requires a ParDo to
> statically declare all timers it accesses at compile time. For example:
>
> class MyDoFn extends DoFn<String, String> {
>   @TimerId("timer1")
>   private final TimerSpec timer1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @TimerId("timer2")
>   private final TimerSpec timer2 = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>
>   @ProcessElement
>   public void process(@Element String e,
>       @TimerId("timer1") Timer timer1, @TimerId("timer2") Timer timer2) {
>     timer1.set(...);
>     timer2.set(...);
>   }
>
>   @OnTimer("timer1") public void onTimer1() { ... }
>   @OnTimer("timer2") public void onTimer2() { ... }
> }
>
> This requires the author of a ParDo to know the full list of timers ahead
> of time, which has been problematic in many cases. One example where it
> causes issues is for DSLs such as Euphoria or Scio. DSL authors usually
> write ParDos to interpret the code written in the high-level DSL, and so
> don't know ahead of time the list of timers needed; alternatives today are
> quite ugly: physical code generation or creating a single timer that
> multiplexes all of the user's logical timers. There are also cases where a
> ParDo needs multiple distinct timers, but the set of distinct timers is
> controlled by the input data, and therefore not knowable in advance. The
> Beam timer API has been insufficient for these use cases.
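
A minimal plain-Java sketch of the second workaround mentioned above, a single timer that multiplexes the user's logical timers, may help show why it is considered ugly. It is not Beam code and assumes nothing about a particular runner: `MultiplexedTimers` is a hypothetical helper whose map stands in for a per-key state cell, and the `OptionalLong` it returns is the deadline the one real timer would have to be re-set to.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper (not a Beam API): multiplexes many logical timers onto
// one physical timer. The map stands in for a per-key state cell, and
// nextDeadline() is the time the single physical timer should be set to.
final class MultiplexedTimers {
  private final Map<String, Long> pending = new HashMap<>();

  // Sets (or resets) a logical timer and returns the new physical deadline.
  OptionalLong set(String logicalName, long timestampMillis) {
    pending.put(logicalName, timestampMillis);
    return nextDeadline();
  }

  // The physical timer must fire at the earliest pending logical timer;
  // an empty result means the physical timer can be left unset.
  OptionalLong nextDeadline() {
    return pending.values().stream().mapToLong(Long::longValue).min();
  }

  // Called when the physical timer fires at `now`: removes and returns every
  // logical timer that is due, earliest first (ties broken by name). The
  // caller must then re-arm the physical timer with nextDeadline().
  List<String> fire(long now) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : pending.entrySet()) {
      if (e.getValue() <= now) {
        due.add(e.getKey());
      }
    }
    due.sort(Comparator.comparingLong((String name) -> pending.get(name))
        .thenComparing(Comparator.naturalOrder()));
    due.forEach(pending::remove);
    return due;
  }
}
```

Every logical `set` and every firing forces the DoFn author to recompute and re-arm the physical timer by hand; that bookkeeping is roughly what the proposed TimerMap would take off the author's hands.
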
>
> I propose a new TimerMap construct, which allows a ParDo to dynamically set
> named timers. Its use in the Java API would look as follows:
>
> class MyDoFn extends DoFn<String, String> {
>   @TimerId("timers")
>   private final TimerSpec timers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(@Element String e, @TimerId("timers") TimerMap timers) {
>     timers.set("timer1", ...);
>     timers.set("timer2", ...);
>   }
>
>   @OnTimer("timer") public void onTimer(@TimerId String timerFired,
>       @Timestamp Instant timerTs) { ... }
> }
>
> There is a new TimerSpec type to specify a TimerMap. The TimerMap class
> itself allows dynamically setting multiple timers based on a String tag
> argument. Each TimerMap has a single callback, which, when called, is given
> the id of the timer that is currently firing.
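
The semantics described above can be modeled in a few lines of plain Java. This is a hedged, in-memory sketch, not the proposed Beam class: `InMemoryTimerMap` and `advanceTo` are hypothetical names, time is a `long` of milliseconds, and it assumes (as with today's `Timer.set`) that setting an existing tag replaces its timestamp. The single `BiConsumer` callback plays the role of the one `@OnTimer` method and receives the firing tag, like the `@TimerId String timerFired` parameter in the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory model of the proposed TimerMap (not a Beam API):
// one declared spec, any number of timers named by a String tag at runtime,
// and a single callback that is told which tag fired.
final class InMemoryTimerMap {
  private final Map<String, Long> timers = new HashMap<>();

  // Assumed to behave like Timer.set today: setting an existing tag
  // replaces its previous timestamp rather than adding a second timer.
  void set(String tag, long timestampMillis) {
    timers.put(tag, timestampMillis);
  }

  // Advances the clock (e.g. the watermark for an event-time TimerMap) and
  // fires every timer at or before it, earliest first (ties by tag). Each
  // timer is cleared before the callback runs, so the callback may set it
  // again; the linear scan per firing keeps the sketch short, not fast.
  void advanceTo(long time, BiConsumer<String, Long> onTimer) {
    while (true) {
      String next = null;
      long nextTs = 0L;
      for (Map.Entry<String, Long> e : timers.entrySet()) {
        long ts = e.getValue();
        boolean earlier = next == null || ts < nextTs
            || (ts == nextTs && e.getKey().compareTo(next) < 0);
        if (ts <= time && earlier) {
          next = e.getKey();
          nextTs = ts;
        }
      }
      if (next == null) {
        return;
      }
      timers.remove(next);
      onTimer.accept(next, nextTs);
    }
  }
}
```

For example, setting "timer1" to 20, "timer2" to 10, then "timer1" again to 30 and advancing to 25 invokes the callback once, with "timer2"; advancing to 30 then fires "timer1".
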
>
> It is allowed to have multiple TimerMap objects in a ParDo (and required
> if you want to have both processing-time and event-time timers in the same
> ParDo). Each TimerMap is its own logical namespace, i.e., if the user sets
> timers with the same string tag on different TimerMap objects, the timers
> will not collide.
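
One way a runner might realize this namespacing is to key stored timers by the pair (TimerMap id, tag) rather than by the tag alone. A minimal sketch under that assumption; `TimerKey`, `TimerStore`, and the ids `eventTimers` and `processingTimers` are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical runner-side key: a timer is identified by the TimerMap
// (family) it was set on plus its dynamic tag, so equal tags in
// different TimerMaps never collide.
final class TimerKey {
  final String family;
  final String tag;

  TimerKey(String family, String tag) {
    this.family = Objects.requireNonNull(family);
    this.tag = Objects.requireNonNull(tag);
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TimerKey)) {
      return false;
    }
    TimerKey other = (TimerKey) o;
    return family.equals(other.family) && tag.equals(other.tag);
  }

  @Override
  public int hashCode() {
    return Objects.hash(family, tag);
  }
}

// Hypothetical per-key timer store shared by all TimerMaps of one ParDo.
final class TimerStore {
  private final Map<TimerKey, Long> timers = new HashMap<>();

  // Re-setting the same (family, tag) pair overwrites the old timestamp.
  void set(String family, String tag, long timestampMillis) {
    timers.put(new TimerKey(family, tag), timestampMillis);
  }

  Long get(String family, String tag) {
    return timers.get(new TimerKey(family, tag));
  }

  int size() {
    return timers.size();
  }
}
```

With this key, setting tag "a" on `eventTimers` and on `processingTimers` produces two independent entries, while setting "a" on `eventTimers` a second time only updates the existing one.
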
>
> Currently the portability protos were written to mirror the Java API,
> expecting one TimerSpec per timer accessed by the ParDo. I suggest that we
> instead make TimerMap the default for portability, and model the current
> behavior on top of TimerMap. If this proves problematic for some runners,
> we could instead introduce a new TimerSpec proto to represent TimerMap.
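
Modeling the current behavior on top of TimerMap could look like this: each statically declared timer, e.g. `@TimerId("timer1")`, becomes its own timer family that the SDK only ever sets under one fixed tag, so runners would only need to understand families. This is a sketch under that assumption; `TimerMapModel`, `StaticTimer`, and the empty-string tag are hypothetical and not part of the portability protos.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical portable timer state (not a Beam API): every timer belongs to
// a TimerMap-like family and is addressed by a String tag.
final class TimerMapModel {
  private final Map<String, Long> timers = new HashMap<>();

  void set(String tag, long timestampMillis) {
    timers.put(tag, timestampMillis);
  }

  Long get(String tag) {
    return timers.get(tag);
  }

  int size() {
    return timers.size();
  }
}

// Hypothetical adapter: today's statically declared timer, e.g.
// @TimerId("timer1"), expressed as a family whose only tag is a fixed
// constant chosen by this sketch.
final class StaticTimer {
  static final String ONLY_TAG = "";

  private final TimerMapModel family;

  StaticTimer(TimerMapModel family) {
    this.family = family;
  }

  // Setting the static timer just sets the family's single fixed tag.
  void set(long timestampMillis) {
    family.set(ONLY_TAG, timestampMillis);
  }

  Long get() {
    return family.get(ONLY_TAG);
  }
}
```

Because the static timer always writes the same tag, re-setting it replaces the previous timestamp and the family never holds more than one entry, which matches how a statically declared timer behaves today.
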
>
> Thoughts?
>
> Reuven
>
