Short train of thoughts continues.

I do agree that something could change, I just don't believe that we are at
something better yet with what has been proposed so far.

On Wed, Sep 19, 2018 at 11:43 AM Lukasz Cwik <lc...@google.com> wrote:

> Clear can be modeled by a boolean state cell of process/ignore next timer
> firing. Not as good for watermark advancement though if you can eagerly
> clear something.
>
> Longer term you could retract a "timer" from a PCollection once
> retractions are supported.
>
> On Wed, Sep 19, 2018 at 11:40 AM Reuven Lax <re...@google.com> wrote:
>
>> I believe that clearTimer has been a feature request before though.
>>
>> On Wed, Sep 19, 2018 at 11:31 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> *How does modelling a timer as a PCollection help the Beam model?*
>>>
>>> The largest concern was about how to model timers within Apache Beam
>>> that:
>>> 1) removed the need for the watermark hold that is typically accompanied
>>> with state/timer implementations
>>> 2) enabled the ability to set the explicit output time to be independent
>>> of the firing time for all timer specifications [1]
>>>
>>> I felt as though treating timers as a self-loop around the ParDo
>>> PTransform allowed us to use the natural definition of output watermark =
>>> min(all input watermarks) as a way to define how timers hold output and
>>> using windowed values that contained timers as a natural way to represent
>>> the output time to be independent of the firing time. The purpose of the
>>> PCollection right now is to store the representation of how timers are
>>> encoded. I suspect that at some point in time we will have different timer
>>> encodings.
>>>
>>> Having this fit well with how timers are delivered between the SDK and
>>> Runner was an added bonus. Also, a good portion of the code that I needed
>>> to fix up was more related to the assumption that there was ever only a
>>> single input producer to an executable stage and plumbing of timer
>>> specifications through all the runner library support layers.
>>>
>>> ----------
>>> *There is no "clear" for timers.*
>>>
>>> The current Java API for timers only allows you to set them. Clearing
>>> timers is not exposed to users and is only used by internal implementations
>>> to support runners[2] via TimerInternals. Usage of a timer is like so:
>>>   @TimerId("timer")
>>>   private final TimerSpec timerSpec =
>>> TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>
>>>   @ProcessElement
>>>   public void process(
>>>       ProcessContext context,
>>>       BoundedWindow window,
>>>       @TimerId("timer") Timer myTimer) {
>>>
>>>     myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>>   }
>>>
>>> ---------
>>> I'm not a big fan of having timers as a separate field in the elements
>>> proto. I still think they should be treated as an input/output and we could
>>> update the representation so that inputs/outputs for PTransforms don't need
>>> to be "PCollections". I was thinking that our current PCollection
>>> representation assumes that we'll never want to change it to add extra
>>> information or do backwards incompatible changes like beam:pcollection:v2.
>>>
>>> ---------
>>> Other points:
>>> * side inputs already require a runner to introspect the ParDo payload
>>> to get the SideInputSpec, requiring it to have knowledge of the TimerSpec
>>> is no different.
>>> * multimap side input over timers where the key is the key that the
>>> timer is associated with. iterable side input over timers would allow you
>>> to iterate over <key, timer> pairs. This could be useful for skew control
>>> in sources since they would want to know how far they are ahead vs other
>>> restrictions.
>>> * user state as a PCollection can make sense but I can't see how we can
>>> get past problems when we treat it as an "input" since the input watermark
>>> would be ignored or infinity?. I do agree that this could open the door to
>>> sharing "state" such as multi-key transactions but very speculative as you
>>> say.
>>>
>>>
>>> 1: https://issues.apache.org/jira/browse/BEAM-2535
>>> 2:
>>> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>>>
>>> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Robert,
>>>>
>>>> Thanks for presenting these thoughts. Your attempt to implement the
>>>> timer support in the Python runner is the first strong signal we have and
>>>> it is the right time to make changes - AFAIK no other runner work has been
>>>> done.
>>>>
>>>> I'm also a bit concerned about the acrobatics required in the PR to
>>>> make this work. Luke will be in the best position to comment, but as I
>>>> recall we considered modeling timers as special PCollections a
>>>> simplification for SDK <> Runner interaction and overall implementation.
>>>> The special treatment (and slight confusion) at the graph level perhaps was
>>>> an early warning sign, discovering the extra complexity wiring this in a
>>>> runner should be a reason to revisit.
>>>>
>>>> Conceptually timers are special state, they are certainly more state
>>>> than stream :) Regardless how they are passed to the harness, the runner
>>>> will need to treat them similar to side inputs and user state.
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> TLDR Perhaps we should revisit
>>>>> https://s.apache.org/beam-portability-timers in light of the fact
>>>>> that Timers are more like State than PCollections.
>>>>>
>>>>> --
>>>>>
>>>>> While looking at implementing State and Timers in the Python SDK, I've
>>>>> been revisiting the ideas presented at
>>>>> https://s.apache.org/beam-portability-timers , and am now starting to
>>>>> wonder if this is actually the best way to model things (at least at the
>>>>> Runner level). Instead it seems Timers are more resemble, and are tightly
>>>>> bound to, State than PCollections.
>>>>>
>>>>> This is especially clear when writing timers. These timers are not a
>>>>> bag of emitted elements, rather one sets (and clears) timers and the set 
>>>>> of
>>>>> timers that end up firing are a result of this *ordered* sequence of
>>>>> operations. It is also often important that the setting of timers be
>>>>> ordered with respect to the setting and clearing of state itself (and is
>>>>> more often than not collocated with such requests).
>>>>>
>>>>> In addition, these self-loops add complexity to the graph but provide
>>>>> no additional information--they are entirely redundant with the timerspecs
>>>>> already present on DoFns. Generally I prefer less redundancy in the spec,
>>>>> rather than have it be over-constrained. It's unclear what a runner that
>>>>> didn't introspect the DoFn's TimerSpecs would do with this these special
>>>>> edges, and also unclear how they would differ from possible self-loops due
>>>>> to more traditional iteration.
>>>>>
>>>>> The primary motivation to express timers in this way seems to be the
>>>>> desire to push them to workers using the data plan, rather than inventing
>>>>> another mechanism or making them pull-based like with state. I think this
>>>>> could be done by simply adding a Timer field to the Elements (or Data)
>>>>> proto. (Note that this is not the same as having an hacky ElementOrTimer
>>>>> elements flow through the graph.) Writes would be state requests, and
>>>>> perhaps it would even make sense to "read" the current value of an unfired
>>>>> timer over the state API, to be able to set things like
>>>>> {min,max}(new_timestamp,old_timestamp}.
>>>>>
>>>>> (We could alternatively attempt to model State(s) as a PCollection(s),
>>>>> but this is more speculative and would likely exacerbate some of the 
>>>>> issues
>>>>> above (though it could open the door for DoFns that somehow *share* 
>>>>> state).
>>>>> They seem like different objects though, one is a mutable store, the other
>>>>> an immutable stream.)
>>>>>
>>>>> I realize this is a big shift, but we could probably adapt the
>>>>> existing Python/Java implementations fairly easily (and it would probably
>>>>> simplify them). And it's easier to do simplifications like this sooner
>>>>> rather than later.
>>>>>
>>>>> What do people think about this? Any obvious (or not-so-obvious)
>>>>> downsides that I'm missing?
>>>>>
>>>>> - Robert
>>>>>
>>>>>

Reply via email to