Hi Jan,

Your proposal has merit, but I think using the TimerFamily specification is more consistent with the existing API. I think that a StateFamily can also have domains just like timers.

Luke's suggestion for the proto changes sounds good.

Reuven

On Tue, Oct 29, 2019 at 2:43 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Reuven,
>
> I didn't propose to restrict the model. Model can (and should have)
> multiple timers per key and even dynamic. The question was if this can be
> made efficiently by using single timer (after all, the runner will probably
> have single "timer service" so no matter what we expose on the API level,
> this will end up being multiplexed in the runner). And it might have
> additional benefits of preventing bugs. But I'm not proposing to do this
> change for existing timers, that was more a question about if we really
> must force runners to be able to implement dynamic timers or we can do it
> on the translation layer generally for all runners at once.
>
> Regarding the API - which is again independent question of how it will be
> implemented - what do we need the @TimerFamily TimerSpec declaration for? I
> see two reasons:
>
>  a) it holds the time domain
>
>  b) it declares the DoFn as being stateful
>
> Property a) looks like it can be specified when setting the timer. b)
> could be inferred from @ProcessElement (or other method). What about
>
> class MyDoFn extends DoFn<String, String> {
>   @ProcessElement
>   // declares @TimerContext which implies stateful DoFn
>   public void process(@Element String e, @TimerContext TimerContext timers) {
>     Timer timer1 = timers.get("timer1", EVENT_TIME);
>     Timer timer2 = timers.get("timer2", PROCESSING_TIME);
>     timer1.set(...);
>     timer2.set(...);
>   }
>
>   // empty name might be allowed iff the declaration contains
>   // @TimerContext, so that declares using dynamic timers
>   @OnTimer
>   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
>       @TimerContext TimerContext timers) { ... }
> }
>
> I'm still seeking the analogy with dynamic state, because in this API,
> that might become
>
> class MyDoFn extends DoFn<String, String> {
>   @ProcessElement
>   public void process(@Element String e, @StateContext StateContext states) {
>     ValueState state = states.get("myDynamicState", StateSpec...);
>     state.get(...);
>     state.set(...);
>   }
> }
>
> The point is that there seems to be no use for any declaration like
> @TimerFamily in case of dynamic state, because there is no domain. It would
> feel weird to have to declare something for dynamic timers and not have to
> do it for state.
>
> Jan
>
> On 10/29/19 6:56 AM, Reuven Lax wrote:
>
> Just to circle back around, after the discussion on this thread I propose
> modifying the proposed API as follows:
>
> class MyDoFn extends DoFn<String, String> {
>   @TimerFamily("timers") TimerSpec timers =
>       TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(@Element String e, @TimerFamily("timers") TimerMap timers) {
>     timers.set("timer1", ...);
>     timers.set("timer2", ...);
>   }
>
>   @OnTimer("timer")
>   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
>       @TimerFamily("timers") TimerMap timers) { ... }
> }
>
> Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
> are a bit independent (though not completely so, as it does relate), so I
> suggest splitting that into a separate discussion.
>
> Reuven
>
> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:
>
>>
>> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Reuven,
>>>
>>> yes, if this change is intended to be used by end users, then
>>> DoFnSignatures cannot be used, agree on that. Regarding the relationship
>>> with dynamic state - I agree that this is separate problem, but because it
>>> is close enough, we should know how we want to deal with that.
>>> Because state and timers share some functionality (after all timers need
>>> state to be fault tolerant), these APIs should IMO share the same logic.
>>> Whatever solution chosen to expose dynamic timers, it should extend to
>>> dynamic state.
>>>
>>> I'd like to stop a little with the premise that users want dynamic
>>> timers (that is timers whose *name* - and therefore behavior - is
>>> determined by incoming data). Could this case be modeled so that the timer
>>> actually has one more (implicit) state variable that actually holds
>>> collection of tuples (timestamp, name)? Then the timer would be invoked at
>>> given (minimum of all currently set) timestamps with respective name? The
>>> question here probably is - can this have performance impact? That is to
>>> say - can any runner actually do anything different from this in the sense
>>> of time complexity of the algorithm?
>>>
>>
>> Yes - you could always multiplex many timers onto one. This is what some
>> users do today, but it tends to be very inefficient and also complex. The
>> Beam model requires runners to support dynamic timers per key (e.g. that's
>> how windowing is implemented - each window has a separate timer), so
>> there's no reason not to expose this to users.
>>
>>> I'm a little afraid if actually letting users define data-driven timers
>>> might not be too restrictive for some runners. Yes, runners that don't have
>>> this option would probably be able to resort to the logic described above,
>>> but if this work could be reasonably done for all runners, then we wouldn't
>>> force runners to actually implement it. And, the API could look as if the
>>> timers were actually dynamic.
>>>
>>> Jan
>>>
>>> P.S. If dynamic (and therefore any number) of timers can be actually
>>> implemented using single timer, that might be interesting pattern, because
>>> single timer per (window, key) has many nice properties, like it implicitly
>>> avoids situation where timer invocation is not ordered ([BEAM-7520]), which
>>> seems to be an issue for multiple runners (samza, portable flink).
>>>
>> BEAM-7520 is simply an implementation bug. I don't think it makes sense
>> to fix a bug by restricting the model.
>>
>>
>>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>>
>>> Kenn:
>>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>>
>>> Jan:
>>> This is definitely not just for DSLs. I've definitely seen cases where
>>> the user wants different timers based on input data, so they cannot be
>>> defined statically. As a thought experiment: one stated goal of state +
>>> timers was to provide the low-level tools we use to implement windowing.
>>> However to implement windowing you need a dynamic set of timers, not just a
>>> single one. Now most users don't need to reimplement windowing (though we
>>> have had some users who had that need, when they wanted something slightly
>>> different than what native Beam windowing provided), however the need for
>>> dynamic timers is not unheard of.
>>>
>>> +1 to allowing dynamic state. However I think this is separate enough
>>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>>> which I think is a bit less of an issue for dynamic timers.
>>>
>>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>>> problem. The DSL still needs a way to set and process the timers. It also
>>> does not solve the problem where the timers are based on input data
>>> elements, so cannot be known at pipeline construction time.
>>> However what might be more important is statically defining the timer
>>> families, and a DSL could do this by specifying a DoFnSignature (and
>>> something similar could be done with state). Also as mentioned above,
>>> this is useful to normal Beam users as well, and we shouldn't force normal
>>> users to start dealing with DoFnSignatures and DoFnInvokers.
>>>
>>>
>>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> wouldn't that be actually the same as
>>>>
>>>> class MyDoFn extends DoFn<String, String> {
>>>>
>>>>   @ProcessElement
>>>>   public void process(ProcessContext context) {
>>>>     // "get" would register a new TimerSpec
>>>>     Timer timer1 = context.getTimer("timer1");
>>>>     Timer timer2 = context.getTimer("timer2");
>>>>     timer1.set(...);
>>>>     timer2.set(...);
>>>>   }
>>>> }
>>>>
>>>> That is - no need to declare anything? One more concern about that - if
>>>> we allow registration of timers (or even state) dynamically like that it
>>>> might be harder to perform validation of pipeline upon upgrades.
>>>>
>>>> Jan
>>>>
>>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>>> > The idea makes sense to me. I really like that Beam gives upfront
>>>> > specs for timer and state, but it is not flexible enough for
>>>> > timer-based libraries or for users which want to dynamically generate
>>>> > timers.
>>>> >
>>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>>> > timer specs from setting actual timers?
>>>> >
>>>> > Suggestion:
>>>> >
>>>> > class MyDoFn extends DoFn<String, String> {
>>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>>> >
>>>> >   @ProcessElement
>>>> >   public void process(
>>>> >       @Element String e,
>>>> >       @TimerMap TimerMap timers) {
>>>> >     // "get" would register a new TimerSpec
>>>> >     Timer timer1 = timers.get("timer1");
>>>> >     Timer timer2 = timers.get("timer2");
>>>> >     timer1.set(...);
>>>> >     timer2.set(...);
>>>> >   }
>>>> >
>>>> >   // No args for "@OnTimer" => use generic TimerMap
>>>> >   @OnTimer
>>>> >   public void onTimer(
>>>> >       @TimerId String timerFired,
>>>> >       @Timestamp Instant timerTs,
>>>> >       @TimerMap TimerMap timers) {
>>>> >     // Timer firing
>>>> >     ...
>>>> >     // Set this timer (or another)
>>>> >     Timer timer = timers.get(timerFired);
>>>> >     timer.set(...);
>>>> >   }
>>>> > }
>>>> >
>>>> > What do you think?
>>>> >
>>>> > -Max
>>>> >
>>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>>> >> Hi Kenn,
>>>> >>
>>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>>> >>> This seems extremely useful.
>>>> >>>
>>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>>> >>> suggest that the parameter annotation be something other
>>>> >>> than @TimerId since that annotation is already used for a very
>>>> >>> similar but different purpose; they are close enough that it is
>>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>>> >>> keep @TimerId in the parameter list and change the declaration
>>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>>> >>> clear naming than "map".
>>>> >>>
>>>> >>> At the portability level, this API does seem to be pretty close to a
>>>> >>> noop in terms of the messages that need to be sent over the Fn API,
>>>> >>> so it makes sense to loosen the protos.
>>>> >>> By the time the Fn API is in play, all of our desires to catch
>>>> >>> errors prior to execution are irrelevant anyhow.
>>>> >>>
>>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>>> >>> than this, in that they want to programmatically adjust all the
>>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>>> >>> another. Certainly some limited DSL use cases are addressed by this,
>>>> >>> but I wouldn't take that as a primary use case for this feature.
>>>> >>> Ultimately they are probably better served by being able to
>>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>>> >>> That would allow maximum flexibility in utilizing the portability
>>>> >>> framework.
>>>> >>
>>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>>> >> that DSLs are not bound to portability - we have to be able to
>>>> >> translate even in case of "legacy" runners as well. That might
>>>> >> complicate things a bit maybe.
>>>> >>
>>>> >> Jan
>>>> >>
>>>> >>>
>>>> >>> Kenn
>>>> >>>
>>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com> wrote:
>>>> >>>
>>>> >>> BEAM-6857 documents the need for dynamic timer support in the Beam
>>>> >>> API. I wanted to make a proposal for what this API would look
>>>> >>> like, and how to express it in the portability protos.
>>>> >>>
>>>> >>> Background: Today Beam (especially BeamJava) requires a ParDo to
>>>> >>> statically declare all timers it accesses at compile time.
>>>> >>> For example:
>>>> >>>
>>>> >>> class MyDoFn extends DoFn<String, String> {
>>>> >>>   @TimerId("timer1") TimerSpec timer1 =
>>>> >>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>> >>>   @TimerId("timer2") TimerSpec timer2 =
>>>> >>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>> >>>
>>>> >>>   @ProcessElement
>>>> >>>   public void process(
>>>> >>>       @Element String e,
>>>> >>>       @TimerId("timer1") Timer timer1,
>>>> >>>       @TimerId("timer2") Timer timer2) {
>>>> >>>     timer1.set(...);
>>>> >>>     timer2.set(...);
>>>> >>>   }
>>>> >>>
>>>> >>>   @OnTimer("timer1") public void onTimer1() { ... }
>>>> >>>   @OnTimer("timer2") public void onTimer2() { ... }
>>>> >>> }
>>>> >>>
>>>> >>> This requires the author of a ParDo to know the full list of
>>>> >>> timers ahead of time, which has been problematic in many cases.
>>>> >>> One example where it causes issues is for DSLs such as Euphoria or
>>>> >>> Scio. DSL authors usually write ParDos to interpret the code
>>>> >>> written in the high-level DSL, and so don't know ahead of time the
>>>> >>> list of timers needed; alternatives today are quite ugly: physical
>>>> >>> code generation or creating a single timer that multiplexes all of
>>>> >>> the users' logical timers. There are also cases where a ParDo needs
>>>> >>> multiple distinct timers, but the set of distinct timers is
>>>> >>> controlled by the input data, and therefore not knowable in
>>>> >>> advance. The Beam timer API has been insufficient for these use
>>>> >>> cases.
>>>> >>>
>>>> >>> I propose a new TimerMap construct, which allows a ParDo to
>>>> >>> dynamically set named timers.
>>>> >>> Its use in the Java API would look as follows:
>>>> >>>
>>>> >>> class MyDoFn extends DoFn<String, String> {
>>>> >>>   @TimerId("timers") TimerSpec timers =
>>>> >>>       TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>> >>>
>>>> >>>   @ProcessElement
>>>> >>>   public void process(
>>>> >>>       @Element String e,
>>>> >>>       @TimerId("timers") TimerMap timers) {
>>>> >>>     timers.set("timer1", ...);
>>>> >>>     timers.set("timer2", ...);
>>>> >>>   }
>>>> >>>
>>>> >>>   @OnTimer("timer") public void onTimer(
>>>> >>>       @TimerId String timerFired,
>>>> >>>       @Timestamp Instant timerTs) { ... }
>>>> >>> }
>>>> >>>
>>>> >>> There is a new TimerSpec type to specify a TimerMap. The TimerMap
>>>> >>> class itself allows dynamically setting multiple timers based on a
>>>> >>> String tag argument. Each TimerMap has a single callback which,
>>>> >>> when called, is given the id of the timer that is currently firing.
>>>> >>>
>>>> >>> It is allowed to have multiple TimerMap objects in a ParDo (and
>>>> >>> required if you want to have both processing-time and event-time
>>>> >>> timers in the same ParDo). Each TimerMap is its own logical
>>>> >>> namespace, i.e. if the user sets timers with the same string tag
>>>> >>> on different TimerMap objects the timers will not collide.
>>>> >>>
>>>> >>> Currently the portability protos were written to mirror the Java
>>>> >>> API, expecting one TimerSpec per timer accessed by the ParDo. I
>>>> >>> suggest that we instead make TimerMap the default for portability,
>>>> >>> and model the current behavior on top of timer map. If this proves
>>>> >>> problematic for some runners, we could instead introduce a new
>>>> >>> TimerSpec proto to represent TimerMap.
>>>> >>>
>>>> >>> Thoughts?
>>>> >>>
>>>> >>> Reuven
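
For readers following the thread, the semantics described in the original proposal (timers set by string tag, one callback per family that receives the firing tag, and a separate namespace per family) can be modeled with a few lines of plain Java. This is only a rough sketch under stated assumptions: `ToyTimerFamily`, `advanceTo`, and `ToyTimerFamilyDemo` are hypothetical names invented for illustration, nothing here uses or represents the Beam API, and a real runner would also scope timers per key and window and persist them for fault tolerance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Toy in-memory model of one timer family. Hypothetical names; NOT the Beam API.
class ToyTimerFamily {
  // tag -> timestamp (millis) currently set for that tag; re-setting a tag overwrites it.
  private final Map<String, Long> timestampByTag = new TreeMap<>();

  // Sets (or resets) the timer identified by the given tag.
  void set(String tag, long timestampMillis) {
    timestampByTag.put(tag, timestampMillis);
  }

  // Fires every timer whose timestamp is <= nowMillis, earliest first, through the
  // family's single callback, which is told which tag fired and at what timestamp.
  void advanceTo(long nowMillis, BiConsumer<String, Long> onTimer) {
    // Collect due tags grouped by timestamp so that they fire in timestamp order.
    TreeMap<Long, List<String>> due = new TreeMap<>();
    for (Map.Entry<String, Long> entry : timestampByTag.entrySet()) {
      if (entry.getValue() <= nowMillis) {
        due.computeIfAbsent(entry.getValue(), ts -> new ArrayList<>()).add(entry.getKey());
      }
    }
    for (Map.Entry<Long, List<String>> entry : due.entrySet()) {
      for (String tag : entry.getValue()) {
        timestampByTag.remove(tag); // a fired timer is cleared before the callback runs
        onTimer.accept(tag, entry.getKey());
      }
    }
  }
}

// Small demo: two families share the tag "timer1" without colliding.
class ToyTimerFamilyDemo {
  static List<String> run() {
    ToyTimerFamily eventTimers = new ToyTimerFamily();
    ToyTimerFamily processingTimers = new ToyTimerFamily();
    eventTimers.set("timer1", 30L);
    eventTimers.set("timer2", 10L);
    processingTimers.set("timer1", 20L); // same tag, different family

    List<String> fired = new ArrayList<>();
    eventTimers.advanceTo(25L, (tag, ts) -> fired.add("event:" + tag + "@" + ts));
    processingTimers.advanceTo(25L, (tag, ts) -> fired.add("processing:" + tag + "@" + ts));
    return fired;
  }
}
```

In this toy model, calling `ToyTimerFamilyDemo.run()` fires only the two timers that are due at time 25 (`timer2` in the event-time family and `timer1` in the processing-time family), while the event-time `timer1` set for 30 stays pending. The same structure also illustrates the multiplexing idea raised in the thread: a single underlying timer set to the minimum pending timestamp would be enough to drive `advanceTo`.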