Just to circle back around, after the discussion on this thread I propose modifying the proposed API as follows:

class MyDoFn extends DoFn<String, String> {
  @TimerFamily("timers") TimerSpec timers =
      TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(@Element String e, @TimerFamily("timers") TimerMap timers) {
    timers.set("timer1", ...);
    timers.set("timer2", ...);
  }

  @OnTimer("timers")
  public void onTimer(
      @TimerId String timerFired,
      @Timestamp Instant timerTs,
      @TimerFamily("timers") TimerMap timers) { ... }
}

Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors are
largely independent of this proposal (though related), so I suggest splitting
them into a separate discussion.

Reuven

On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> yes, if this change is intended to be used by end users, then
>> DoFnSignatures cannot be used, agree on that. Regarding the relationship
>> with dynamic state - I agree that this is a separate problem, but because
>> it is close enough, we should know how we want to deal with that. Because
>> state and timers share some functionality (after all timers need state to
>> be fault tolerant), these APIs should IMO share the same logic. Whatever
>> solution is chosen to expose dynamic timers, it should extend to dynamic
>> state.
>>
>> I'd like to stop a little with the premise that users want dynamic timers
>> (that is timers whose *name* - and therefore behavior - is determined by
>> incoming data). Could this case be modeled so that the timer actually has
>> one more (implicit) state variable that actually holds a collection of
>> tuples (timestamp, name)? Then the timer would be invoked at given (minimum
>> of all currently set) timestamps with respective name? The question here
>> probably is - can this have a performance impact? That is to say - can any
>> runner actually do anything different from this in the sense of time
>> complexity of the algorithm?
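
For illustration, the single-timer alternative Jan describes above could be sketched in plain Java roughly as follows. This is not Beam code: `TimerMultiplexer` and its methods are hypothetical names, timestamps are plain millisecond longs, and a real DoFn would presumably keep the map in a state cell and re-set its one physical timer to the returned minimum.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: many named logical timers backed by one physical timer. */
final class TimerMultiplexer {
  // Logical timer name -> firing timestamp in millis (the extra "state variable").
  private final Map<String, Long> pending = new HashMap<>();

  /** Sets or resets a logical timer and returns when the physical timer should fire next. */
  long set(String name, long timestampMillis) {
    pending.put(name, timestampMillis);
    return nextFiring();
  }

  /** Minimum pending timestamp, or Long.MAX_VALUE if nothing is pending (linear scan). */
  long nextFiring() {
    long min = Long.MAX_VALUE;
    for (long ts : pending.values()) {
      min = Math.min(min, ts);
    }
    return min;
  }

  /** Removes and returns the names due at or before nowMillis, ordered by timestamp, then name. */
  List<String> fire(long nowMillis) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : pending.entrySet()) {
      if (e.getValue() <= nowMillis) {
        due.add(e.getKey());
      }
    }
    // Sort before removal, because the comparator reads timestamps from the map.
    due.sort(Comparator.comparingLong((String n) -> pending.get(n))
        .thenComparing(Comparator.<String>naturalOrder()));
    pending.keySet().removeAll(due);
    return due;
  }
}
```

Every set and every firing in this sketch scans all pending entries. A sorted structure would make that cheaper, but the user code still has to maintain it, which is the extra complexity the replies refer to.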
>>
>
> Yes - you could always multiplex many timers onto one. This is what some
> users do today, but it tends to be very inefficient and also complex. The
> Beam model requires runners to support dynamic timers per key (e.g. that's
> how windowing is implemented - each window has a separate timer), so
> there's no reason not to expose this to users.
>
>> I'm a little afraid that letting users define data-driven timers might be
>> too restrictive for some runners. Yes, runners that don't have this option
>> would probably be able to resort to the logic described above, but if this
>> work could be reasonably done for all runners, then we wouldn't force
>> runners to actually implement it. And, the API could look as if the
>> timers were actually dynamic.
>>
>> Jan
>>
>> P.S. If dynamic (and therefore any number) of timers can be actually
>> implemented using a single timer, that might be an interesting pattern,
>> because a single timer per (window, key) has many nice properties, like it
>> implicitly avoids the situation where timer invocation is not ordered
>> ([BEAM-7520]), which seems to be an issue for multiple runners (samza,
>> portable flink).
>>
> BEAM-7520 is simply an implementation bug. I don't think it makes sense to
> fix a bug by restricting the model.
>
>
>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>
>> Kenn:
>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>
>> Jan:
>> This is definitely not just for DSLs. I've definitely seen cases where
>> the user wants different timers based on input data, so they cannot be
>> defined statically. As a thought experiment: one stated goal of state +
>> timers was to provide the low-level tools we use to implement windowing.
>> However to implement windowing you need a dynamic set of timers, not just a
>> single one.
>> Now most users don't need to reimplement windowing (though we
>> have had some users who had that need, when they wanted something slightly
>> different than what native Beam windowing provided), however the need for
>> dynamic timers is not unheard of.
>>
>> +1 to allowing dynamic state. However I think this is separate enough
>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>> which I think is a bit less of an issue for dynamic timers.
>>
>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>> problem. The DSL still needs a way to set and process the timers. It also
>> does not solve the problem where the timers are based on input data
>> elements, so cannot be known at pipeline construction time. However what
>> might be more important is statically defining the timer families, and a
>> DSL could do this by specifying a DoFnSignature (and something similar
>> could be done with state). Also as mentioned above, this is useful to
>> normal Beam users as well, and we shouldn't force normal users to start
>> dealing with DoFnSignatures and DoFnInvokers.
>>
>>
>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Max,
>>>
>>> wouldn't that be actually the same as
>>>
>>> class MyDoFn extends DoFn<String, String> {
>>>
>>>   @ProcessElement
>>>   public void process(
>>>       ProcessContext context) {
>>>     // "get" would register a new TimerSpec
>>>     Timer timer1 = context.getTimer("timer1");
>>>     Timer timer2 = context.getTimer("timer2");
>>>     timer1.set(...);
>>>     timer2.set(...);
>>>   }
>>> }
>>>
>>> That is - no need to declare anything? One more concern about that - if
>>> we allow registration of timers (or even state) dynamically like that it
>>> might be harder to perform validation of pipeline upon upgrades.
>>>
>>> Jan
>>>
>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>> > The idea makes sense to me.
>>> > I really like that Beam gives upfront
>>> > specs for timer and state, but it is not flexible enough for
>>> > timer-based libraries or for users who want to dynamically generate
>>> > timers.
>>> >
>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>> > timer specs from setting actual timers?
>>> >
>>> > Suggestion:
>>> >
>>> > class MyDoFn extends DoFn<String, String> {
>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>> >
>>> >   @ProcessElement
>>> >   public void process(
>>> >       @Element String e,
>>> >       @TimerMap TimerMap timers) {
>>> >     // "get" would register a new TimerSpec
>>> >     Timer timer1 = timers.get("timer1");
>>> >     Timer timer2 = timers.get("timer2");
>>> >     timer1.set(...);
>>> >     timer2.set(...);
>>> >   }
>>> >
>>> >   // No args for "@OnTimer" => use generic TimerMap
>>> >   @OnTimer
>>> >   public void onTimer(
>>> >       @TimerId String timerFired,
>>> >       @Timestamp Instant timerTs,
>>> >       @TimerMap TimerMap timers) {
>>> >     // Timer firing
>>> >     ...
>>> >     // Set this timer (or another)
>>> >     Timer timer = timers.get(timerFired);
>>> >     timer.set(...);
>>> >   }
>>> > }
>>> >
>>> > What do you think?
>>> >
>>> > -Max
>>> >
>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>> >> Hi Kenn,
>>> >>
>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>> >>> This seems extremely useful.
>>> >>>
>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>> >>> suggest that the parameter annotation be something other
>>> >>> than @TimerId since that annotation is already used for a very
>>> >>> similar but different purpose; they are close enough that it is
>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>> >>> keep @TimerId in the parameter list and change the declaration
>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be
>>> >>> clearer naming than "map".
>>> >>>
>>> >>> At the portability level, this API does seem to be pretty close to a
>>> >>> noop in terms of the messages that need to be sent over the Fn API,
>>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>>> >>> play, all of our desires to catch errors prior to execution are
>>> >>> irrelevant anyhow.
>>> >>>
>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>> >>> than this, in that they want to programmatically adjust all the
>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>> >>> another. Certainly some limited DSL use cases are addressed by this,
>>> >>> but I wouldn't take that as a primary use case for this feature.
>>> >>> Ultimately they are probably better served by being able to
>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>> >>> That would allow maximum flexibility in utilizing the portability
>>> >>> framework.
>>> >>
>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>> >> that DSLs are not bound to portability - we have to be able to
>>> >> translate even in case of "legacy" runners as well. That might
>>> >> complicate things a bit maybe.
>>> >>
>>> >> Jan
>>> >>
>>> >>>
>>> >>> Kenn
>>> >>>
>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com> wrote:
>>> >>>
>>> >>>     BEAM-6857 documents the need for dynamic timer support in the Beam
>>> >>>     API. I wanted to make a proposal for what this API would look
>>> >>>     like, and how to express it in the portability protos.
>>> >>>
>>> >>>     Background: Today Beam (especially Beam Java) requires a ParDo to
>>> >>>     statically declare all timers it accesses at compile time.
>>> >>>     For example:
>>> >>>
>>> >>>     class MyDoFn extends DoFn<String, String> {
>>> >>>       @TimerId("timer1") TimerSpec timer1 =
>>> >>>           TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>> >>>       @TimerId("timer2") TimerSpec timer2 =
>>> >>>           TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>> >>>
>>> >>>       @ProcessElement
>>> >>>       public void process(@Element String e, @TimerId("timer1") Timer
>>> >>>           timer1, @TimerId("timer2") Timer timer2) {
>>> >>>         timer1.set(...);
>>> >>>         timer2.set(...);
>>> >>>       }
>>> >>>
>>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>>> >>>     }
>>> >>>
>>> >>>     This requires the author of a ParDo to know the full list of
>>> >>>     timers ahead of time, which has been problematic in many cases.
>>> >>>     One example where it causes issues is for DSLs such as Euphoria or
>>> >>>     Scio. DSL authors usually write ParDos to interpret the code
>>> >>>     written in the high-level DSL, and so don't know ahead of time the
>>> >>>     list of timers needed; alternatives today are quite ugly: physical
>>> >>>     code generation or creating a single timer that multiplexes all of
>>> >>>     the user's logical timers. There are also cases where a ParDo needs
>>> >>>     multiple distinct timers, but the set of distinct timers is
>>> >>>     controlled by the input data, and therefore not knowable in
>>> >>>     advance. The Beam timer API has been insufficient for these use
>>> >>>     cases.
>>> >>>
>>> >>>     I propose a new TimerMap construct, which allows a ParDo to
>>> >>>     dynamically set named timers.
>>> >>>     Its use in the Java API would look
>>> >>>     as follows:
>>> >>>
>>> >>>     class MyDoFn extends DoFn<String, String> {
>>> >>>       @TimerId("timers") TimerSpec timers =
>>> >>>           TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>> >>>
>>> >>>       @ProcessElement
>>> >>>       public void process(@Element String e, @TimerId("timers")
>>> >>>           TimerMap timers) {
>>> >>>         timers.set("timer1", ...);
>>> >>>         timers.set("timer2", ...);
>>> >>>       }
>>> >>>
>>> >>>       @OnTimer("timer") public void onTimer(@TimerId String
>>> >>>           timerFired, @Timestamp Instant timerTs) { ... }
>>> >>>     }
>>> >>>
>>> >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
>>> >>>     class itself allows dynamically setting multiple timers based on a
>>> >>>     String tag argument. Each TimerMap has a single callback which
>>> >>>     when called is given the id of the timer that is currently firing.
>>> >>>
>>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>>> >>>     required if you want to have both processing-time and event-time
>>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>>> >>>     namespace, i.e. if the user sets timers with the same string tag
>>> >>>     on different TimerMap objects, the timers will not collide.
>>> >>>
>>> >>>     Currently the portability protos are written to mirror the Java
>>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>>> >>>     suggest that we instead make TimerMap the default for portability,
>>> >>>     and model the current behavior on top of timer map. If this proves
>>> >>>     problematic for some runners, we could instead introduce a new
>>> >>>     TimerSpec proto to represent TimerMap.
>>> >>>
>>> >>>     Thoughts?
>>> >>>
>>> >>>     Reuven
>>> >>>
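
Two points from the original proposal can be pictured with a small plain-Java model: the namespace rule (the same tag set on two different TimerMap objects does not collide) and the idea of modeling today's static timers on top of timer maps. Everything here is hypothetical. `TimerFamilyModel`, `setStatic`, and the use of an empty tag for a static timer are assumptions for illustration, not part of the proposal or the Beam SDK.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: each timer family is its own namespace of tag -> timestamp. */
final class TimerFamilyModel {
  // Family name -> (timer tag -> firing timestamp in millis).
  private final Map<String, Map<String, Long>> families = new HashMap<>();

  /** Sets a dynamically named timer inside one family. */
  void set(String family, String tag, long timestampMillis) {
    families.computeIfAbsent(family, f -> new HashMap<>()).put(tag, timestampMillis);
  }

  /** Returns the timestamp for (family, tag), or null if that timer is not set. */
  Long get(String family, String tag) {
    Map<String, Long> timers = families.get(family);
    return timers == null ? null : timers.get(tag);
  }

  /** Assumption: a statically declared timer is a family that only ever uses the empty tag. */
  void setStatic(String timerId, long timestampMillis) {
    set(timerId, "", timestampMillis);
  }
}
```

Because lookups are keyed by the (family, tag) pair, setting tag "t" in an event-time family and in a processing-time family yields two independent timers, and a static timer is the special case of a family with one fixed tag.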