On Mon, Oct 15, 2018 at 11:59 PM Lukasz Cwik <lc...@google.com> wrote:
>
> As Kenn mentioned, the timer is tracking the watermark of the main input
PCollection and not the input watermark of the ParDo which would allow it
to fire and thus it couldn't block itself.

Yes, and having to treat the watermarks of the different inputs differently
is where the hoped-for simplicity breaks down.

I suppose one could model this as

[MainInput] -->--           --->-- [MainOutput(s)]
  |               \       /
  |                 ParDo
  |               /       \
  |    [FiredTimers]     [TimerSetRequests]
   \              \       /
     -->-- TimerConsolidatingAndFiringOp


Where timers flow along that bottom loop clockwise and
TimerConsolidatingAndFiringOp has watermark holds, but as I've stated
before I think it's simpler for both users and runners to model that loop
as an internal property of ParDo.


> I didn't find the wiring in the Java SDK to be difficult since Flatten
already required supporting multiple input producers for operations.

Flatten doesn't distinguish between its inputs (i.e. it only has a single
logical input, no matter how many operations feed into it), which makes
things significantly simpler. (The wiring up of timer settings to output
ports also felt a bit odd, but perhaps there are more natural ways to do
that.)

> On Mon, Oct 8, 2018 at 3:41 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>> Really love this thread. The analysis is really educational. Seems like
the pun of "PCollection" for so many purposes is hitting its limit.
>>
>> Timers should fire according to just the watermark of the data input,
but nevertheless are a hold on GC and also output watermark.
>>
>> Kenn
>>
>> On Thu, Oct 4, 2018 at 3:12 AM Robert Bradshaw <rober...@google.com>
wrote:
>>>
>>> Yes, this is all about how timers are represented in the model, as
reified in the proto(s).
>>>
>>> Coming back around to this, I've started looking at what an alternative
implementation/representations could look like.
>>>
>>> If we were to model timers as PCollections in the runner graph, it
seems a self loop is too simple. Instead, one would have an output timer
PCollection that gets fed into a special "per window-key timer
consolidating" operation that then produces the input timer PCollection.
Also, the input watermark being the min of the inputs doesn't quite work
out, as a timer whose timestamp is less than its firing time should not
prevent itself from firing (by holding up the input watermark). It seems
there's a lot of caveats and complexity to try to fit it in the model this
way (both conceptually, and implementation wise (e.g. one can no longer
serialize a ParDo operation without augmenting the larger graph)), as
opposed to letting timers be an "internal" property of some DoFns similar
to how state is currently modeled.
>>>
>>> On the flip side, at execution time, I see the value in having
PCollections (or at least explicit Input/Output ports) to be able to attach
the choice of (windowed) timer encoding. This comes at the cost supporting
multiple distinct input producers for operations (possibly unnecessarily
complexity if timers are the only use of this) and the wiring of the timer
input/output ports to the DoOperation was a bit awkward (in the Python SDK
at least).
>>>
>>>
>>>
>>> On Fri, Sep 21, 2018 at 6:06 PM Maximilian Michels <m...@apache.org>
wrote:
>>>>
>>>> Very interesting thread.
>>>>
>>>> Having read the original Timer design document, I find it compelling to
>>>> model timers with a loop from producing to consuming PCollections. This
>>>> makes it very explicit how timers are positioned in the dataflow.
>>>>
>>>> What Robert proposes looks less explicit, yet much closer to how Runner
>>>> authors would go about to implement it. I'm not fully aware of any
>>>> limitations of this model. Lukasz mentioned that we would have to hold
>>>> back the Watermark for as long as the Timer is not yet set, as
>>>> potentially it could have already been passed before set. As for output
>>>> time being different from fire time, I suppose we can add a hold for
the
>>>> output watermark before the timer is fired.
>>>>
>>>> Whichever model we pursue, we have to solve the same
>>>> problems/requirements for Timers. It does look like this is more a
>>>> problem of how things are represented in the proto? Practically, the
>>>> runtime implementation looks similar.
>>>>
>>>> If I had to choose I'd probably go for timers being represented as part
>>>> of a spec for a DoFn (which seems to be already the case). Timers as
>>>> separate PCollections seems elegant but less practical to me.
>>>>
>>>> -Max
>>>>
>>>> [Disclaimer: I could be wrong since I just thought about this in more
>>>> detail]
>>>>
>>>> On 20.09.18 00:28, Robert Bradshaw wrote:
>>>> > On Wed, Sep 19, 2018 at 11:54 PM Lukasz Cwik <lc...@google.com
>>>> > <mailto:lc...@google.com>> wrote:
>>>> >
>>>> >
>>>> >     On Wed, Sep 19, 2018 at 2:46 PM Robert Bradshaw <
rober...@google.com
>>>> >     <mailto:rober...@google.com>> wrote:
>>>> >
>>>> >         On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik <lc...@google.com
>>>> >         <mailto:lc...@google.com>> wrote:
>>>> >
>>>> >             *How does modelling a timer as a PCollection help the
Beam
>>>> >             model?*
>>>> >
>>>> >             The largest concern was about how to model timers within
>>>> >             Apache Beam that:
>>>> >             1) removed the need for the watermark hold that is
typically
>>>> >             accompanied with state/timer implementations
>>>> >             2) enabled the ability to set the explicit output time
to be
>>>> >             independent of the firing time for all timer
specifications [1]
>>>> >
>>>> >             I felt as though treating timers as a self-loop around
the
>>>> >             ParDo PTransform allowed us to use the natural
definition of
>>>> >             output watermark = min(all input watermarks) as a way to
>>>> >             define how timers hold output and using windowed values
that
>>>> >             contained timers as a natural way to represent the output
>>>> >             time to be independent of the firing time. The purpose of
>>>> >             the PCollection right now is to store the representation
of
>>>> >             how timers are encoded. I suspect that at some point in
time
>>>> >             we will have different timer encodings.
>>>> >
>>>> >
>>>> >         I agree that being able to separate the hold time from firing
>>>> >         time of a timer is important, but in retrospect don't think
>>>> >         timers as PCollections is the only (or most natural) way to
>>>> >         represent that (in the model or in runner implementations).
>>>> >
>>>> >     Can you go into more detail as to what your suggesting as the
>>>> >     replacement and why you believe it fits the model more naturally
>>>> >     since "state" doesn't have watermarks or produce output but
timers
>>>> >     can. I'm not disagreeing that timers as PCollections may not be a
>>>> >     natural fit but I don't see them as state as well since "user
state"
>>>> >     doesn't produce output.
>>>> >
>>>> >
>>>> > Yeah, timers are their own thing. They come in like data, but are
>>>> > written out like state.
>>>> >
>>>> > I guess the high level is that I think the beam graph should
represent,
>>>> > within reason, the user's model of what their pipeline is, and
treating
>>>> > timers as PCollections with this self-loop feels like an
implementation
>>>> > detail, and furthermore an implementation detail that no runner is
>>>> > actually going to use to implement things. And (again, this is
>>>> > subjective) seems to complicate both the reasoning about a pipeline
and
>>>> > implementing its execution over viewing the stateful/timely aspects
of a
>>>> > DoFn as internal details to the ParDo operation.
>>>> >
>>>> >             Having this fit well with how timers are delivered
between
>>>> >             the SDK and Runner was an added bonus. Also, a good
portion
>>>> >             of the code that I needed to fix up was more related to
the
>>>> >             assumption that there was ever only a single input
producer
>>>> >             to an executable stage and plumbing of timer
specifications
>>>> >             through all the runner library support layers.
>>>> >
>>>> >             ----------
>>>> >             *There is no "clear" for timers.*
>>>> >
>>>> >             The current Java API for timers only allows you to set
them.
>>>> >             Clearing timers is not exposed to users and is only used
by
>>>> >             internal implementations to support runners[2] via
>>>> >             TimerInternals. Usage of a timer is like so:
>>>> >                @TimerId("timer")
>>>> >                private final TimerSpec timerSpec =
>>>> >             TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>> >
>>>> >                @ProcessElement
>>>> >                public void process(
>>>> >                    ProcessContext context,
>>>> >                    BoundedWindow window,
>>>> >                    @TimerId("timer") Timer myTimer) {
>>>> >
>>>> >
 myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>>> >                }
