Short train of thoughts continues. I do agree that something could change; I just don't believe that what has been proposed so far is better yet.
On Wed, Sep 19, 2018 at 11:43 AM Lukasz Cwik <lc...@google.com> wrote:

> Clear can be modeled by a boolean state cell of process/ignore next timer firing. It's not as good for watermark advancement, though, as being able to eagerly clear something.
>
> Longer term, you could retract a "timer" from a PCollection once retractions are supported.
>
> On Wed, Sep 19, 2018 at 11:40 AM Reuven Lax <re...@google.com> wrote:
>
>> I believe that clearTimer has been a feature request before, though.
>>
>> On Wed, Sep 19, 2018 at 11:31 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> *How does modelling a timer as a PCollection help the Beam model?*
>>>
>>> The largest concern was how to model timers within Apache Beam in a way that:
>>> 1) removed the need for the watermark hold that typically accompanies state/timer implementations
>>> 2) enabled setting an explicit output time, independent of the firing time, for all timer specifications [1]
>>>
>>> I felt that treating timers as a self-loop around the ParDo PTransform allowed us to use the natural definition of output watermark = min(all input watermarks) to define how timers hold output, and to use windowed values containing timers as a natural way to represent an output time that is independent of the firing time. The purpose of the PCollection right now is to store the representation of how timers are encoded. I suspect that at some point we will have different timer encodings.
>>>
>>> Having this fit well with how timers are delivered between the SDK and Runner was an added bonus. Also, a good portion of the code that I needed to fix up was related more to the assumption that there was only ever a single input producer to an executable stage, and to plumbing timer specifications through all the runner library support layers.
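A minimal, dependency-free Java sketch of two ideas from the messages above: timers as a self-loop input, where the output watermark is the minimum of the main input watermark and the output timestamps of pending timers (so the output time can differ from the firing time), and "clear" modeled as a boolean ignore-next-firing cell. Everything here (`SelfLoopTimerModel`, `setTimer`, `clearTimer`, plain `long` timestamps, at most one timer per key) is a hypothetical illustration, not Beam or Fn API code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, dependency-free model (not the Beam API) of a keyed ParDo whose
 * event-time timers travel around a self-loop. Timestamps are plain longs.
 */
class SelfLoopTimerModel {

  /** A pending timer: fires at fireAt but holds the output watermark at outputAt. */
  static final class PendingTimer {
    final String key;
    final long fireAt;
    final long outputAt;

    PendingTimer(String key, long fireAt, long outputAt) {
      this.key = key;
      this.fireAt = fireAt;
      this.outputAt = outputAt;
    }
  }

  // At most one timer per key; setting again overwrites the earlier one.
  private final Map<String, PendingTimer> pending = new HashMap<>();
  // "Clear" modeled as a per-key boolean cell: ignore the next firing for this key.
  private final Map<String, Boolean> ignoreNextFiring = new HashMap<>();
  private long inputWatermark = Long.MIN_VALUE;

  void setTimer(String key, long fireAt, long outputAt) {
    pending.put(key, new PendingTimer(key, fireAt, outputAt));
    ignoreNextFiring.put(key, false);
  }

  void clearTimer(String key) {
    // The timer stays pending, so it keeps holding the watermark until it fires.
    ignoreNextFiring.put(key, true);
  }

  /** Output watermark = min(main input watermark, hold from the timer self-loop). */
  long outputWatermark() {
    long timerHold = Long.MAX_VALUE;
    for (PendingTimer t : pending.values()) {
      timerHold = Math.min(timerHold, t.outputAt);
    }
    return Math.min(inputWatermark, timerHold);
  }

  /** Advances the input watermark and returns the keys whose timers really fired. */
  List<String> advanceInputWatermark(long newWatermark) {
    inputWatermark = newWatermark;
    List<PendingTimer> due = new ArrayList<>();
    for (PendingTimer t : pending.values()) {
      if (t.fireAt <= inputWatermark) {
        due.add(t);
      }
    }
    // Deliver due timers in firing-time order, breaking ties by key.
    due.sort((a, b) -> a.fireAt != b.fireAt ? Long.compare(a.fireAt, b.fireAt) : a.key.compareTo(b.key));
    List<String> fired = new ArrayList<>();
    for (PendingTimer t : due) {
      pending.remove(t.key);
      if (ignoreNextFiring.getOrDefault(t.key, false)) {
        ignoreNextFiring.put(t.key, false); // swallow this firing
      } else {
        fired.add(t.key);
      }
    }
    return fired;
  }
}
```

In this model, a timer that is "cleared" early still holds the output watermark at its output timestamp until the input watermark passes its firing time. That is the watermark-advancement cost of the boolean-cell approach compared with an eager clear.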
>>>
>>> ----------
>>> *There is no "clear" for timers.*
>>>
>>> The current Java API for timers only allows you to set them. Clearing timers is not exposed to users and is only used by internal implementations to support runners [2] via TimerInternals. Usage of a timer looks like this:
>>>
>>> // allowedLateness is assumed to be a Duration field of this DoFn.
>>> @TimerId("timer")
>>> private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>
>>> @ProcessElement
>>> public void process(
>>>     ProcessContext context,
>>>     BoundedWindow window,
>>>     @TimerId("timer") Timer myTimer) {
>>>   // set() is the only user-facing operation; there is no clear().
>>>   myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>> }
>>>
>>> @OnTimer("timer")
>>> public void onTimer(OnTimerContext context) {
>>>   // Called once the event-time watermark passes the timestamp set above.
>>> }
>>>
>>> ---------
>>> I'm not a big fan of having timers as a separate field in the Elements proto. I still think they should be treated as an input/output, and we could update the representation so that inputs/outputs for PTransforms don't need to be "PCollections". I was thinking that our current PCollection representation assumes that we'll never want to change it to add extra information or make backwards-incompatible changes like beam:pcollection:v2.
>>>
>>> ---------
>>> Other points:
>>> * Side inputs already require a runner to introspect the ParDo payload to get the SideInputSpec; requiring it to have knowledge of the TimerSpec is no different.
>>> * A multimap side input over timers could be keyed by the key each timer is associated with, and an iterable side input over timers would allow you to iterate over <key, timer> pairs. This could be useful for skew control in sources, since they would want to know how far ahead they are relative to other restrictions.
>>> * User state as a PCollection can make sense, but I can't see how we can get past the problems when we treat it as an "input", since its input watermark would be ignored or infinite. I do agree that this could open the door to sharing "state", such as multi-key transactions, but that is very speculative, as you say.
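The skew-control idea in the bullet about iterable side inputs over timers can be sketched as follows. The sketch assumes a source restriction can iterate over (key, timer timestamp) pairs, for example through such a side input. `TimerSkew` and `leadOverSlowest` are hypothetical names, and timestamps are plain `long` values.

```java
import java.util.Map;

/**
 * Hypothetical sketch (not a Beam API): given (key, timer timestamp) pairs, such as
 * an iterable side input over timers might expose, measure how far one source
 * restriction is ahead of the slowest other restriction.
 */
final class TimerSkew {
  private TimerSkew() {}

  /**
   * Returns the own key's timer timestamp minus the smallest timestamp among the
   * other keys, or 0 if there are no other keys. A negative result means this key
   * is behind all other keys.
   */
  static long leadOverSlowest(String ownKey, Iterable<Map.Entry<String, Long>> timers) {
    Long own = null;
    boolean sawOther = false;
    long slowestOther = Long.MAX_VALUE;
    for (Map.Entry<String, Long> e : timers) {
      if (e.getKey().equals(ownKey)) {
        own = e.getValue();
      } else {
        sawOther = true;
        slowestOther = Math.min(slowestOther, e.getValue());
      }
    }
    if (own == null) {
      throw new IllegalArgumentException("no timer for key " + ownKey);
    }
    return sawOther ? own - slowestOther : 0L;
  }
}
```

A source could, for instance, stop reading a restriction while its lead exceeds some threshold; how to choose that threshold is outside the scope of this sketch.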
>>>
>>> 1: https://issues.apache.org/jira/browse/BEAM-2535
>>> 2: https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>>>
>>> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Robert,
>>>>
>>>> Thanks for presenting these thoughts. Your attempt to implement timer support in the Python runner is the first strong signal we have, and it is the right time to make changes: AFAIK no other runner work has been done.
>>>>
>>>> I'm also a bit concerned about the acrobatics required in the PR to make this work. Luke will be in the best position to comment, but as I recall, we considered modeling timers as special PCollections a simplification for SDK <> Runner interaction and for the overall implementation. The special treatment (and slight confusion) at the graph level was perhaps an early warning sign; discovering the extra complexity of wiring this into a runner should be a reason to revisit.
>>>>
>>>> Conceptually, timers are special state; they are certainly more state than stream :) Regardless of how they are passed to the harness, the runner will need to treat them similarly to side inputs and user state.
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>> On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>>> TLDR Perhaps we should revisit https://s.apache.org/beam-portability-timers in light of the fact that Timers are more like State than PCollections.
>>>>>
>>>>> --
>>>>>
>>>>> While looking at implementing State and Timers in the Python SDK, I've been revisiting the ideas presented at https://s.apache.org/beam-portability-timers , and am now starting to wonder if this is actually the best way to model things (at least at the Runner level). Instead, it seems Timers more closely resemble, and are more tightly bound to, State than PCollections.
>>>>>
>>>>> This is especially clear when writing timers. These timers are not a bag of emitted elements; rather, one sets (and clears) timers, and the set of timers that end up firing is the result of this *ordered* sequence of operations. It is also often important that the setting of timers be ordered with respect to the setting and clearing of state itself (and it is more often than not collocated with such requests).
>>>>>
>>>>> In addition, these self-loops add complexity to the graph but provide no additional information; they are entirely redundant with the TimerSpecs already present on DoFns. Generally I prefer less redundancy in the spec, rather than having it be over-constrained. It's unclear what a runner that didn't introspect the DoFn's TimerSpecs would do with these special edges, and also unclear how they would differ from possible self-loops due to more traditional iteration.
>>>>>
>>>>> The primary motivation to express timers in this way seems to be the desire to push them to workers using the data plane, rather than inventing another mechanism or making them pull-based like state. I think this could be done by simply adding a Timer field to the Elements (or Data) proto. (Note that this is not the same as having hacky ElementOrTimer elements flow through the graph.) Writes would be state requests, and perhaps it would even make sense to "read" the current value of an unfired timer over the state API, to be able to set things like {min,max}(new_timestamp, old_timestamp).
>>>>>
>>>>> (We could alternatively attempt to model State(s) as PCollection(s), but this is more speculative and would likely exacerbate some of the issues above, though it could open the door for DoFns that somehow *share* state. They seem like different objects, though: one is a mutable store, the other an immutable stream.)
>>>>>
>>>>> I realize this is a big shift, but we could probably adapt the existing Python/Java implementations fairly easily (and it would probably simplify them). And it's easier to do simplifications like this sooner rather than later.
>>>>>
>>>>> What do people think about this? Any obvious (or not-so-obvious) downsides that I'm missing?
>>>>>
>>>>> - Robert
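Robert's view of timers as ordered state operations can be sketched with a small hypothetical cell. `TimerStateCell` is not an existing Beam or Fn API class; timestamps are plain `long`s, and each key holds at most one timer. The sketch shows why order matters: set-then-clear and clear-then-set leave different pending timers, which an unordered bag of emitted timer elements cannot distinguish. It also shows how a readable timer enables a read-modify-write such as min(new_timestamp, old_timestamp).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical model of timers as state (not Beam or Fn API code): each key holds
 * at most one timer, mutated by an ordered sequence of set/clear requests.
 */
final class TimerStateCell {
  private final Map<String, Long> timers = new HashMap<>();

  /** Sets (or overwrites) the timer for a key. */
  void set(String key, long timestamp) {
    timers.put(key, timestamp);
  }

  /** Clears the timer for a key; a no-op if none is pending. */
  void clear(String key) {
    timers.remove(key);
  }

  /** Reads the current value of an unfired timer, if any. */
  OptionalLong read(String key) {
    Long ts = timers.get(key);
    return ts == null ? OptionalLong.empty() : OptionalLong.of(ts);
  }

  /** Read-modify-write: keeps the earlier of the pending and the new timestamp. */
  void setMin(String key, long newTimestamp) {
    OptionalLong old = read(key);
    set(key, old.isPresent() ? Math.min(old.getAsLong(), newTimestamp) : newTimestamp);
  }

  /** Keys (sorted) whose pending timers would fire once the watermark reaches the given value. */
  List<String> dueAt(long watermark) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : timers.entrySet()) {
      if (e.getValue() <= watermark) {
        due.add(e.getKey());
      }
    }
    Collections.sort(due);
    return due;
  }
}
```

For example, `set("k", 10)` followed by `clear("k")` leaves nothing pending, while `clear("k")` followed by `set("k", 10)` leaves a timer at 10. After that, `setMin` only ever moves the timer earlier.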