Would the Reducing/Folding states just be some API sugar on top of what we have now (ValueState), or do they have some added functionality (like incremental checkpoints for list states)?
Gyula

Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, Dec 14, 2015, 11:03):

> While enhancing the state interfaces we would also need to introduce new
> types of state. I was thinking of these, for a start:
> - ValueState (works like OperatorState works now, i.e. provides methods
>   to get/set one state value)
> - ListState, provides methods to add one element to a list of elements and
>   to iterate over all contained elements
> - ReducingState, somewhat similar to value state but combines the added
>   value with the existing value using a ReduceFunction
> - FoldingState, same as above but with fold
>
> I think these are necessary to give the system more knowledge about the
> semantics of state so that it can handle the state more efficiently. Think
> of incremental checkpoints, for example: these are easy to do if you know
> that state is a list to which stuff is only appended.
>
> On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
> >
> > A lot of this makes sense, but I am not sure about renaming
> > "OperatorState". The other name is nicer, but why make users' life hard
> > just for a name?
> >
> >
> > On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <m...@apache.org>
> > wrote:
> >
> >> Hi Aljoscha,
> >>
> >> Thanks for the informative technical description.
> >>
> >>> - function state: this is the state that you get when a user function
> >>>   implements the Checkpointed interface. It is not partitioned.
> >>> - operator state: this is the state that a StreamOperator can snapshot;
> >>>   it is similar to the function state, but for operators. It is not
> >>>   partitioned.
> >>> - partitioned state: state that is scoped to the key of the incoming
> >>>   element; in Flink, this is (confusingly) called OperatorState and
> >>>   KvState (internally)
> >>
> >> Let's clean that up! Let's rename the OperatorState interface to KvState.
> >>
> >>> Both stream operators and user functions can have partitioned state, and
> >>> the namespace is the same, i.e. the state can clash. The partitioned state
> >>> will stay indefinitely if not manually cleared.
> >>
> >> I suppose operators currently have to take care to use a unique
> >> identifier for the state such that it doesn't clash with the user
> >> function. Wouldn't be too hard to introduce a scoping here.
> >>
> >> Your proposal makes sense. It seems like this is a rather delicate
> >> change which improves the flexibility of the streaming API. What is
> >> the motivation behind this? I suppose you are thinking of improvements
> >> to the session capabilities of the streaming API.
> >>
> >>> If we want to also implement the current WindowOperator on top of these
> >>> generic facilities we need to have a way to scope state not only by key but
> >>> also by windows (or better, some generic state scope).
> >>
> >> This is currently handled by the WindowOperator itself and would then
> >> be delegated to the enhanced state interface? Makes sense if we want
> >> to make use of the new state interface. Again, is it just cleaner or
> >> does this enable new types of applications?
> >>
> >> Cheers,
> >> Max
> >>
> >> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljos...@apache.org>
> >> wrote:
> >>> Hi All,
> >>> I want to discuss some ideas about improving the primitives/operations
> >>> that Flink offers for user-state, timers and windows and how these concepts
> >>> can be unified.
> >>>
> >>> It has come up a lot lately that people have very specific requirements
> >>> regarding the state that they keep and it seems necessary to allow users
> >>> to set their own custom timers (on processing time and watermark time
> >>> (event-time)) to do both expiration of state and implementation of custom
> >>> windowing semantics. While we’re at this, we might also think about
> >>> cleaning up the state handling a bit.
> >>>
> >>> Let me first describe the status quo, so that we’re all on the same
> >>> page. There are three types of state:
> >>> - function state: this is the state that you get when a user function
> >>>   implements the Checkpointed interface. It is not partitioned.
> >>> - operator state: this is the state that a StreamOperator can snapshot;
> >>>   it is similar to the function state, but for operators. It is not
> >>>   partitioned.
> >>> - partitioned state: state that is scoped to the key of the incoming
> >>>   element; in Flink, this is (confusingly) called OperatorState and
> >>>   KvState (internally)
> >>>
> >>> (Operator is the low-level concept, user functions are usually invoked
> >>> by the operator, for example StreamMap is the operator that handles a
> >>> MapFunction.)
> >>>
> >>> Function state and operator state are not partitioned, meaning that it
> >>> becomes difficult when we want to implement dynamic scale-in/scale-out.
> >>> Partitioned state, by contrast, can be redistributed when changing the
> >>> degree of parallelism.
> >>>
> >>> Both stream operators and user functions can have partitioned state, and
> >>> the namespace is the same, i.e. the state can clash. The partitioned state
> >>> will stay indefinitely if not manually cleared.
> >>>
> >>> On to timers: operators can register processing-time callbacks, and they
> >>> can react to watermarks to implement event-time callbacks. They have to
> >>> implement the logic themselves, however. For example, the WindowOperator
> >>> has custom code to keep track of watermark timers and for reacting to
> >>> watermarks. User functions have no way of registering timers. Also, timers
> >>> are not scoped to any key. So if you register a timer while processing an
> >>> element of a certain key, when the timer fires you don’t know what key was
> >>> active when registering the timer.
> >>> This might be necessary for cleaning up
> >>> state for certain keys, or to trigger processing for a certain key only,
> >>> for example with session windows of some kind.
> >>>
> >>> Now, on to new stuff. I propose to add a timer facility that can be used
> >>> by both operators and user functions. Both partitioned state and timers
> >>> should be aware of keys, and if a timer fires the partitioned state should
> >>> be scoped to the same key that was active when the timer was registered.
> >>>
> >>> One last bit. If we want to also implement the current WindowOperator on
> >>> top of these generic facilities we need to have a way to scope state not
> >>> only by key but also by windows (or better, some generic state scope). The
> >>> reason is that one key can have several active windows at one point in
> >>> time and firing timers need to be mapped to the correct window (for
> >>> example, for sliding windows, or session windows or what have you…).
> >>>
> >>> Happy discussing. :D
> >>>
> >>> Cheers,
> >>> Aljoscha
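
To make the ideas in this thread a bit more concrete, here is a minimal Java sketch of the proposed state types and of key-scoped timers. It is only an illustration under stated assumptions: every type and method below (KeyedStateSketch, ValueState, ListState, ReducingState, KeyedTimers, snapshotDelta, advanceWatermark) is hypothetical, borrows names from the discussion, and is not Flink's actual API. It shows that a ReducingState can be written on top of a ValueState (so at this level it is API sugar), that an append-only ListState can hand out just the elements added since the last snapshot, and that a timer which remembers its key lets the callback clean up state for exactly that key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

// Hypothetical sketch: none of these types are Flink's real interfaces.
final class KeyedStateSketch {

    /** One value per key, with plain get/set/clear. */
    static final class ValueState<K, V> {
        private final Map<K, V> values = new HashMap<>();
        V get(K key) { return values.get(key); }
        void set(K key, V value) { values.put(key, value); }
        void clear(K key) { values.remove(key); }
    }

    /** Append-only list per key; tracks what was added since the last snapshot. */
    static final class ListState<K, V> {
        private final Map<K, List<V>> lists = new HashMap<>();
        private final Map<K, List<V>> delta = new HashMap<>();

        void add(K key, V value) {
            lists.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
            delta.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }

        /** Returns a copy of all elements stored for the key. */
        List<V> get(K key) {
            return new ArrayList<>(lists.getOrDefault(key, Collections.emptyList()));
        }

        /** Returns only the elements appended since the previous call. */
        Map<K, List<V>> snapshotDelta() {
            Map<K, List<V>> out = new HashMap<>(delta);
            delta.clear();
            return out;
        }
    }

    /** Combines each added value with the stored one; built on ValueState. */
    static final class ReducingState<K, V> {
        private final ValueState<K, V> backing = new ValueState<>();
        private final BinaryOperator<V> reduce;

        ReducingState(BinaryOperator<V> reduce) { this.reduce = reduce; }

        void add(K key, V value) {
            V current = backing.get(key);
            backing.set(key, current == null ? value : reduce.apply(current, value));
        }

        V get(K key) { return backing.get(key); }
        void clear(K key) { backing.clear(key); }
    }

    /** Event-time timers that remember the key that was active at registration. */
    static final class KeyedTimers<K> {
        private final TreeMap<Long, Set<K>> timers = new TreeMap<>();

        void register(K key, long timestamp) {
            timers.computeIfAbsent(timestamp, t -> new LinkedHashSet<>()).add(key);
        }

        /** Fires every timer with timestamp <= watermark, passing its key. */
        void advanceWatermark(long watermark, BiConsumer<K, Long> onTimer) {
            while (!timers.isEmpty() && timers.firstKey() <= watermark) {
                Map.Entry<Long, Set<K>> due = timers.pollFirstEntry();
                for (K key : due.getValue()) {
                    onTimer.accept(key, due.getKey());
                }
            }
        }
    }

    public static void main(String[] args) {
        ReducingState<String, Integer> sums = new ReducingState<>(Integer::sum);
        sums.add("a", 1);
        sums.add("a", 2);
        sums.add("b", 5);

        KeyedTimers<String> timers = new KeyedTimers<>();
        timers.register("a", 100L);
        // The callback receives the key, so it can expire state for that key only.
        timers.advanceWatermark(150L, (key, ts) -> sums.clear(key));
        System.out.println(sums.get("a") + " " + sums.get("b"));
    }
}
```

In this sketch the reducing state adds nothing a user could not do with a value state by hand. Whether a real state backend could exploit the declared semantics further, as the append-only list does with its delta snapshot, is the open question raised at the top of the thread.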