Hi All,

I want to discuss some ideas about improving the primitives/operations that Flink offers for user state, timers and windows, and how these concepts can be unified.

It has come up a lot lately that people have very specific requirements regarding the state that they keep, and it seems necessary to allow users to set their own custom timers (on processing time and watermark time (event-time)) to do both expiration of state and implementation of custom windowing semantics. While we’re at this, we might also think about cleaning up the state handling a bit.

Let me first describe the status quo, so that we’re all on the same page. There are three types of state:
- function state: this is the state that you get when a user function implements the Checkpointed interface. It is not partitioned.
- operator state: this is the state that a StreamOperator can snapshot. It is similar to the function state, but for operators. It is not partitioned.
- partitioned state: state that is scoped to the key of the incoming element. In Flink, this is (confusingly) called OperatorState and KvState (internally).

(Operator is the low-level concept; user functions are usually invoked by the operator. For example, StreamMap is the operator that handles a MapFunction.)

Function state and operator state are not partitioned, which makes it difficult to implement dynamic scale-in/scale-out. Partitioned state, in contrast, can be redistributed when changing the degree of parallelism. Both stream operators and user functions can have partitioned state, and the namespace is the same, i.e. the state can clash. The partitioned state will stay indefinitely if not manually cleared.

On to timers. Operators can register processing-time callbacks, and they can react to watermarks to implement event-time callbacks. They have to implement the logic themselves, however. For example, the WindowOperator has custom code to keep track of watermark timers and for reacting to watermarks. User functions have no way of registering timers. Also, timers are not scoped to any key.
So if you register a timer while processing an element of a certain key, when the timer fires you don’t know what key was active when registering the timer. Knowing the key might be necessary for cleaning up state for certain keys, or to trigger processing for a certain key only, for example with session windows of some kind.

Now, on to new stuff. I propose to add a timer facility that can be used by both operators and user functions. Both partitioned state and timers should be aware of keys, and when a timer fires the partitioned state should be scoped to the same key that was active when the timer was registered.

One last bit. If we want to also implement the current WindowOperator on top of these generic facilities, we need a way to scope state not only by key but also by window (or better, some generic state scope). The reason is that one key can have several active windows at one point in time, and firing timers need to be mapped to the correct window (for example, for sliding windows, or session windows or what have you…).

Happy discussing. :D

Cheers,
Aljoscha
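
P.S. To make the proposal a bit more concrete, here is a rough sketch of what key-aware and scope-aware timers and state could look like. Everything in it is made up for illustration: KeyedTimerService, ScopedState, TimerCallback and their methods do not exist in Flink, the whole thing is a plain in-memory toy, and I simply assume tumbling windows of size 10 that are identified by their start timestamp. The point is only that a timer carries the key and scope it was registered under, so the callback can look up and clean up exactly the right piece of state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch: none of these types are part of Flink.
class KeyedTimerSketch {

    /** A timer that remembers the key and scope (e.g. a window) active at registration. */
    static final class Timer<K, N> {
        final long timestamp;
        final K key;
        final N scope;

        Timer(long timestamp, K key, N scope) {
            this.timestamp = timestamp;
            this.key = key;
            this.scope = scope;
        }
    }

    /** Invoked when a timer fires; key and scope are handed back to the caller. */
    interface TimerCallback<K, N> {
        void onTimer(long timestamp, K key, N scope);
    }

    /** Event-time timers ordered by timestamp, fired once the watermark reaches them. */
    static final class KeyedTimerService<K, N> {
        private final PriorityQueue<Timer<K, N>> timers =
                new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

        void registerEventTimeTimer(K key, N scope, long timestamp) {
            timers.add(new Timer<>(timestamp, key, scope));
        }

        void advanceWatermark(long watermark, TimerCallback<K, N> callback) {
            while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
                Timer<K, N> t = timers.poll();
                callback.onTimer(t.timestamp, t.key, t.scope);
            }
        }
    }

    /** Partitioned state addressed by (key, scope) instead of by key alone. */
    static final class ScopedState<K, N, V> {
        private final Map<K, Map<N, V>> values = new HashMap<>();

        V get(K key, N scope) {
            Map<N, V> perKey = values.get(key);
            return perKey == null ? null : perKey.get(scope);
        }

        void put(K key, N scope, V value) {
            values.computeIfAbsent(key, k -> new HashMap<>()).put(scope, value);
        }

        void clear(K key, N scope) {
            Map<N, V> perKey = values.get(key);
            if (perKey != null) {
                perKey.remove(scope);
                if (perKey.isEmpty()) {
                    values.remove(key);
                }
            }
        }
    }

    /** Counts elements per (key, window) and emits the count when the window's timer fires. */
    static List<String> demo() {
        KeyedTimerService<String, Long> timers = new KeyedTimerService<>();
        ScopedState<String, Long, Integer> counts = new ScopedState<>();
        List<String> output = new ArrayList<>();

        // (key, event timestamp) pairs; key "a" ends up with two active windows at once.
        Object[][] elements = { {"a", 3L}, {"a", 4L}, {"a", 12L}, {"b", 25L} };
        for (Object[] e : elements) {
            String key = (String) e[0];
            long ts = (Long) e[1];
            long windowStart = ts - (ts % 10);
            Integer count = counts.get(key, windowStart);
            if (count == null) {
                // First element for this (key, window): schedule evaluation and cleanup.
                timers.registerEventTimeTimer(key, windowStart, windowStart + 9);
            }
            counts.put(key, windowStart, count == null ? 1 : count + 1);
        }

        TimerCallback<String, Long> fire = (timestamp, key, window) -> {
            output.add(key + "@" + window + "=" + counts.get(key, window));
            counts.clear(key, window); // state does not linger after the window fires
        };
        timers.advanceWatermark(19, fire); // fires both windows of key "a"
        timers.advanceWatermark(30, fire); // fires the single window of key "b"
        return output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this toy, the timer is registered only when the first element for a (key, window) pair arrives, and the callback both emits the result and clears the state. That is roughly the pattern I have in mind for state expiration and custom windowing; how the key and scope would actually be set on the state backend when a timer fires is exactly what I would like to discuss.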