On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Reuven,
>
> yes, if this change is intended to be used by end users, then
> DoFnSignatures cannot be used, agree on that. Regarding the relationship
> with dynamic state - I agree that this is separate problem, but because it
> is close enough, we should know how we want to deal with that. Because
> state and timers share some functionality (after all timers need state to
> be fault tolerant), these API should IMO share the same logic. Whatever
> solution chosen to expose dynamic timers, it should extend to dynamic state.
>
> I'd like to stop a little with the premise that users want dynamic timers
> (that is timers whose *name* - and therefore behavior - is determined by
> incoming data). Could this case be modeled so that the timer actually has
> one more (implicit) state variable that actually holds collection of tuples
> (timestamp, name)? Then the timer would be invoked at given (minimum of all
> currently set) timestamps with respective name? The question here probably
> is - can this have performance impact? That is to say - can any runner
> actually do anything different from this in the sense of time complexity of
> the algorithm?
>

Yes - you could always multiplex many timers one one. This is what some
users do today, but it tends to be very inefficient and also complex. The
Beam model requires runners to support dynamic timers per key (e.g. that
how windowing is implemented - each window has a separate timer), so
there's no reason not to expose this to users.

> I'm a little afraid if actually letting users define data-driven timers
> might not be too restrictive for some runners. Yes, runners that don't have
> this option would probably be able to resort to the logic described above,
> but if this work could be reasonably done for all runners, then we wouldn't
> force runners to actually implement it. And, the API could look as if the
> timers were actually dynamic.
>
> Jan
>
> P.S. If dynamic (and therefore any number) of timers can be actually
> implemented using single timer, that might be interesting pattern, because
> single timer per (window, key) has many nice properties, like it implicitly
> avoids situation where timer invocation is not ordered ([BEAM-7520]), which
> seems to issue for multiple runners (samza, portable flink).
>
BEAM-7520 is simply an implementation bug. I don't think it makes sense to
fix a bug by restricting the model.


