Hi Jan,

Your proposal has merit, but I think using the TimerFamily specification is
more consistent with the existing API. I think that a StateFamily can also
have domains just like timers.

Luke's suggestion for the proto changes sounds good.

Reuven

On Tue, Oct 29, 2019 at 2:43 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Reuven,
>
> I didn't propose to restrict the model. The model can (and should) have
> multiple timers per key, even dynamic ones. The question was if this can be
> made efficiently by using single timer (after all, the runner will probably
> have single "timer service" so no matter what we expose on the API level,
> this will end up being multiplexed in the runner). And it might have
> additional benefits of preventing bugs. But I'm not proposing to do this
> change for existing timers, that was more a question about if we really
> must force runners to be able to implement dynamic timers or we can do it
> on the translation layer generally for all runners at once.
>
> Regarding the API - which is again a question independent of how it will be
> implemented - what do we need the @TimerFamily TimerSpec declaration for? I
> see two reasons:
>
>  a) it holds the time domain
>
>  b) it declares the DoFn as being stateful
>
> Property a) looks like it can be specified when setting the timer. b)
> could be inferred from @ProcessElement (or other method). What about
> class MyDoFn extends DoFn<String, String> {
>   @ProcessElement
>   // declares @TimerContext which implies stateful DoFn
>   public void process(@Element String e, @TimerContext TimerContext timers)
> {
>     Timer timer1 = timers.get("timer1", EVENT_TIME);
>     Timer timer2 = timers.get("timer2", PROCESSING_TIME);
>     timer1.set(...);
>     timer2.set(...);
>   }
>
>   // empty name might be allowed iff the declaration contains
> @TimerContext, so that declares using dynamic timers
>   @OnTimer public void onTimer(@TimerId String timerFired, @Timestamp
> Instant timerTs, @TimerContext TimerContext timers) { ... }
> }
>
> I'm still seeking the analogy with dynamic state, because in this API,
> that might become
>
> class MyDoFn extends DoFn<String, String> {
>   @ProcessElement
>   public void process(@Element String e, @StateContext StateContext
> states) {
>     ValueState state = states.get("myDynamicState", StateSpec...);
>     state.get(...);
>     state.set(...);
>   }
> }
>
> The point is that there seems to be no use for any declaration like
> @TimerFamily in case of dynamic state, because there is no domain. It would
> feel weird to have to declare something for dynamic timers and not have to
> do it for state.
>
> Jan
>
> On 10/29/19 6:56 AM, Reuven Lax wrote:
>
> Just to circle back around, after the discussion on this thread I propose
> modifying the proposed API as follows:
>
> class MyDoFn extends DoFn<String, String> {
>   @TimerFamily("timers") TimerSpec timers =
> TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(@Element String e, @TimerFamily("timers") TimerMap
> timers) {
>     timers.set("timer1", ...);
>     timers.set("timer2", ...);
>   }
>
>   @OnTimer("timers") public void onTimer(@TimerId String timerFired,
> @Timestamp Instant timerTs, @TimerFamily("timers") TimerMap timers) { ...
> }
> }
>
> Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
> are a bit independent (though not completely so, as it does relate), so I
> suggest splitting that into a separate discussion.
>
> Reuven
>
> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Reuven,
>>>
>>> yes, if this change is intended to be used by end users, then
>>> DoFnSignatures cannot be used, agree on that. Regarding the relationship
>>> with dynamic state - I agree that this is a separate problem, but because it
>>> is close enough, we should know how we want to deal with that. Because
>>> state and timers share some functionality (after all timers need state to
>>> be fault tolerant), these APIs should IMO share the same logic. Whatever
>>> solution is chosen to expose dynamic timers, it should extend to dynamic state.
>>>
>>> I'd like to dwell a little on the premise that users want dynamic
>>> timers (that is, timers whose *name* - and therefore behavior - is
>>> determined by incoming data). Could this case be modeled so that the timer
>>> actually has one more (implicit) state variable that holds a
>>> collection of tuples (timestamp, name)? Then the timer would be invoked at
>>> the given (minimum of all currently set) timestamps with the respective name? The
>>> question here probably is - can this have a performance impact? That is to
>>> say - can any runner actually do anything different from this in the sense
>>> of time complexity of the algorithm?
>>>
>>
>> Yes - you could always multiplex many timers onto one. This is what some
>> users do today, but it tends to be very inefficient and also complex. The
>> Beam model requires runners to support dynamic timers per key (e.g. that's
>> how windowing is implemented - each window has a separate timer), so
>> there's no reason not to expose this to users.
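
For readers following the multiplexing idea discussed above, here is a minimal sketch in plain Java, not Beam code. It assumes the implicit per-key state is a map from logical timer name to timestamp, and that the single physical timer is always armed at the minimum pending timestamp. The class TimerMultiplexer and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: many named logical timers multiplexed onto one physical timer. */
class TimerMultiplexer {
  // Implicit per-key state: logical timer name -> requested firing timestamp (millis).
  private final Map<String, Long> pending = new HashMap<>();

  /** Sets (or resets) a logical timer; returns the time the single physical timer must be armed at. */
  Long set(String name, long timestamp) {
    pending.put(name, timestamp);
    return nextFiringTime();
  }

  /** Minimum of all currently set timestamps, or null when nothing is pending. */
  Long nextFiringTime() {
    Long min = null;
    for (long ts : pending.values()) {
      if (min == null || ts < min) {
        min = ts;
      }
    }
    return min;
  }

  /** Called when the physical timer fires at 'now'; removes and returns the due names, earliest first. */
  List<String> fire(long now) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : pending.entrySet()) {
      if (e.getValue() <= now) {
        due.add(e.getKey());
      }
    }
    // Order by timestamp, then by name, so the firing order is deterministic.
    due.sort((a, b) -> {
      int byTime = Long.compare(pending.get(a), pending.get(b));
      return byTime != 0 ? byTime : a.compareTo(b);
    });
    for (String name : due) {
      pending.remove(name);
    }
    return due;
  }
}
```

After each set or fire call, the caller would re-arm the one real timer at nextFiringTime(). Note that this naive version scans every pending entry to find the minimum, which hints at the inefficiency and extra complexity mentioned above; a sorted structure would reduce the scan but adds more state to manage.
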
>>
>>> I'm a little afraid that actually letting users define data-driven timers
>>> might be too restrictive for some runners. Yes, runners that don't have
>>> this option would probably be able to resort to the logic described above,
>>> but if this work could be reasonably done for all runners, then we wouldn't
>>> force runners to actually implement it. And, the API could look as if the
>>> timers were actually dynamic.
>>>
>>> Jan
>>>
>>> P.S. If dynamic timers (and therefore any number of them) can actually be
>>> implemented using a single timer, that might be an interesting pattern, because
>>> a single timer per (window, key) has many nice properties, like it implicitly
>>> avoids the situation where timer invocation is not ordered ([BEAM-7520]), which
>>> seems to be an issue for multiple runners (samza, portable flink).
>>>
>> BEAM-7520 is simply an implementation bug. I don't think it makes sense
>> to fix a bug by restricting the model.
>>
>>
>>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>>
>>> Kenn:
>>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>>
>>> Jan:
>>> This is definitely not just for DSLs. I've definitely seen cases where
>>> the user wants different timers based on input data, so they cannot be
>>> defined statically. As a thought experiment: one stated goal of state +
>>> timers was to provide the low-level tools we use to implement windowing.
>>> However to implement windowing you need a dynamic set of timers, not just a
>>> single one. Now most users don't need to reimplement windowing (though we
>>> have had some users who had that need, when they wanted something slightly
>>> different than what native Beam windowing provided), however the need for
>>> dynamic timers is not unheard of.
>>>
>>> +1 to allowing dynamic state. However I think this is separate enough
>>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>>> which I think is a bit less of an issue for dynamic timers.
>>>
>>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>>> problem. The DSL still needs a way to set and process the timers. It also
>>> does not solve the problem where the timers are based on input data
>>> elements, so cannot be known at pipeline construction time. However what
>>> might be more important is statically defining the timer families, and a
>>> DSL could do this by specifying a DoFnSignature (and something similar
>>> could be done with state). Also as mentioned above, this is useful to
>>> normal Beam users as well, and we shouldn't force normal users to start
>>> dealing with DoFnSignatures and DoFnInvokers.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> wouldn't that be actually the same as
>>>>
>>>> class MyDoFn extends DoFn<String, String> {
>>>>
>>>>
>>>>    @ProcessElement
>>>>    public void process(
>>>>        ProcessContext context) {
>>>>      // "get" would register a new TimerSpec
>>>>      Timer timer1 = context.getTimer("timer1");
>>>>      Timer timer2 = context.getTimer("timer2");
>>>>      timer1.set(...);
>>>>      timer2.set(...);
>>>>    }
>>>>
>>>> That is - no need to declare anything? One more concern about that - if
>>>> we allow registration of timers (or even state) dynamically like that
>>>> it
>>>> might be harder to perform validation of pipeline upon upgrades.
>>>>
>>>> Jan
>>>>
>>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>>> > The idea makes sense to me. I really like that Beam gives upfront
>>>> > specs for timer and state, but it is not flexible enough for
>>>> > timer-based libraries or for users who want to dynamically generate
>>>> > timers.
>>>> >
>>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>>> > timer specs from setting actual timers?
>>>> >
>>>> > Suggestion:
>>>> >
>>>> > class MyDoFn extends DoFn<String, String> {
>>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>>> >
>>>> >   @ProcessElement
>>>> >   public void process(
>>>> >       @Element String e,
>>>> >       @TimerMap TimerMap timers) {
>>>> >     // "get" would register a new TimerSpec
>>>> >     Timer timer1 = timers.get("timer1");
>>>> >     Timer timer2 = timers.get("timer2");
>>>> >     timer1.set(...);
>>>> >     timer2.set(...);
>>>> >   }
>>>> >
>>>> >   // No args for "@OnTimer" => use generic TimerMap
>>>> >   @OnTimer
>>>> >   public void onTimer(
>>>> >       @TimerId String timerFired,
>>>> >       @Timestamp Instant timerTs,
>>>> >       @TimerMap TimerMap timers) {
>>>> >      // Timer firing
>>>> >      ...
>>>> >      // Set this timer (or another)
>>>> >      Timer timer = timers.get(timerFired);
>>>> >      timer.set(...);
>>>> >   }
>>>> > }
>>>> >
>>>> > What do you think?
>>>> >
>>>> > -Max
>>>> >
>>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>>> >> Hi Kenn,
>>>> >>
>>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>>> >>> This seems extremely useful.
>>>> >>>
>>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>>> >>> suggest that the parameter annotation be something other
>>>> >>> than @TimerId since that annotation is already used for a very
>>>> >>> similar but different purpose; they are close enough that it is
>>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>>> >>> keep @TimerId in the parameter list and change the declaration
>>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>>> >>> clear naming than "map".
>>>> >>>
>>>> >>> At the portability level, this API does seem to be pretty close to
>>>> a
>>>> >>> noop in terms of the messages that need to be sent over the Fn
>>>> API,
>>>> >>> so it makes sense to loosen the protos. By the time the Fn API is
>>>> in
>>>> >>> play, all of our desires to catch errors prior to execution are
>>>> >>> irrelevant anyhow.
>>>> >>>
>>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>>> >>> than this, in that they want to programmatically adjust all the
>>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>>> >>> another. Certainly some limited DSL use cases are addressed by
>>>> this,
>>>> >>> but I wouldn't take that as a primary use case for this feature.
>>>> >>> Ultimately they are probably better served by being able to
>>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>>> >>> That would allow maximum flexibility in utilizing the portability
>>>> >>> framework.
>>>> >>
>>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>>> >> that DSLs are not bound to portability - we have to be able to
>>>> >> translate even in case of "legacy" runners as well. That might
>>>> >> complicate things a bit maybe.
>>>> >>
>>>> >> Jan
>>>> >>
>>>> >>>
>>>> >>> Kenn
>>>> >>>
>>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com
>>>> >>> <mailto:re...@google.com>> wrote:
>>>> >>>
>>>> >>>     BEAM-6857 documents the need for dynamic timer support in the
>>>> Beam
>>>> >>>     API. I wanted to make a proposal for what this API would look
>>>> >>>     like, and how to express it in the portability protos.
>>>> >>>
>>>> >>>     Background: Today Beam (especially BeamJava) requires a ParDo to
>>>> >>>     statically declare all timers it accesses at compile time. For
>>>> >>>     example:
>>>> >>>
>>>> >>>     class MyDoFn extends DoFn<String, String> {
>>>> >>>       @TimerId("timer1") TimerSpec timer1 =
>>>> >>>     TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>> >>>       @TimerId("timer2") TimerSpec timer2 =
>>>> >>>     TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>> >>>
>>>> >>>       @ProcessElement
>>>> >>>       public void process(@Element String e, @TimerId("timer1")
>>>> Timer
>>>> >>>     timer1, @TimerId("timer2") Timer timer2) {
>>>> >>>         timer1.set(...);
>>>> >>>         timer2.set(...);
>>>> >>>       }
>>>> >>>
>>>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>>>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>>>> >>>     }
>>>> >>>
>>>> >>>     This requires the author of a ParDo to know the full list of
>>>> >>>     timers ahead of time, which has been problematic in many cases.
>>>> >>>     One example where it causes issues is for DSLs such as Euphoria
>>>> or
>>>> >>>     Scio. DSL authors usually write ParDos to interpret the code
>>>> >>>     written in the high-level DSL, and so don't know ahead of time
>>>> the
>>>> >>>     list of timers needed; alternatives today are quite ugly:
>>>> physical
>>>> >>>     code generation or creating a single timer that multiplexes all
>>>> of
>>>> >>>     the user's logical timers. There are also cases where a ParDo
>>>> needs
>>>> >>>     multiple distinct timers, but the set of distinct timers is
>>>> >>>     controlled by the input data, and therefore not knowable in
>>>> >>>     advance. The Beam timer API has been insufficient for these use
>>>> >>> cases.
>>>> >>>
>>>> >>>     I propose a new TimerMap construct, which allows a ParDo to
>>>> >>>     dynamically set named timers. Its use in the Java API would
>>>> look
>>>> >>>     as follows:
>>>> >>>
>>>> >>>     class MyDoFn extends DoFn<String, String> {
>>>> >>>       @TimerId("timers") TimerSpec timers =
>>>> >>>     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>> >>>
>>>> >>>       @ProcessElement
>>>> >>>       public void process(@Element String e, @TimerId("timers")
>>>> >>>     TimerMap timers) {
>>>> >>>         timers.set("timer1", ...);
>>>> >>>         timers.set("timer2", ...);
>>>> >>>       }
>>>> >>>
>>>> >>>       @OnTimer("timer") public void onTimer(@TimerId String
>>>> >>>     timerFired, @Timestamp Instant timerTs) { ... }
>>>> >>>     }
>>>> >>>
>>>> >>>     There is a new TimerSpec type to specify a TimerMap. The
>>>> TimerMap
>>>> >>>     class itself allows dynamically setting multiple timers based
>>>> on a
>>>> >>>     String tag argument. Each TimerMap has a single callback which
>>>> >>>     when called is given the id of the timer that is currently
>>>> firing.
>>>> >>>
>>>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>>>> >>>     required if you want to have both processing-time and event-time
>>>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>>>> >>>     namespace. i.e. if the user sets timers with the same string tag
>>>> >>>     on different TimerMap objects the timers will not collide.
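
The namespace and callback behavior described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the proposed Beam API: the class TimerFamilyRegistry, its method names, and the family ids used in the usage note are all hypothetical. Each declared family gets its own tag-to-timestamp map and a single callback that receives the tag of the timer that fired.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch: each timer family is its own namespace with one callback. */
class TimerFamilyRegistry {
  // Family id -> (timer tag -> timestamp). Tags are only unique within a family.
  private final Map<String, Map<String, Long>> timersByFamily = new HashMap<>();
  // Family id -> the single callback invoked with the tag of the firing timer.
  private final Map<String, Consumer<String>> callbacks = new HashMap<>();

  /** Declares a family up front, analogous to a static declaration on the DoFn. */
  void declareFamily(String familyId, Consumer<String> onTimer) {
    timersByFamily.put(familyId, new HashMap<>());
    callbacks.put(familyId, onTimer);
  }

  /** Dynamically sets a timer by tag inside an already declared family. */
  void set(String familyId, String tag, long timestamp) {
    Map<String, Long> family = timersByFamily.get(familyId);
    if (family == null) {
      throw new IllegalArgumentException("undeclared timer family: " + familyId);
    }
    family.put(tag, timestamp);
  }

  /** Returns the timestamp set for the tag in that family, or null if none is set. */
  Long timestampOf(String familyId, String tag) {
    Map<String, Long> family = timersByFamily.get(familyId);
    return family == null ? null : family.get(tag);
  }

  /** Fires one timer: clears it and passes its tag to the family's callback. */
  boolean fire(String familyId, String tag) {
    Map<String, Long> family = timersByFamily.get(familyId);
    if (family == null || family.remove(tag) == null) {
      return false;
    }
    callbacks.get(familyId).accept(tag);
    return true;
  }
}
```

For example, declaring an "eventTimers" family and a "processingTimers" family and setting the tag "timer1" in both keeps two independent timers; firing one leaves the other untouched.
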
>>>> >>>
>>>> >>>     Currently the portability protos were written to mirror the Java
>>>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>>>> >>>     suggest that we instead make TimerMap the default for
>>>> portability,
>>>> >>>     and model the current behavior on top of timer map. If this
>>>> proves
>>>> >>>     problematic for some runners, we could instead introduce a new
>>>> >>>     TimerSpec proto to represent TimerMap.
>>>> >>>
>>>> >>>     Thoughts?
>>>> >>>
>>>> >>>     Reuven
>>>> >>>
>>>>
>>>
