Hi All,
I want to discuss some ideas about improving the primitives/operations that 
Flink offers for user-state, timers and windows and how these concepts can be 
unified.

It has come up a lot lately that people have very specific requirements 
regarding the state that they keep, and it seems necessary to allow users to 
set their own custom timers (on processing time and watermark time 
(event-time)) both to expire state and to implement custom windowing 
semantics. While we’re at it, we might also think about cleaning up 
the state handling a bit.

Let me first describe the status quo, so that we’re all on the same page. There 
are three types of state:
 - function state: this is the state that you get when a user function 
implements the Checkpointed interface. It is not partitioned.
 - operator state: this is the state that a StreamOperator can snapshot. It is 
similar to function state, but for operators. It is not partitioned.
 - partitioned state: state that is scoped to the key of the incoming element. 
In Flink this is (confusingly) called OperatorState, and KvState internally.

(Operator is the low-level concept, user functions are usually invoked by the 
operator, for example StreamMap is the operator that handles a MapFunction.)

Function state and operator state are not partitioned, which makes dynamic 
scale-in/scale-out difficult to implement. Partitioned state, in contrast, can 
be redistributed when the degree of parallelism changes.
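To illustrate the scaling argument, here is a minimal, self-contained Java sketch. It is not Flink code: the class `KeyedStateRedistribution`, its methods, and the hash-modulo routing rule are all assumptions made for illustration. Because every entry of partitioned state carries its key, each entry can be routed to whichever subtask owns that key under the new parallelism, whereas a non-partitioned function or operator state snapshot is one opaque blob per subtask with no key to route by.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not Flink API): re-splitting key-partitioned state for a new parallelism. */
final class KeyedStateRedistribution {

    /** Assumed routing rule: a key goes to subtask floorMod(hashCode, parallelism). */
    static int assignToSubtask(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /**
     * Takes the per-subtask state maps of the old parallelism and returns
     * per-subtask state maps for the new parallelism.
     */
    static <K, V> List<Map<K, V>> redistribute(List<Map<K, V>> oldSubtasks, int newParallelism) {
        List<Map<K, V>> newSubtasks = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            newSubtasks.add(new HashMap<>());
        }
        // Every entry carries its key, so it can be routed independently of
        // which old subtask happened to hold it.
        for (Map<K, V> subtaskState : oldSubtasks) {
            for (Map.Entry<K, V> entry : subtaskState.entrySet()) {
                int target = assignToSubtask(entry.getKey(), newParallelism);
                newSubtasks.get(target).put(entry.getKey(), entry.getValue());
            }
        }
        return newSubtasks;
    }
}
```

For example, state built for parallelism 2 can be passed to `redistribute(oldSubtasks, 3)`, and every key ends up in exactly one of the three new maps, namely the one its new owner will read.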

Both stream operators and user functions can have partitioned state, and they 
share the same namespace, i.e. their state can clash. Partitioned state also 
stays around indefinitely unless it is cleared manually.
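A hypothetical model of the clash (again not Flink API; `SharedKeyedStateStore` and its methods are made up for this sketch): if the operator and the user function look up partitioned state by name in the same store, a state called "count" written by one is silently overwritten by the other. The store also never drops entries on its own, only through an explicit `clear`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: one key-partitioned store shared by an operator and its user function. */
final class SharedKeyedStateStore {
    // state name -> (key -> value); operator and function use the same name space.
    private final Map<String, Map<Object, Object>> statesByName = new HashMap<>();
    private Object currentKey;

    /** Set by the operator to the key of the element being processed. */
    void setCurrentKey(Object key) {
        this.currentKey = key;
    }

    /** Reads the value of the named state for the current key, or null. */
    Object get(String stateName) {
        Map<Object, Object> perKey = statesByName.get(stateName);
        return perKey == null ? null : perKey.get(currentKey);
    }

    /** Writes the named state for the current key; same name means same slot. */
    void put(String stateName, Object value) {
        statesByName.computeIfAbsent(stateName, n -> new HashMap<>()).put(currentKey, value);
    }

    /** The only way an entry ever goes away: an explicit clear for the current key. */
    void clear(String stateName) {
        Map<Object, Object> perKey = statesByName.get(stateName);
        if (perKey != null) {
            perKey.remove(currentKey);
        }
    }
}
```

If the operator stores 10 under "count" for key `user-1` and the user function then stores 1 under its own "count" for the same key, a later read by the operator returns 1.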

On to timers: operators can register processing-time callbacks, and they can 
react to watermarks to implement event-time callbacks. However, they have to 
implement that logic themselves. For example, the WindowOperator has custom 
code to keep track of watermark timers and to react to watermarks. User 
functions have no way of registering timers at all. Also, timers are not scoped 
to any key, so if you register a timer while processing an element of a certain 
key, when the timer fires you don’t know which key was active when it was 
registered. Knowing that key might be necessary for cleaning up state for 
certain keys, or for triggering processing for a certain key only, for example 
with session windows of some kind.
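For reference, the kind of hand-rolled event-time timer logic an operator needs today looks roughly like the following sketch. The names are hypothetical, and this is a simplification rather than the WindowOperator's actual code: timestamps go into a min-heap and are fired when a watermark passes them. Only the timestamp is stored, so the callback cannot tell which key registered the timer.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/**
 * Hypothetical sketch of operator-local event-time timers (not Flink code):
 * a min-heap of timestamps, fired when a watermark passes them.
 */
final class UnkeyedEventTimeTimers {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final LongConsumer onTimer;

    UnkeyedEventTimeTimers(LongConsumer onTimer) {
        this.onTimer = onTimer;
    }

    /** Registers a timer; only the timestamp is kept, the current key is lost. */
    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Fires, in timestamp order, every timer whose timestamp is <= the watermark. */
    void processWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            onTimer.accept(timers.poll());
        }
    }
}
```

The callback receives nothing but a timestamp, which is exactly the limitation described above.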

Now, on to new stuff. I propose to add a timer facility that can be used by 
both operators and user functions. Both partitioned state and timers should be 
aware of keys, and when a timer fires, the partitioned state should be scoped to 
the same key that was active when the timer was registered.
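A minimal sketch of what this could look like, assuming a single-valued state per key (`KeyedStateAndTimers` and all its methods are hypothetical, not a proposed API): each timer records the key that was current at registration, and before the callback runs that key is restored, so reading or clearing state in the callback affects exactly that key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/**
 * Hypothetical sketch: partitioned state and event-time timers that are both
 * aware of the current key. Names are illustrative, not Flink API.
 */
final class KeyedStateAndTimers<K, V> {

    /** A timer remembers the key that was active when it was registered. */
    private static final class Timer<T> {
        final long timestamp;
        final T key;

        Timer(long timestamp, T key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final Map<K, V> state = new HashMap<>();
    private final PriorityQueue<Timer<K>> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private K currentKey;

    /** Set by the operator before processing an element of this key. */
    void setCurrentKey(K key) { currentKey = key; }
    K currentKey() { return currentKey; }

    // Single-valued partitioned state, always read/written for the current key.
    V value() { return state.get(currentKey); }
    void update(V value) { state.put(currentKey, value); }
    void clear() { state.remove(currentKey); }
    int numKeysWithState() { return state.size(); }

    /** Registers an event-time timer for the current key. */
    void registerEventTimeTimer(long timestamp) {
        timers.add(new Timer<>(timestamp, currentKey));
    }

    /**
     * Fires all timers with timestamp <= watermark. Before each callback the
     * current key is set back to the key that registered the timer.
     */
    void processWatermark(long watermark, BiConsumer<Long, KeyedStateAndTimers<K, V>> onTimer) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer<K> timer = timers.poll();
            currentKey = timer.key;
            onTimer.accept(timer.timestamp, this);
        }
    }
}
```

A timer registered while processing key `a` can thus clear `a`'s state when it fires, leaving the state of key `b` untouched.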

One last bit. If we also want to implement the current WindowOperator on top of 
these generic facilities, we need a way to scope state not only by key 
but also by window (or better, by some generic state scope). The reason is that 
one key can have several active windows at one point in time, and firing timers 
need to be mapped to the correct window (for example, for sliding windows, 
session windows or what have you…).
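A hypothetical sketch of such a two-level scope, where both state and timers are keyed by the key and by a generic namespace `N` (a window, in the WindowOperator case). The class name, its methods, and the use of strings like "[0, 10)" as window labels are assumptions for illustration only. With sliding windows, one key holds state in two overlapping windows, and a firing timer restores both the key and the window it was registered for.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/**
 * Hypothetical sketch: partitioned state and timers scoped by key AND by a
 * generic namespace N (for example, a window). Not Flink API.
 */
final class NamespacedStateAndTimers<K, N, V> {

    /** A timer remembers both the key and the namespace active at registration. */
    private static final class Timer<TK, TN> {
        final long timestamp;
        final TK key;
        final TN namespace;

        Timer(long timestamp, TK key, TN namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }
    }

    // State table: key -> namespace -> value.
    private final Map<K, Map<N, V>> state = new HashMap<>();
    private final PriorityQueue<Timer<K, N>> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private K currentKey;
    private N currentNamespace;

    void setCurrentKey(K key) { currentKey = key; }
    void setCurrentNamespace(N namespace) { currentNamespace = namespace; }
    K currentKey() { return currentKey; }
    N currentNamespace() { return currentNamespace; }

    /** Value for the current (key, namespace) pair, or null if none. */
    V value() {
        Map<N, V> perKey = state.get(currentKey);
        return perKey == null ? null : perKey.get(currentNamespace);
    }

    void update(V value) {
        state.computeIfAbsent(currentKey, k -> new HashMap<>()).put(currentNamespace, value);
    }

    /** Clears only the current (key, namespace) pair; other windows of the key survive. */
    void clear() {
        Map<N, V> perKey = state.get(currentKey);
        if (perKey != null) {
            perKey.remove(currentNamespace);
            if (perKey.isEmpty()) {
                state.remove(currentKey);
            }
        }
    }

    void registerEventTimeTimer(long timestamp) {
        timers.add(new Timer<>(timestamp, currentKey, currentNamespace));
    }

    /** Fires timers up to the watermark, restoring key and namespace before each callback. */
    void processWatermark(long watermark, BiConsumer<Long, NamespacedStateAndTimers<K, N, V>> onTimer) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer<K, N> timer = timers.poll();
            currentKey = timer.key;
            currentNamespace = timer.namespace;
            onTimer.accept(timer.timestamp, this);
        }
    }
}
```

In this sketch, a watermark of 12 fires only the timer of window [0, 10) for key `a`, and clearing in the callback leaves that key's [5, 15) window intact.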

Happy discussing. :D

Cheers,
Aljoscha
