Just to circle back around, after the discussion on this thread I propose
modifying the proposed API as follows:

class MyDoFn extends DoFn<String, String> {
  @TimerFamily("timers") TimerSpec timers =
      TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(@Element String e,
      @TimerFamily("timers") TimerMap timers) {
    timers.set("timer1", ...);
    timers.set("timer2", ...);
  }

  @OnTimer("timers") public void onTimer(@TimerId String timerFired,
      @Timestamp Instant timerTs, @TimerFamily("timers") TimerMap timers) { ... }
}

The discussion around exposing DoFnSignature and DoFnInvoker to DSL authors
is largely independent of this one (though related), so I suggest splitting
it into a separate thread.

Reuven

On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> yes, if this change is intended to be used by end users, then
>> DoFnSignatures cannot be used, agreed. Regarding the relationship
>> with dynamic state - I agree that this is a separate problem, but because
>> it is close enough, we should know how we want to deal with it. Because
>> state and timers share some functionality (after all, timers need state to
>> be fault tolerant), these APIs should IMO share the same logic. Whatever
>> solution is chosen to expose dynamic timers, it should extend to dynamic
>> state.
>>
>> I'd like to pause for a moment on the premise that users want dynamic
>> timers (that is, timers whose *name* - and therefore behavior - is
>> determined by incoming data). Could this case be modeled so that the timer
>> has one more (implicit) state variable that holds a collection of
>> (timestamp, name) tuples? The timer would then be invoked at each of those
>> timestamps (always the minimum of all currently set ones) with the
>> respective name. The question here is probably whether this has a
>> performance impact. That is to say, can any runner actually do better than
>> this in terms of the time complexity of the algorithm?
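To make the multiplexing idea concrete, here is a plain-Java sketch (not Beam code; `MultiplexedTimer`, `set`, `physicalTimerTime` and `advanceTo` are hypothetical names, and timestamps are simplified to `long` values) of many named logical timers carried by one physical timer. The implicit state is a set of (timestamp, name) pairs, the physical timer is always set to their minimum, and firing pops entries in timestamp order, so within this model invocations are ordered by construction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical sketch: many named logical timers multiplexed onto one physical timer. */
public class MultiplexedTimer {

  /** One logical timer: fires at {@code timestamp} with the given {@code name}. */
  static final class Entry implements Comparable<Entry> {
    final long timestamp;
    final String name;

    Entry(long timestamp, String name) {
      this.timestamp = timestamp;
      this.name = name;
    }

    @Override
    public int compareTo(Entry other) {
      // Order by timestamp first, then by name to break ties deterministically.
      int byTime = Long.compare(timestamp, other.timestamp);
      return byTime != 0 ? byTime : name.compareTo(other.name);
    }
  }

  // The implicit state: pending (timestamp, name) pairs, ordered by time...
  private final TreeSet<Entry> pending = new TreeSet<>();
  // ...plus an index so that re-setting a name replaces its previous timestamp.
  private final Map<String, Entry> byName = new HashMap<>();

  /** Sets (or resets) the logical timer {@code name}. */
  public void set(String name, long timestamp) {
    Entry previous = byName.remove(name);
    if (previous != null) {
      pending.remove(previous);
    }
    Entry entry = new Entry(timestamp, name);
    pending.add(entry);
    byName.put(name, entry);
  }

  /** Time the single physical timer should be set to, or null if nothing is pending. */
  public Long physicalTimerTime() {
    return pending.isEmpty() ? null : pending.first().timestamp;
  }

  /** Fires every logical timer with timestamp <= watermark, earliest first. */
  public List<String> advanceTo(long watermark) {
    List<String> fired = new ArrayList<>();
    while (!pending.isEmpty() && pending.first().timestamp <= watermark) {
      Entry next = pending.pollFirst();
      byName.remove(next.name);
      fired.add(next.name);
    }
    return fired;
  }

  public static void main(String[] args) {
    MultiplexedTimer timers = new MultiplexedTimer();
    timers.set("timer2", 20);
    timers.set("timer1", 10);
    timers.set("timer2", 5); // resets timer2 to an earlier time
    System.out.println(timers.physicalTimerTime()); // 5
    System.out.println(timers.advanceTo(15)); // [timer2, timer1]
    System.out.println(timers.physicalTimerTime()); // null
  }
}
```

In this sketch each set or fire costs O(log n) in the number of pending logical timers; whether a runner's native per-key timers can do better is exactly the question raised above.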
>>
>
> Yes - you could always multiplex many timers onto one. This is what some
> users do today, but it tends to be very inefficient and also complex. The
> Beam model requires runners to support dynamic timers per key (e.g., that's
> how windowing is implemented - each window has a separate timer), so
> there's no reason not to expose this to users.
>
>> I'm a little afraid that letting users define data-driven timers
>> might be too restrictive for some runners. Yes, runners that don't have
>> this option could probably fall back to the logic described above,
>> but if this could reasonably be done for all runners, then we wouldn't
>> force each runner to implement it. And the API could still look as if the
>> timers were actually dynamic.
>>
>> Jan
>>
>> P.S. If dynamic timers (and therefore any number of them) can actually be
>> implemented using a single timer, that might be an interesting pattern,
>> because a single timer per (window, key) has many nice properties; for
>> example, it implicitly avoids the situation where timer invocations are not
>> ordered ([BEAM-7520]), which seems to be an issue for multiple runners
>> (Samza, portable Flink).
>>
> BEAM-7520 is simply an implementation bug. I don't think it makes sense to
> fix a bug by restricting the model.
>
>
>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>
>> Kenn:
>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>
>> Jan:
>> This is definitely not just for DSLs. I've seen cases where
>> the user wants different timers based on input data, so they cannot be
>> defined statically. As a thought experiment: one stated goal of state +
>> timers was to provide the low-level tools we use to implement windowing.
>> However, to implement windowing you need a dynamic set of timers, not just
>> a single one. Most users don't need to reimplement windowing (though we
>> have had some users who did, when they wanted something slightly
>> different from what native Beam windowing provided), but the need for
>> dynamic timers is not unheard of.
>>
>> +1 to allowing dynamic state. However I think this is separate enough
>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>> which I think is a bit less of an issue for dynamic timers.
>>
>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>> problem. The DSL still needs a way to set and process the timers. It also
>> does not solve the problem where the timers are based on input data
>> elements, so they cannot be known at pipeline construction time. However, what
>> might be more important is statically defining the timer families, and a
>> DSL could do this by specifying a DoFnSignature (and something similar
>> could be done with state). Also as mentioned above, this is useful to
>> normal Beam users as well, and we shouldn't force normal users to start
>> dealing with DoFnSignatures and DoFnInvokers.
>>
>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Max,
>>>
>>> wouldn't that actually be the same as
>>>
>>> class MyDoFn extends DoFn<String, String> {
>>>    @ProcessElement
>>>    public void process(ProcessContext context) {
>>>      // "getTimer" would register a new TimerSpec
>>>      Timer timer1 = context.getTimer("timer1");
>>>      Timer timer2 = context.getTimer("timer2");
>>>      timer1.set(...);
>>>      timer2.set(...);
>>>    }
>>> }
>>>
>>> That is - no need to declare anything? One more concern about that: if
>>> we allow registration of timers (or even state) dynamically like that, it
>>> might be harder to validate the pipeline on upgrade.
>>>
>>> Jan
>>>
>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>> > The idea makes sense to me. I really like that Beam gives upfront
>>> > specs for timer and state, but it is not flexible enough for
>>> > timer-based libraries or for users who want to dynamically generate
>>> > timers.
>>> >
>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>> > timer specs from setting actual timers?
>>> >
>>> > Suggestion:
>>> >
>>> > class MyDoFn extends DoFn<String, String> {
>>> >   @TimerMap TimerSpec timers = TimerSpecs.timerMap();
>>> >
>>> >   @ProcessElement
>>> >   public void process(
>>> >       @Element String e,
>>> >       @TimerMap TimerMap timers) {
>>> >     // "get" would register a new TimerSpec
>>> >     Timer timer1 = timers.get("timer1");
>>> >     Timer timer2 = timers.get("timer2");
>>> >     timer1.set(...);
>>> >     timer2.set(...);
>>> >   }
>>> >
>>> >   // No args for "@OnTimer" => use generic TimerMap
>>> >   @OnTimer
>>> >   public void onTimer(
>>> >       @TimerId String timerFired,
>>> >       @Timestamp Instant timerTs,
>>> >       @TimerMap TimerMap timers) {
>>> >      // Timer firing
>>> >      ...
>>> >      // Set this timer (or another)
>>> >      Timer timer = timers.get(timerFired);
>>> >      timer.set(...);
>>> >   }
>>> > }
>>> >
>>> > What do you think?
>>> >
>>> > -Max
>>> >
>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>> >> Hi Kenn,
>>> >>
>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>> >>> This seems extremely useful.
>>> >>>
>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>> >>> suggest that the parameter annotation be something other
>>> >>> than @TimerId since that annotation is already used for a very
>>> >>> similar but different purpose; they are close enough that it is
>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>> >>> keep @TimerId in the parameter list and change the declaration
>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>> >>> clear naming than "map".
>>> >>>
>>> >>> At the portability level, this API does seem to be pretty close to a
>>> >>> noop in terms of the messages that need to be sent over the Fn API,
>>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>>> >>> play, all of our desires to catch errors prior to execution are
>>> >>> irrelevant anyhow.
>>> >>>
>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>> >>> than this, in that they want to programmatically adjust all the
>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>> >>> another. Certainly some limited DSL use cases are addressed by this,
>>> >>> but I wouldn't take that as a primary use case for this feature.
>>> >>> Ultimately they are probably better served by being able to
>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>> >>> That would allow maximum flexibility in utilizing the portability
>>> >>> framework.
>>> >>
>>> >> yes, exactly, but where DSLs are concerned, we have to make sure
>>> >> that they are not bound to portability - we have to be able to
>>> >> translate for "legacy" runners as well. That might
>>> >> complicate things a bit.
>>> >>
>>> >> Jan
>>> >>
>>> >>>
>>> >>> Kenn
>>> >>>
>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com> wrote:
>>> >>>
>>> >>>     BEAM-6857 documents the need for dynamic timer support in the
>>> >>>     Beam API. I wanted to make a proposal for what this API would look
>>> >>>     like, and how to express it in the portability protos.
>>> >>>
>>> >>>     Background: Today Beam (especially Beam Java) requires a ParDo to
>>> >>>     statically declare all timers it accesses at compile time. For
>>> >>>     example:
>>> >>>
>>> >>>     class MyDoFn extends DoFn<String, String> {
>>> >>>       @TimerId("timer1") TimerSpec timer1 =
>>> >>>           TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>> >>>       @TimerId("timer2") TimerSpec timer2 =
>>> >>>           TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>> >>>
>>> >>>       @ProcessElement
>>> >>>       public void process(@Element String e,
>>> >>>           @TimerId("timer1") Timer timer1, @TimerId("timer2") Timer timer2) {
>>> >>>         timer1.set(...);
>>> >>>         timer2.set(...);
>>> >>>       }
>>> >>>
>>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>>> >>>     }
>>> >>>
>>> >>>     This requires the author of a ParDo to know the full list of
>>> >>>     timers ahead of time, which has been problematic in many cases.
>>> >>>     One example where it causes issues is for DSLs such as Euphoria
>>> >>>     or Scio. DSL authors usually write ParDos to interpret the code
>>> >>>     written in the high-level DSL, and so don't know ahead of time
>>> >>>     the list of timers needed; alternatives today are quite ugly:
>>> >>>     physical code generation or creating a single timer that
>>> >>>     multiplexes all of the user's logical timers. There are also cases
>>> >>>     where a ParDo needs multiple distinct timers, but the set of
>>> >>>     distinct timers is controlled by the input data, and therefore not
>>> >>>     knowable in advance. The Beam timer API has been insufficient for
>>> >>>     these use cases.
>>> >>>
>>> >>>     I propose a new TimerMap construct, which allows a ParDo to
>>> >>>     dynamically set named timers. Its use in the Java API would look
>>> >>>     as follows:
>>> >>>
>>> >>>     class MyDoFn extends DoFn<String, String> {
>>> >>>       @TimerId("timers") TimerSpec timers =
>>> >>>           TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>> >>>
>>> >>>       @ProcessElement
>>> >>>       public void process(@Element String e,
>>> >>>           @TimerId("timers") TimerMap timers) {
>>> >>>         timers.set("timer1", ...);
>>> >>>         timers.set("timer2", ...);
>>> >>>       }
>>> >>>
>>> >>>       @OnTimer("timer") public void onTimer(@TimerId String timerFired,
>>> >>>           @Timestamp Instant timerTs) { ... }
>>> >>>     }
>>> >>>
>>> >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
>>> >>>     class itself allows dynamically setting multiple timers based on
>>> >>>     a String tag argument. Each TimerMap has a single callback which,
>>> >>>     when called, is given the id of the timer that is currently
>>> >>>     firing.
>>> >>>
>>> >>>     A ParDo may have multiple TimerMap objects (and must, if you want
>>> >>>     both processing-time and event-time timers in the same ParDo).
>>> >>>     Each TimerMap is its own logical namespace, i.e., if the user sets
>>> >>>     timers with the same string tag on different TimerMap objects, the
>>> >>>     timers will not collide.
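A minimal plain-Java model of the namespace and callback semantics just described, assuming a single time domain and `long` timestamps for simplicity. `TimerFamilies`, `declareFamily`, `set` and `fireUpTo` are hypothetical names, not the proposed Beam API: each family is its own map from string tag to timestamp, so the same tag in two families yields two distinct timers, and each family has one callback that receives the tag of the timer that fired.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical sketch: per-family timer namespaces with one callback per family. */
public class TimerFamilies {

  /** A due timer: which family, which tag, and when. */
  static final class Due {
    final String family;
    final String tag;
    final long timestamp;

    Due(String family, String tag, long timestamp) {
      this.family = family;
      this.tag = tag;
      this.timestamp = timestamp;
    }
  }

  // family -> (tag -> timestamp); each family is an independent namespace.
  private final Map<String, Map<String, Long>> timers = new HashMap<>();
  // family -> its single callback, invoked with (tag, timestamp).
  private final Map<String, BiConsumer<String, Long>> callbacks = new HashMap<>();

  public void declareFamily(String family, BiConsumer<String, Long> callback) {
    timers.put(family, new HashMap<>());
    callbacks.put(family, callback);
  }

  /** Sets (or overwrites) timer {@code tag} within {@code family}. */
  public void set(String family, String tag, long timestamp) {
    Map<String, Long> namespace = timers.get(family);
    if (namespace == null) {
      throw new IllegalArgumentException("undeclared timer family: " + family);
    }
    namespace.put(tag, timestamp);
  }

  /** Fires every timer due at or before {@code time}; returns how many fired. */
  public int fireUpTo(long time) {
    List<Due> due = new ArrayList<>();
    for (Map.Entry<String, Map<String, Long>> family : timers.entrySet()) {
      for (Map.Entry<String, Long> timer : family.getValue().entrySet()) {
        if (timer.getValue() <= time) {
          due.add(new Due(family.getKey(), timer.getKey(), timer.getValue()));
        }
      }
    }
    // Earliest first; ties broken by family, then tag, for a deterministic order.
    due.sort(Comparator.comparingLong((Due d) -> d.timestamp)
        .thenComparing(d -> d.family)
        .thenComparing(d -> d.tag));
    for (Due d : due) {
      timers.get(d.family).remove(d.tag); // clear first so the callback may re-set it
      callbacks.get(d.family).accept(d.tag, d.timestamp);
    }
    return due.size();
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    TimerFamilies fn = new TimerFamilies();
    fn.declareFamily("familyA", (tag, ts) -> log.add("familyA/" + tag + "@" + ts));
    fn.declareFamily("familyB", (tag, ts) -> log.add("familyB/" + tag + "@" + ts));
    // The same tag in two families: two distinct timers, no collision.
    fn.set("familyA", "timer1", 10);
    fn.set("familyB", "timer1", 20);
    fn.set("familyA", "timer2", 15);
    System.out.println(fn.fireUpTo(30)); // 3
    System.out.println(log); // [familyA/timer1@10, familyA/timer2@15, familyB/timer1@20]
  }
}
```

Because a timer is cleared before its callback runs, a callback that re-sets the same tag creates a fresh timer, which fires on a later `fireUpTo` call.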
>>> >>>
>>> >>>     Currently the portability protos mirror the Java API, expecting
>>> >>>     one TimerSpec per timer accessed by the ParDo. I suggest that we
>>> >>>     instead make TimerMap the default for portability, and model the
>>> >>>     current behavior on top of TimerMap. If this proves problematic
>>> >>>     for some runners, we could instead introduce a new TimerSpec proto
>>> >>>     to represent TimerMap.
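One possible reading of "model the current behavior on top of TimerMap", sketched in plain Java under stated assumptions: the names are hypothetical, static timer IDs and family names are assumed not to clash, and the empty string as the fixed tag is an arbitrary choice. A statically declared `@TimerId("timer1")` becomes a family named `timer1` that only ever uses one tag, so a single (family, tag) storage model covers both styles.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: static timers expressed as single-tag timer families. */
public class StaticTimersOnFamilies {
  // Assumption for illustration: the fixed tag used for every static timer.
  static final String STATIC_TAG = "";

  // family -> (tag -> timestamp); the only storage model needed here.
  private final Map<String, Map<String, Long>> families = new HashMap<>();

  /** Dynamic access: any tag within a family. */
  public void setInFamily(String family, String tag, long timestamp) {
    families.computeIfAbsent(family, f -> new HashMap<>()).put(tag, timestamp);
  }

  /** Static access, as with @TimerId("timer1"): a family with a single tag. */
  public void setStatic(String timerId, long timestamp) {
    setInFamily(timerId, STATIC_TAG, timestamp);
  }

  /** Pending timestamp for (family, tag), or null if none is set. */
  public Long get(String family, String tag) {
    Map<String, Long> namespace = families.get(family);
    return namespace == null ? null : namespace.get(tag);
  }

  /** Number of pending timers in a family. */
  public int pendingIn(String family) {
    Map<String, Long> namespace = families.get(family);
    return namespace == null ? 0 : namespace.size();
  }

  public static void main(String[] args) {
    StaticTimersOnFamilies timers = new StaticTimersOnFamilies();
    timers.setStatic("timer1", 10);
    timers.setStatic("timer1", 25); // re-setting a static timer overwrites it
    timers.setInFamily("timers", "a", 5);
    timers.setInFamily("timers", "b", 7);
    System.out.println(timers.pendingIn("timer1")); // 1
    System.out.println(timers.get("timer1", STATIC_TAG)); // 25
    System.out.println(timers.pendingIn("timers")); // 2
  }
}
```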
>>> >>>
>>> >>>     Thoughts?
>>> >>>
>>> >>>     Reuven
>>> >>>
>>>
>>
