On Mon, Oct 15, 2018 at 11:59 PM Lukasz Cwik <lc...@google.com> wrote:
>
> As Kenn mentioned, the timer is tracking the watermark of the main input PCollection and not the input watermark of the ParDo which would allow it to fire and thus it couldn't block itself.

Yes, and having to treat the watermarks of the different inputs differently is where the hoped-for simplicity breaks down. I suppose one could model this as

  [MainInput] -->--             --->-- [MainOutput(s)]
                |     \     /     |
                       ParDo
                |     /     \     |
        [FiredTimers]         [TimerSetRequests]
                 \
                  \                 /
                   -->-- TimerConsolidatingAndFiringOp

Where timers flow along that bottom loop clockwise and TimerConsolidatingAndFiringOp has watermark holds, but as I've stated before I think it's simpler for both users and runners to model that loop as an internal property of ParDo.

> I didn't find the wiring in the Java SDK to be difficult since Flatten already required supporting multiple input producers for operations.

Flatten doesn't distinguish between its inputs (i.e. it only has a single logical input, no matter how many operations feed into it), which makes things significantly simpler. (The wiring up of timer settings to output ports also felt a bit odd, but perhaps there are more natural ways to do that.)

> On Mon, Oct 8, 2018 at 3:41 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>> Really love this thread. The analysis is really educational. Seems like the pun of "PCollection" for so many purposes is hitting its limit.
>>
>> Timers should fire according to just the watermark of the data input, but nevertheless are a hold on GC and also output watermark.
>>
>> Kenn
>>
>> On Thu, Oct 4, 2018 at 3:12 AM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>> Yes, this is all about how timers are represented in the model, as reified in the proto(s).
>>>
>>> Coming back around to this, I've started looking at what alternative implementations/representations could look like.
>>>
>>> If we were to model timers as PCollections in the runner graph, it seems a self loop is too simple. Instead, one would have an output timer PCollection that gets fed into a special "per window-key timer consolidating" operation that then produces the input timer PCollection.
>>> Also, the input watermark being the min of the inputs doesn't quite work out, as a timer whose timestamp is less than its firing time should not prevent itself from firing (by holding up the input watermark). It seems there are a lot of caveats and complexity in trying to fit it into the model this way (both conceptually, and implementation wise (e.g. one can no longer serialize a ParDo operation without augmenting the larger graph)), as opposed to letting timers be an "internal" property of some DoFns similar to how state is currently modeled.
>>>
>>> On the flip side, at execution time, I see the value in having PCollections (or at least explicit Input/Output ports) to be able to attach the choice of (windowed) timer encoding. This comes at the cost of supporting multiple distinct input producers for operations (possibly unnecessary complexity if timers are the only use of this), and the wiring of the timer input/output ports to the DoOperation was a bit awkward (in the Python SDK at least).
>>>
>>>
>>> On Fri, Sep 21, 2018 at 6:06 PM Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>> Very interesting thread.
>>>>
>>>> Having read the original Timer design document, I find it compelling to
>>>> model timers with a loop from producing to consuming PCollections. This
>>>> makes it very explicit how timers are positioned in the dataflow.
>>>>
>>>> What Robert proposes looks less explicit, yet much closer to how Runner
>>>> authors would go about implementing it. I'm not fully aware of any
>>>> limitations of this model. Lukasz mentioned that we would have to hold
>>>> back the Watermark for as long as the Timer is not yet set, as
>>>> potentially it could have already been passed before set. As for output
>>>> time being different from fire time, I suppose we can add a hold for the
>>>> output watermark before the timer is fired.
>>>>
>>>> Whichever model we pursue, we have to solve the same
>>>> problems/requirements for Timers.
>>>> It does look like this is more a
>>>> problem of how things are represented in the proto? Practically, the
>>>> runtime implementation looks similar.
>>>>
>>>> If I had to choose I'd probably go for timers being represented as part
>>>> of a spec for a DoFn (which seems to be already the case). Timers as
>>>> separate PCollections seems elegant but less practical to me.
>>>>
>>>> -Max
>>>>
>>>> [Disclaimer: I could be wrong since I just thought about this in more
>>>> detail]
>>>>
>>>> On 20.09.18 00:28, Robert Bradshaw wrote:
>>>> > On Wed, Sep 19, 2018 at 11:54 PM Lukasz Cwik <lc...@google.com
>>>> > <mailto:lc...@google.com>> wrote:
>>>> >
>>>> >
>>>> >     On Wed, Sep 19, 2018 at 2:46 PM Robert Bradshaw < rober...@google.com
>>>> >     <mailto:rober...@google.com>> wrote:
>>>> >
>>>> >         On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik <lc...@google.com
>>>> >         <mailto:lc...@google.com>> wrote:
>>>> >
>>>> >             *How does modelling a timer as a PCollection help the Beam
>>>> >             model?*
>>>> >
>>>> >             The largest concern was about how to model timers within
>>>> >             Apache Beam that:
>>>> >             1) removed the need for the watermark hold that is typically
>>>> >             accompanied with state/timer implementations
>>>> >             2) enabled the ability to set the explicit output time to be
>>>> >             independent of the firing time for all timer specifications [1]
>>>> >
>>>> >             I felt as though treating timers as a self-loop around the
>>>> >             ParDo PTransform allowed us to use the natural definition of
>>>> >             output watermark = min(all input watermarks) as a way to
>>>> >             define how timers hold output and using windowed values that
>>>> >             contained timers as a natural way to represent the output
>>>> >             time to be independent of the firing time. The purpose of
>>>> >             the PCollection right now is to store the representation of
>>>> >             how timers are encoded. I suspect that at some point in time
>>>> >             we will have different timer encodings.
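
The tension between the "output watermark = min(all input watermarks)" definition and a timer whose output time is earlier than its firing time can be sketched roughly as below. This is only an illustration under stated assumptions: `TimerWatermarkSketch`, `PendingTimer`, and every method name are hypothetical and not Beam APIs, timestamps are plain longs, and the firing rule (eligible once the watermark reaches the firing time) is assumed for simplicity.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names are Beam APIs.
final class TimerWatermarkSketch {

    /** A pending event-time timer: when it fires, and the timestamp it will output at. */
    static final class PendingTimer {
        final long fireAt;
        final long outputTimestamp;

        PendingTimer(long fireAt, long outputTimestamp) {
            this.fireAt = fireAt;
            this.outputTimestamp = outputTimestamp;
        }
    }

    /** Smallest output timestamp among pending timers, or Long.MAX_VALUE if there are none. */
    static long earliestHold(List<PendingTimer> pending) {
        long hold = Long.MAX_VALUE;
        for (PendingTimer t : pending) {
            hold = Math.min(hold, t.outputTimestamp);
        }
        return hold;
    }

    /** Self-loop reading: timers are one more input, so the input watermark is the min over both. */
    static long selfLoopInputWatermark(long mainInputWatermark, List<PendingTimer> pending) {
        return Math.min(mainInputWatermark, earliestHold(pending));
    }

    /** Internal-property reading: pending timers hold back only the output watermark. */
    static long outputWatermark(long mainInputWatermark, List<PendingTimer> pending) {
        return Math.min(mainInputWatermark, earliestHold(pending));
    }

    /** Assumed firing rule: a timer is eligible once the given watermark reaches its firing time. */
    static boolean canFire(PendingTimer timer, long watermark) {
        return watermark >= timer.fireAt;
    }

    public static void main(String[] args) {
        PendingTimer timer = new PendingTimer(100L, 50L); // fires at 100, outputs at 50
        List<PendingTimer> pending = Arrays.asList(timer);
        long mainInputWatermark = 120L;

        // Tracking only the main input watermark, the timer is eligible.
        System.out.println(canFire(timer, mainInputWatermark));
        // Under the self-loop reading, the timer's own timestamp holds the input watermark back.
        System.out.println(canFire(timer, selfLoopInputWatermark(mainInputWatermark, pending)));
        // Either way, the output watermark is held at the timer's output timestamp.
        System.out.println(outputWatermark(mainInputWatermark, pending));
    }
}
```

In this toy setup the timer would block itself if it were allowed to hold the ParDo's input watermark, while tracking only the main input watermark lets it fire and still keeps the hold on the output side.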
>>>> >
>>>> >
>>>> >         I agree that being able to separate the hold time from firing
>>>> >         time of a timer is important, but in retrospect don't think
>>>> >         timers as PCollections is the only (or most natural) way to
>>>> >         represent that (in the model or in runner implementations).
>>>> >
>>>> >     Can you go into more detail as to what you're suggesting as the
>>>> >     replacement and why you believe it fits the model more naturally
>>>> >     since "state" doesn't have watermarks or produce output but timers
>>>> >     can? I'm not disagreeing that timers as PCollections may not be a
>>>> >     natural fit but I don't see them as state as well since "user state"
>>>> >     doesn't produce output.
>>>> >
>>>> >
>>>> > Yeah, timers are their own thing. They come in like data, but are
>>>> > written out like state.
>>>> >
>>>> > I guess the high level is that I think the beam graph should represent,
>>>> > within reason, the user's model of what their pipeline is, and treating
>>>> > timers as PCollections with this self-loop feels like an implementation
>>>> > detail, and furthermore an implementation detail that no runner is
>>>> > actually going to use to implement things. And (again, this is
>>>> > subjective) seems to complicate both the reasoning about a pipeline and
>>>> > implementing its execution over viewing the stateful/timely aspects of a
>>>> > DoFn as internal details to the ParDo operation.
>>>> >
>>>> >             Having this fit well with how timers are delivered between
>>>> >             the SDK and Runner was an added bonus. Also, a good portion
>>>> >             of the code that I needed to fix up was more related to the
>>>> >             assumption that there was ever only a single input producer
>>>> >             to an executable stage and plumbing of timer specifications
>>>> >             through all the runner library support layers.
>>>> >
>>>> >             ----------
>>>> >             *There is no "clear" for timers.*
>>>> >
>>>> >             The current Java API for timers only allows you to set them.
>>>> >             Clearing timers is not exposed to users and is only used by
>>>> >             internal implementations to support runners[2] via
>>>> >             TimerInternals. Usage of a timer is like so:
>>>> >                @TimerId("timer")
>>>> >                private final TimerSpec timerSpec =
>>>> >                    TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>> >
>>>> >                @ProcessElement
>>>> >                public void process(
>>>> >                    ProcessContext context,
>>>> >                    BoundedWindow window,
>>>> >                    @TimerId("timer") Timer myTimer) {
>>>> >
>>>> >                  myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>>> >                }
>>>> >
>>>> >
>>>> >         We'll probably want clear. But currently there's already exactly
>>>> >         one timer per window per key, and setting another one overwrites
>>>> >         the previous one, again making it more like state. Maybe, as you
>>>> >         said, it could involve retractions (but every output being a
>>>> >         retraction seems odd.)
>>>> >
>>>> >     Once retractions exist, most GBK firings will have a preceding
>>>> >     retraction so I believe they will be very common.
>>>> >
>>>> >
>>>> > True, but I don't think we want to insert the GBK + CV in the graph to
>>>> > represent the consolidation that's going on here.
>>>> >
>>>> >             * side inputs already require a runner to introspect the
>>>> >             ParDo payload to get the SideInputSpec, requiring it to have
>>>> >             knowledge of the TimerSpec is no different.
>>>> >
>>>> >
>>>> >         My point was that once it has knowledge of the TimerSpec, there
>>>> >         is no need for (meaning no additional information provided by)
>>>> >         the timer PCollection nor its edges.
>>>> >
>>>> >     The way in which the timer is encoded is missing. This could be
>>>> >     explicit on the TimerSpec like the other StateSpec definitions though.
>>>> >
>>>> >
>>>> > Ah, I didn't realize there was a choice in the matter.
>>>> >
>>>> >
>>>> > On Wed, Sep 19, 2018 at 11:57 PM Reuven Lax <re...@google.com
>>>> > <mailto:re...@google.com>> wrote:
>>>> >
>>>> >
>>>> >         We'll probably want clear.
>>>> >         But currently there's already exactly
>>>> >         one timer per window per key, and setting another one overwrites
>>>> >         the previous one, again making it more like state. Maybe, as you
>>>> >         said, it could involve retractions (but every output being a
>>>> >         retraction seems odd.)
>>>> >
>>>> >
>>>> >     This is not true. We support multiple (tagged) timers per key.
>>>> >
>>>> >
>>>> > Yeah, I misspoke. I meant every distinct (tagged) timer has one firing
>>>> > time, rather than getting appended like in a PCollection.
>>>> >
>>>> > - Robert
>>>> >
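
The "one firing time per distinct (tagged) timer, rather than appended like in a PCollection" point can be illustrated with a small sketch. Everything here is hypothetical (the `TimerSlots` class, its method names, and the use of strings for key, window, and tag are assumptions made for illustration, not Beam APIs): a map keyed by (key, window, tag) overwrites on every set, while an append-only list stands in for PCollection-like behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names are Beam APIs.
final class TimerSlots {
    // One slot per (key, window, tag); the value is the single current firing time.
    private final Map<List<String>, Long> slots = new HashMap<>();
    // For contrast: an append-only log in which every set request is retained.
    private final List<Long> appendLog = new ArrayList<>();

    /** Sets (or overwrites) the firing time of the timer identified by key, window and tag. */
    void set(String key, String window, String tag, long fireAt) {
        slots.put(Arrays.asList(key, window, tag), fireAt); // state-like: last write wins
        appendLog.add(fireAt);                              // PCollection-like: everything kept
    }

    /** Current firing time for the timer, or null if it was never set. */
    Long firingTime(String key, String window, String tag) {
        return slots.get(Arrays.asList(key, window, tag));
    }

    int pendingTimers() {
        return slots.size();
    }

    int loggedRequests() {
        return appendLog.size();
    }

    public static void main(String[] args) {
        TimerSlots timers = new TimerSlots();
        timers.set("user1", "w0", "gc", 10L);
        timers.set("user1", "w0", "gc", 20L);    // overwrites the previous firing time
        timers.set("user1", "w0", "flush", 5L);  // a different tag gets its own slot

        System.out.println(timers.firingTime("user1", "w0", "gc"));
        System.out.println(timers.pendingTimers());
        System.out.println(timers.loggedRequests());
    }
}
```

Three set requests leave only two pending timers here, which is the sense in which timers are written out like state even though they arrive like data.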