Clear can be modeled with a boolean state cell that records whether the next
timer firing should be processed or ignored. That is not as good for
watermark advancement, though, as being able to eagerly clear the timer.
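
As a rough illustration of the boolean-cell idea, here is a plain-Java
sketch. It does not use the Beam API: the class ClearableTimerSketch and all
of its methods are made up for this example, with plain fields standing in
for a per-key state cell and a single pending timer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in, not Beam API: emulates "clear" with a boolean cell. */
class ClearableTimerSketch {
    // Stand-in for a per-key boolean state cell: true means "ignore next firing".
    private boolean ignoreNextFiring = false;
    // Stand-in for the single pending timer; null means no timer is set.
    private Long pendingFireTime = null;
    // Firing times that were actually processed (not ignored).
    private final List<Long> processedFirings = new ArrayList<>();

    /** Sets (or resets) the timer and re-arms processing of its firing. */
    void set(long fireTime) {
        pendingFireTime = fireTime;
        ignoreNextFiring = false;
    }

    /** Emulated clear: the timer stays pending, only the flag is flipped. */
    void clear() {
        if (pendingFireTime != null) {
            ignoreNextFiring = true;
        }
    }

    /** Fires the pending timer if the watermark has reached its fire time. */
    void advanceWatermarkTo(long watermark) {
        if (pendingFireTime != null && pendingFireTime <= watermark) {
            long firedAt = pendingFireTime;
            pendingFireTime = null;
            if (ignoreNextFiring) {
                ignoreNextFiring = false; // swallow this firing
            } else {
                processedFirings.add(firedAt);
            }
        }
    }

    boolean hasPendingTimer() {
        return pendingFireTime != null;
    }

    List<Long> processedFirings() {
        return processedFirings;
    }

    public static void main(String[] args) {
        ClearableTimerSketch timer = new ClearableTimerSketch();
        timer.set(10L);
        timer.clear();
        System.out.println("pending after clear: " + timer.hasPendingTimer());
        timer.advanceWatermarkTo(10L);
        System.out.println("processed firings: " + timer.processedFirings());
    }
}
```

Note that in this sketch the timer is still pending after clear() and only
goes away once it fires. That is the watermark concern: a runner would still
see an outstanding timer until then, whereas an eager clear would remove it
immediately.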

Longer term you could retract a "timer" from a PCollection once retractions
are supported.

On Wed, Sep 19, 2018 at 11:40 AM Reuven Lax <re...@google.com> wrote:

> I believe that clearTimer has been a feature request before though.
>
> On Wed, Sep 19, 2018 at 11:31 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> *How does modelling a timer as a PCollection help the Beam model?*
>>
>> The largest concern was about how to model timers within Apache Beam in a
>> way that:
>> 1) removed the need for the watermark hold that typically accompanies
>> state/timer implementations
>> 2) made it possible to set an explicit output time that is independent
>> of the firing time for all timer specifications [1]
>>
>> I felt that treating timers as a self-loop around the ParDo PTransform
>> allowed us to use the natural definition of output watermark =
>> min(all input watermarks) to define how timers hold output, and that
>> using windowed values containing timers was a natural way to let the
>> output time be independent of the firing time. The purpose of the
>> PCollection right now is to store the representation of how timers are
>> encoded. I suspect that at some point in time we will have different timer
>> encodings.
>>
>> Having this fit well with how timers are delivered between the SDK and
>> Runner was an added bonus. Also, a good portion of the code that I needed
>> to fix up was related to the assumption that there was only ever a
>> single input producer to an executable stage, and to the plumbing of timer
>> specifications through all the runner library support layers.
>>
>> ----------
>> *There is no "clear" for timers.*
>>
>> The current Java API for timers only allows you to set them. Clearing
>> timers is not exposed to users and is only used by internal implementations
>> to support runners[2] via TimerInternals. Usage of a timer is like so:
>>   @TimerId("timer")
>>   private final TimerSpec timerSpec =
>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement
>>   public void process(
>>       ProcessContext context,
>>       BoundedWindow window,
>>       @TimerId("timer") Timer myTimer) {
>>
>>     myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>   }
>>
>> ---------
>> I'm not a big fan of having timers as a separate field in the elements
>> proto. I still think they should be treated as an input/output and we could
>> update the representation so that inputs/outputs for PTransforms don't need
>> to be "PCollections". I was thinking that our current PCollection
>> representation assumes that we'll never want to change it to add extra
>> information or do backwards incompatible changes like beam:pcollection:v2.
>>
>> ---------
>> Other points:
>> * side inputs already require a runner to introspect the ParDo payload to
>> get the SideInputSpec; requiring it to have knowledge of the TimerSpec is
>> no different.
>> * a multimap side input over timers could use the key that the timer is
>> associated with as its key, and an iterable side input over timers would
>> allow you to iterate over <key, timer> pairs. This could be useful for skew
>> control in sources since they would want to know how far they are ahead vs
>> other restrictions.
>> * user state as a PCollection can make sense but I can't see how we can
>> get past problems when we treat it as an "input", since the input watermark
>> would be ignored or infinity? I do agree that this could open the door to
>> sharing "state" such as multi-key transactions, but that is very
>> speculative, as you say.
>>
>>
>> 1: https://issues.apache.org/jira/browse/BEAM-2535
>> 2:
>> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>>
>> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> Robert,
>>>
>>> Thanks for presenting these thoughts. Your attempt to implement the
>>> timer support in the Python runner is the first strong signal we have and
>>> it is the right time to make changes - AFAIK no other runner work has been
>>> done.
>>>
>>> I'm also a bit concerned about the acrobatics required in the PR to make
>>> this work. Luke will be in the best position to comment, but as I recall we
>>> considered modeling timers as special PCollections a simplification for SDK
>>> <> Runner interaction and overall implementation. The special treatment
>>> (and slight confusion) at the graph level perhaps was an early warning
>>> sign; discovering the extra complexity of wiring this into a runner should
>>> be a reason to revisit.
>>>
>>> Conceptually timers are special state; they are certainly more state
>>> than stream :) Regardless of how they are passed to the harness, the runner
>>> will need to treat them similarly to side inputs and user state.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>>
>>>
>>> On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> TLDR Perhaps we should revisit
>>>> https://s.apache.org/beam-portability-timers in light of the fact that
>>>> Timers are more like State than PCollections.
>>>>
>>>> --
>>>>
>>>> While looking at implementing State and Timers in the Python SDK, I've
>>>> been revisiting the ideas presented at
>>>> https://s.apache.org/beam-portability-timers , and am now starting to
>>>> wonder if this is actually the best way to model things (at least at the
>>>> Runner level). Instead it seems Timers more closely resemble, and are
>>>> more tightly bound to, State than PCollections.
>>>>
>>>> This is especially clear when writing timers. These timers are not a
>>>> bag of emitted elements; rather, one sets (and clears) timers, and the set
>>>> of timers that end up firing is a result of this *ordered* sequence of
>>>> operations. It is also often important that the setting of timers be
>>>> ordered with respect to the setting and clearing of state itself (and is
>>>> more often than not collocated with such requests).
>>>>
>>>> In addition, these self-loops add complexity to the graph but provide
>>>> no additional information--they are entirely redundant with the timerspecs
>>>> already present on DoFns. Generally I prefer less redundancy in the spec,
>>>> rather than have it be over-constrained. It's unclear what a runner that
>>>> didn't introspect the DoFn's TimerSpecs would do with these special
>>>> edges, and also unclear how they would differ from possible self-loops due
>>>> to more traditional iteration.
>>>>
>>>> The primary motivation to express timers in this way seems to be the
>>>> desire to push them to workers using the data plane, rather than inventing
>>>> another mechanism or making them pull-based like with state. I think this
>>>> could be done by simply adding a Timer field to the Elements (or Data)
>>>> proto. (Note that this is not the same as having hacky ElementOrTimer
>>>> elements flow through the graph.) Writes would be state requests, and
>>>> perhaps it would even make sense to "read" the current value of an unfired
>>>> timer over the state API, to be able to set things like
>>>> {min,max}(new_timestamp, old_timestamp).
>>>>
>>>> (We could alternatively attempt to model State(s) as a PCollection(s),
>>>> but this is more speculative and would likely exacerbate some of the issues
>>>> above (though it could open the door for DoFns that somehow *share* state).
>>>> They seem like different objects though: one is a mutable store, the other
>>>> an immutable stream.)
>>>>
>>>> I realize this is a big shift, but we could probably adapt the existing
>>>> Python/Java implementations fairly easily (and it would probably simplify
>>>> them). And it's easier to do simplifications like this sooner rather than
>>>> later.
>>>>
>>>> What do people think about this? Any obvious (or not-so-obvious)
>>>> downsides that I'm missing?
>>>>
>>>> - Robert
>>>>
>>>>
