Hi Max,

wouldn't that be actually the same as

class MyDoFn extends DoFn<String, String> {

  @ProcessElement
  public void process(
      ProcessContext context) {
    // "get" would register a new TimerSpec
    Timer timer1 = context.getTimer("timer1");
    Timer timer2 = context.getTimer("timer2");
    timer1.set(...);
    timer2.set(...);
  }
}

That is, nothing needs to be declared up front? One more concern: if we allow timers (or even state) to be registered dynamically like that, it might be harder to validate the pipeline upon upgrades.
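
One way to picture the validation concern, as a rough sketch only: assume an upgrade check that compares the timer ids declared by the old and new versions of a DoFn. The class and method names below (UpgradeCheck, droppedTimerIds) are hypothetical and not part of Beam. With statically declared timers the check can see a dropped timer; with purely dynamic registration both versions declare nothing, so the same check has nothing to compare.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical upgrade check; not a Beam API.
final class UpgradeCheck {
  // Returns ids declared by the old version that the new version no longer declares.
  static Set<String> droppedTimerIds(Set<String> oldDeclared, Set<String> newDeclared) {
    Set<String> dropped = new TreeSet<>(oldDeclared);
    dropped.removeAll(newDeclared);
    return dropped;
  }

  public static void main(String[] args) {
    // Static declarations: the removal of "timer2" is visible before execution.
    Set<String> oldIds = new TreeSet<>(Arrays.asList("timer1", "timer2"));
    Set<String> newIds = new TreeSet<>(Arrays.asList("timer1"));
    System.out.println(droppedTimerIds(oldIds, newIds));

    // Dynamic registration: nothing is declared, so the check finds nothing,
    // even if timers named "timer2" are still pending in the old pipeline.
    System.out.println(
        droppedTimerIds(Collections.<String>emptySet(), Collections.<String>emptySet()));
  }
}
```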

Jan

On 10/22/19 4:47 PM, Maximilian Michels wrote:
The idea makes sense to me. I really like that Beam gives upfront specs for timer and state, but it is not flexible enough for timer-based libraries or for users who want to dynamically generate timers.

I'm not sure about the proposed API yet. Shouldn't we separate the timer specs from setting actual timers?

Suggestion:

class MyDoFn extends DoFn<String, String> {
  @TimerMap TimerMap timers = TimerSpecs.timerMap();

  @ProcessElement
  public void process(
      @Element String e,
      @TimerMap TimerMap timers) {
    // "get" would register a new TimerSpec
    Timer timer1 = timers.get("timer1");
    Timer timer2 = timers.get("timer2");
    timer1.set(...);
    timer2.set(...);
  }

  // No args for "@OnTimer" => use generic TimerMap
  @OnTimer
  public void onTimer(
      @TimerId String timerFired,
      @Timestamp Instant timerTs,
      @TimerMap TimerMap timers) {
     // Timer firing
     ...
     // Set this timer (or another)
     Timer timer = timers.get(timerFired);
     timer.set(...);
  }
}

What do you think?

-Max

On 22.10.19 10:35, Jan Lukavský wrote:
Hi Kenn,

On 10/22/19 2:48 AM, Kenneth Knowles wrote:
This seems extremely useful.

I assume you mean `@OnTimer("timers")` in your example. I would suggest that the parameter annotation be something other than @TimerId since that annotation is already used for a very similar but different purpose; they are close enough that it is tempting to pun them, but it is clearer to keep them distinct IMO. Perhaps @TimerName or @TimerKey or some such. Alternatively, keep @TimerId in the parameter list and change the declaration to @TimerFamily("timers"). I think "family" or "group" may be more clear naming than "map".

At the portability level, this API does seem to be pretty close to a noop in terms of the messages that need to be sent over the Fn API, so it makes sense to loosen the protos. By the time the Fn API is in play, all of our desires to catch errors prior to execution are irrelevant anyhow.

On the other hand, I think DSLs have a different & bigger problem than this, in that they want to programmatically adjust all the capabilities of a DoFn. Same goes for wrapping one DoFn in another. Certainly some limited DSL use cases are addressed by this, but I wouldn't take that as a primary use case for this feature. Ultimately they are probably better served by being able to explicitly author a DoFnInvoker and provide it to a variant of beam:transforms:ParDo where the do_fn field is a serialized DoFnInvoker. Now that I think about this, I cannot recall why we don't already ship a DoFnSignature & DoFnInvoker as the payload. That would allow maximum flexibility in utilizing the portability framework.

Yes, exactly. But where DSLs are concerned, we have to make sure that they are not bound to portability: we have to be able to translate them for "legacy" runners as well. That might complicate things a bit.

Jan


Kenn

On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com> wrote:

    BEAM-6857 documents the need for dynamic timer support in the Beam
    API. I wanted to make a proposal for what this API would look
    like, and how to express it in the portability protos.

    Background: Today Beam (especially Beam Java) requires a ParDo to
    statically declare all timers it accesses at compile time. For
    example:

    class MyDoFn extends DoFn<String, String> {
      @TimerId("timer1") private final TimerSpec timer1 =
          TimerSpecs.timer(TimeDomain.EVENT_TIME);
      @TimerId("timer2") private final TimerSpec timer2 =
          TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(@Element String e, @TimerId("timer1") Timer timer1,
          @TimerId("timer2") Timer timer2) {
        timer1.set(...);
        timer2.set(...);
      }

      @OnTimer("timer1") public void onTimer1() { ... }
      @OnTimer("timer2") public void onTimer2() { ... }
    }

    This requires the author of a ParDo to know the full list of
    timers ahead of time, which has been problematic in many cases.
    One example where it causes issues is for DSLs such as Euphoria or
    Scio. DSL authors usually write ParDos to interpret the code
    written in the high-level DSL, and so don't know ahead of time the
    list of timers needed; alternatives today are quite ugly: physical
    code generation or creating a single timer that multiplexes all of
    the user's logical timers. There are also cases where a ParDo needs
    multiple distinct timers, but the set of distinct timers is
    controlled by the input data, and therefore not knowable in
    advance. The Beam timer API has been insufficient for these use cases.
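
For readers unfamiliar with the multiplexing workaround mentioned above, here is a minimal sketch of the idea in plain Java, with no Beam dependency. All names (MultiplexedTimer, setLogical, onPhysicalTimer) are hypothetical. The assumption is that the DoFn declares one physical timer, keeps the logical timers in a map (which in a real pipeline would have to live in state), and always arms the physical timer for the earliest pending logical timer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of multiplexing many logical timers onto one declared timer.
final class MultiplexedTimer {
  // Logical timers: tag -> firing time. Sorted by tag for deterministic output.
  private final Map<String, Long> logicalTimers = new TreeMap<>();
  // Firing time of the single physical timer; null means it is not set.
  private Long physicalTimer = null;

  void setLogical(String tag, long timestamp) {
    logicalTimers.put(tag, timestamp);
    rearm();
  }

  Long physicalFiringTime() {
    return physicalTimer;
  }

  // Called when the physical timer fires; returns the tags of all due logical timers.
  List<String> onPhysicalTimer(long firingTime) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : logicalTimers.entrySet()) {
      if (e.getValue() <= firingTime) {
        due.add(e.getKey());
      }
    }
    for (String tag : due) {
      logicalTimers.remove(tag);
    }
    rearm();
    return due;
  }

  // Arms the physical timer for the earliest pending logical timer, if any.
  private void rearm() {
    physicalTimer = logicalTimers.isEmpty() ? null : Collections.min(logicalTimers.values());
  }
}

final class MultiplexedTimerSketch {
  public static void main(String[] args) {
    MultiplexedTimer timer = new MultiplexedTimer();
    timer.setLogical("a", 30);
    timer.setLogical("b", 10);
    timer.setLogical("c", 10);
    System.out.println(timer.physicalFiringTime());  // 10
    System.out.println(timer.onPhysicalTimer(10));   // [b, c]
    System.out.println(timer.physicalFiringTime());  // 30
  }
}
```

The bookkeeping (tracking the minimum, re-arming after every firing, dispatching by tag) is what every DoFn author has to rewrite by hand today.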

    I propose a new TimerMap construct, which allows a ParDo to
    dynamically set named timers. Its use in the Java API would look
    as follows:

    class MyDoFn extends DoFn<String, String> {
      @TimerId("timers") TimerSpec timers =
          TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(@Element String e, @TimerId("timers")
          TimerMap timers) {
        timers.set("timer1", ...);
        timers.set("timer2", ...);
      }

      @OnTimer("timer") public void onTimer(@TimerId String timerFired,
          @Timestamp Instant timerTs) { ... }
    }

    There is a new TimerSpec type to specify a TimerMap. The TimerMap
    class itself allows dynamically setting multiple timers based on a
    String tag argument. Each TimerMap has a single callback which
    when called is given the id of the timer that is currently firing.

    It is allowed to have multiple TimerMap objects in a ParDo (and
    required if you want to have both processing-time and event-time
    timers in the same ParDo). Each TimerMap is its own logical
    namespace, i.e. if the user sets timers with the same string tag
    on different TimerMap objects, the timers will not collide.
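
The semantics described above (one callback per TimerMap that receives the tag of the firing timer, and a separate namespace per TimerMap) can be modeled in plain Java as a rough sketch. This is not the proposed Beam API: TimerFamilyModel, Callback, set and advanceTo are hypothetical names, and timestamps are plain long values rather than Instant.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed TimerMap; not a Beam class.
final class TimerFamilyModel {
  // Single callback per family; it is told which tag fired and at what time.
  interface Callback {
    void onTimer(String tag, long timestamp);
  }

  private final Map<String, Long> pending = new HashMap<>();
  private final Callback callback;

  TimerFamilyModel(Callback callback) {
    this.callback = callback;
  }

  // Setting an existing tag overwrites its previous firing time.
  void set(String tag, long timestamp) {
    pending.put(tag, timestamp);
  }

  // Fires every timer due at or before 'now', ordered by timestamp, then by tag.
  void advanceTo(long now) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : pending.entrySet()) {
      if (e.getValue() <= now) {
        due.add(e.getKey());
      }
    }
    due.sort((a, b) -> {
      int byTime = Long.compare(pending.get(a), pending.get(b));
      return byTime != 0 ? byTime : a.compareTo(b);
    });
    for (String tag : due) {
      long timestamp = pending.remove(tag);
      callback.onTimer(tag, timestamp);
    }
  }
}

final class TimerFamilySketch {
  static List<String> run() {
    List<String> fired = new ArrayList<>();
    // Two families, e.g. one per time domain; each is its own namespace.
    TimerFamilyModel eventTimers =
        new TimerFamilyModel((tag, ts) -> fired.add("event:" + tag + "@" + ts));
    TimerFamilyModel processingTimers =
        new TimerFamilyModel((tag, ts) -> fired.add("processing:" + tag + "@" + ts));

    eventTimers.set("timer1", 10);
    eventTimers.set("timer2", 20);
    processingTimers.set("timer1", 15);  // same tag, different family: no collision

    eventTimers.advanceTo(25);
    processingTimers.advanceTo(25);
    return fired;
  }

  public static void main(String[] args) {
    // Prints [event:timer1@10, event:timer2@20, processing:timer1@15]
    System.out.println(run());
  }
}
```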

    The portability protos are currently written to mirror the Java
    API, expecting one TimerSpec per timer accessed by the ParDo. I
    suggest that we instead make TimerMap the default for portability,
    and model the current behavior on top of timer map. If this proves
    problematic for some runners, we could instead introduce a new
    TimerSpec proto to represent TimerMap.

    Thoughts?

    Reuven