> On 10/22/19 6:52 PM, Reuven Lax wrote:
>
> Kenn:
> +1 to using TimerFamily instead of TimerId and TimerMap.
>
> Jan:
> This is definitely not just for DSLs. I've definitely seen cases where the
> user wants different timers based on input data, so they cannot be defined
> statically. As a thought experiment: one stated goal of state + timers was
> to provide the low-level tools we use to implement windowing. However to
> implement windowing you need a dynamic set of timers, not just a single
> one. Now most users don't need to reimplement windowing (though we have had
> some users who had that need, when they wanted something slightly different
> than what native Beam windowing provided), however the need for dynamic
> timers is not unheard of.
>
> +1 to allowing dynamic state. However I think this is separate enough from
> timers that it doesn't need to be coupled in this discussion. Dynamic state
> also raises the wrinkle of pipeline compatibility (as you mentioned),
> which I think is a bit less of an issue for dynamic timers.
>
> Allowing a DSL to specify a DoFnSignature does not quite solve this
> problem. The DSL still needs a way to set and process the timers. It also
> does not solve the problem where the timers are based on input data
> elements, so cannot be known at pipeline construction time. However what
> might be more important is statically defining the timer families, and a
> DSL could do this by specifying a DoFnSignature (and something similar
> could be done with state). Also as mentioned above, this is useful to
> normal Beam users as well, and we shouldn't force normal users to start
> dealing with DoFnSignatures and DoFnInvokers.
>
>
>
>
>
>
> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Max,
>>
>> wouldn't that be actually the same as
>>
>> class MyDoFn extends DoFn<String, String> {
>>
>>
>>    @ProcessElement
>>    public void process(
>>        ProcessContext context) {
>>      // "get" would register a new TimerSpec
>>      Timer timer1 = context.getTimer("timer1");
>>      Timer timer2 = context.getTimer("timer2");
>>      timers.set(...);
>>      timers.set(...);
>>    }
>>
>> That is - no need to declare anything? One more concern about that - if
>> we allow registration of timers (or even state) dynamically like that it
>> might be harder to perform validation of pipeline upon upgrades.
>>
>> Jan
>>
>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>> > The idea makes sense to me. I really like that Beam gives upfront
>> > specs for timer and state, but it is not flexible enough for
>> > timer-based libraries or for users which want to dynamically generate
>> > timers.
>> >
>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>> > timer specs from setting actual timers?
>> >
>> > Suggestion:
>> >
>> > class MyDoFn extends DoFn<String, String> {
>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>> >
>> >   @ProcessElement
>> >   public void process(
>> >       @Element String e,
>> >       @TimerMap TimerMap timers)) {
>> >     // "get" would register a new TimerSpec
>> >     Timer timer1 = timers.get("timer1");
>> >     Timer timer2 = timers.get("timer2");
>> >     timers.set(...);
>> >     timers.set(...);
>> >   }
>> >
>> >   // No args for "@OnTimer" => use generic TimerMap
>> >   @OnTimer
>> >   public void onTimer(
>> >       @TimerId String timerFired,
>> >       @Timestamp Instant timerTs,
>> >       @TimerMap TimerMap timers) {
>> >      // Timer firing
>> >      ...
>> >      // Set this timer (or another)
>> >      Timer timer = timers.get(timerFired);
>> >      timer.set(...);
>> >   }
>> > }
>> >
>> > What do you think?
>> >
>> > -Max
>> >
>> > On 22.10.19 10:35, Jan Lukavský wrote:
>> >> Hi Kenn,
>> >>
>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>> >>> This seems extremely useful.
>> >>>
>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>> >>> suggest that the parameter annotation be something other
>> >>> than @TimerId since that annotation is already used for a very
>> >>> similar but different purpose; they are close enough that it is
>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>> >>> keep @TimerId in the parameter list and change the declaration
>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>> >>> clear naming than "map".
>> >>>
>> >>> At the portability level, this API does seem to be pretty close to a
>> >>> noop in terms of the messages that needs to be sent over the Fn API,
>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>> >>> play, all of our desires to catch errors prior to execution are
>> >>> irrelevant anyhow.
>> >>>
>> >>> On the other hand, I think DSLs have a different & bigger problem
>> >>> than this, in that they want to programmatically adjust all the
>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>> >>> another. Certainly some limited DSL use cases are addressed by this,
>> >>> but I wouldn't take that as a primary use case for this feature.
>> >>> Ultimately they are probably better served by being able to
>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>> >>> That would allow maximum flexibility in utilizing the portability
>> >>> framework.
>> >>
>> >> yes, exactly, but when DSLs are in question, we have to make sure
>> >> that DSLs are not bound to portability - we have to be able to
>> >> translate even in case of "legacy" runners as well. That might
>> >> complicate things a bit maybe.
>> >>
>> >> Jan
>> >>
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com
>> >>> <mailto:re...@google.com>> wrote:
>> >>>
>> >>>     BEAM-6857 documents the need for dynamic timer support in the Beam
>> >>>     API. I wanted to make a proposal for what this API would look
>> >>>     like, and how to express it in the portability protos.
>> >>>
>> >>>     Background: Today Beam (especially BeamJava) requires a ParDo to
>> >>>     statically declare all timers it accesses at compile time. For
>> >>>     example:
>> >>>
>> >>>     class MyDoFn extends DoFn<String, String> {
>> >>>       @TimerId("timer1") TimerSpec timer1 =
>> >>>     TimerSpecs.timer(TimeDomain(EVENT_TIME));
>> >>>       @TimerId("timer2") TimerSpec timer2 =
>> >>>     TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>> >>>
>> >>>       @ProcessElement
>> >>>       public void process(@Element String e, @TimerId("timer1") Timer
>> >>>     timer1, @TimerId("timer2") Timer timer2)) {
>> >>>         timer1.set(...);
>> >>>         timer2.set(...);
>> >>>       }
>> >>>
>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>> >>>     }
>> >>>
>> >>>     This requires the author of a ParDo to know the full list of
>> >>>     timers ahead of time, which has been problematic in many cases.
>> >>>     One example where it causes issues is for DSLs such as Euphoria or
>> >>>     Scio. DSL authors usually write ParDos to interpret the code
>> >>>     written in the high-level DSL, and so don't know ahead of time the
>> >>>     list of timers needed; alternatives today are quite ugly: physical
>> >>>     code generation or creating a single timer that multiplexes all of
>> >>>     the users logical timers. There are also cases where a ParDo needs
>> >>>     multiple distinct timers, but the set of distinct timers is
>> >>>     controlled by the input data, and therefore not knowable in
>> >>>     advance. The Beam timer API has been insufficient for these use
>> >>> cases.
>> >>>
>> >>>     I propose a new TimerMap construct, which allow a ParDo to
>> >>>     dynamically set named timers. It's use in the Java API would look
>> >>>     as follows:
>> >>>
>> >>>     class MyDoFn extends DoFn<String, String> {
>> >>>       @TimerId("timers") TimerSpec timers =
>> >>>     TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>> >>>
>> >>>       @ProcessElement
>> >>>       public void process(@Element String e, @TimerId("timers")
>> >>>     TimerMap timer)) {
>> >>>         timers.set("timer1", ...);
>> >>>         timers.set("timer2", ...);
>> >>>       }
>> >>>
>> >>>       @OnTimer("timer") public void onTimer(@TimerId String
>> >>>     timerFired, @Timestamp Instant timerTs) { ... }
>> >>>     }
>> >>>
>> >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
>> >>>     class itself allows dynamically setting multiple timers based on a
>> >>>     String tag argument. Each TimerMap has a single callback which
>> >>>     when called is given the id of the timer that is currently firing.
>> >>>
>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>> >>>     required if you want to have both processing-time and event-time
>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>> >>>     namespace. i.e. if the user sets timers with the same string tag
>> >>>     on different TimerMap objects the timers will not collide.
>> >>>
>> >>>     Currently the portability protos were written to mirror the Java
>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>> >>>     suggest that we instead make TimerMap the default for portability,
>> >>>     and model the current behavior on top of timer map. If this proves
>> >>>     problematic for some runners, we could instead introduce a new
>> >>>     TimerSpec proto to represent TimerMap.
>> >>>
>> >>>     Thoughts?
>> >>>
>> >>>     Reuven
>> >>>
>>
>

Reply via email to