>>>> >
>>>> >
>>>> >         We'll probably want clear. But currently there's already
exactly
>>>> >         one timer per window per key, and setting another one
overwrites
>>>> >         the previous one, again making it more like state. Maybe, as
you
>>>> >         said, it could involve retractions (but every output being a
>>>> >         retraction seems odd.)
>>>> >
>>>> >     Once retractions exist, most GBK firings will have a preceding
>>>> >     retraction so I believe they will be very common.
>>>> >
>>>> >
>>>> > True, but I don't think we want to insert the GBK + CV in the graph
to
>>>> > represent the consolidation that's going on here.
>>>> >
>>>> >             * side inputs already require a runner to introspect the
>>>> >             ParDo payload to get the SideInputSpec, requiring it to
have
>>>> >             knowledge of the TimerSpec is no different.
>>>> >
>>>> >
>>>> >         My point was that once it has knowelge of the TimerSpec,
there
>>>> >         is no need for (meaning no additional information provided
by)
>>>> >         the timer PCollection nor its edges.
>>>> >
>>>> >     The way in which the timer is encoded is missing. This could be
>>>> >     explicit on the TimerSpec like the other StateSpec definitions
though.
>>>> >
>>>> >
>>>> > Ah, I didn't realize there was a choice in the matter.
>>>> >
>>>> >
>>>> > On Wed, Sep 19, 2018 at 11:57 PM Reuven Lax <re...@google.com
>>>> > <mailto:re...@google.com>> wrote:
>>>> >
>>>> >
>>>> >         We'll probably want clear. But currently there's already
exactly
>>>> >         one timer per window per key, and setting another one
overwrites
>>>> >         the previous one, again making it more like state. Maybe, as
you
>>>> >         said, it could involve retractions (but every output being a
>>>> >         retraction seems odd.)
>>>> >
>>>> >
>>>> >     This is not true. We support multiple (tagged) timers per key.
>>>> >
>>>> >
>>>> > Yeah, I misspoke. I meant every distinct (tagged) timer has one
firing
>>>> > time, rather than getting appended like in a PCollection.
>>>> >
>>>> > - Robert
>>>> >

Reply via email to