TLDR Perhaps we should revisit https://s.apache.org/beam-portability-timers
in light of the fact that Timers are more like State than PCollections.

--

While looking at implementing State and Timers in the Python SDK, I've been
revisiting the ideas presented at
https://s.apache.org/beam-portability-timers , and am now starting to
wonder if this is actually the best way to model things (at least at the
Runner level). Instead it seems Timers are more resemble, and are tightly
bound to, State than PCollections.

This is especially clear when writing timers. These timers are not a bag of
emitted elements, rather one sets (and clears) timers and the set of timers
that end up firing are a result of this *ordered* sequence of operations.
It is also often important that the setting of timers be ordered with
respect to the setting and clearing of state itself (and is more often than
not collocated with such requests).

In addition, these self-loops add complexity to the graph but provide no
additional information--they are entirely redundant with the timerspecs
already present on DoFns. Generally I prefer less redundancy in the spec,
rather than have it be over-constrained. It's unclear what a runner that
didn't introspect the DoFn's TimerSpecs would do with this these special
edges, and also unclear how they would differ from possible self-loops due
to more traditional iteration.

The primary motivation to express timers in this way seems to be the desire
to push them to workers using the data plan, rather than inventing another
mechanism or making them pull-based like with state. I think this could be
done by simply adding a Timer field to the Elements (or Data) proto. (Note
that this is not the same as having an hacky ElementOrTimer elements flow
through the graph.) Writes would be state requests, and perhaps it would
even make sense to "read" the current value of an unfired timer over the
state API, to be able to set things like
{min,max}(new_timestamp,old_timestamp}.

(We could alternatively attempt to model State(s) as a PCollection(s), but
this is more speculative and would likely exacerbate some of the issues
above (though it could open the door for DoFns that somehow *share* state).
They seem like different objects though, one is a mutable store, the other
an immutable stream.)

I realize this is a big shift, but we could probably adapt the existing
Python/Java implementations fairly easily (and it would probably simplify
them). And it's easier to do simplifications like this sooner rather than
later.

What do people think about this? Any obvious (or not-so-obvious) downsides
that I'm missing?

- Robert

Reply via email to