As I mentioned in my previous mail, I think that OperatorState would need to be replaced by more specific types of state (ValueState, ListState, …).
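To make the proposed state types concrete, here is a minimal single-key sketch. The class names (SketchValueState, SketchListState, SketchReducingState, SketchFoldingState) and their get/set/add methods are hypothetical and chosen only for illustration. They are not Flink's API, `java.util.function.BinaryOperator` stands in for Flink's ReduceFunction, and keys, serialization and checkpointing are ignored entirely.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical sketch of the proposed state types for a single key.
// Names follow the proposal; method signatures are illustrative assumptions.

// ValueState: get/set one value (roughly what OperatorState offers today).
class SketchValueState<T> {
    private T value; // null until set
    T get() { return value; }
    void set(T newValue) { value = newValue; }
}

// ListState: append one element, iterate over all contained elements.
class SketchListState<T> {
    private final List<T> elements = new ArrayList<>();
    void add(T element) { elements.add(element); }
    Iterable<T> get() { return Collections.unmodifiableList(elements); }
}

// ReducingState: each added value is combined with the existing value.
class SketchReducingState<T> {
    private final BinaryOperator<T> reduceFunction;
    private T current; // null until the first add()
    SketchReducingState(BinaryOperator<T> reduceFunction) { this.reduceFunction = reduceFunction; }
    void add(T value) { current = (current == null) ? value : reduceFunction.apply(current, value); }
    T get() { return current; }
}

// FoldingState: like ReducingState, but folds into an accumulator that
// starts from an initial value and may have a different type.
class SketchFoldingState<T, ACC> {
    private final BiFunction<ACC, T, ACC> foldFunction;
    private ACC accumulator;
    SketchFoldingState(ACC initial, BiFunction<ACC, T, ACC> foldFunction) {
        this.accumulator = initial;
        this.foldFunction = foldFunction;
    }
    void add(T value) { accumulator = foldFunction.apply(accumulator, value); }
    ACC get() { return accumulator; }
}
```

The benefit the thread is after shows up in the method shapes: an `add()` on a list or reducing state tells the system more than an opaque get/set would, for example that a list only ever grows.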
> On 14 Dec 2015, at 11:34, Maximilian Michels <m...@apache.org> wrote:
>
>> On a side note, why would you call it KvState? And what would be called
>> KvState?
>
> The OperatorState interface would be called KvState.
>
> On Mon, Dec 14, 2015 at 11:18 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Yes, as Kostas said, it would initially not provide more functionality, but
>> it would enable us to add it later.
>>
>> On a side note, why would you call it KvState? And what would be called
>> KvState?
>>
>>> On 14 Dec 2015, at 11:14, Kostas Tzoumas <ktzou...@apache.org> wrote:
>>>
>>> I suppose that they can start as sugar and evolve to a different
>>> implementation.
>>>
>>> I would +1 the name change to KVState. OperatorState is indeed somewhat
>>> confusing, and it will only get harder to rename later.
>>>
>>> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Would the Reducing/Folding states just be some API sugar on top of what we
>>>> have now (ValueState), or do they have some added functionality (like
>>>> incremental checkpoints for list states)?
>>>>
>>>> Gyula
>>>>
>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 14 Dec 2015, 11:03):
>>>>
>>>>> While enhancing the state interfaces we would also need to introduce new
>>>>> types of state. I was thinking of these, for a start:
>>>>> - ValueState: works like OperatorState works now, i.e. provides methods
>>>>>   to get/set one state value
>>>>> - ListState: provides methods to add one element to a list of elements
>>>>>   and to iterate over all contained elements
>>>>> - ReducingState: somewhat similar to ValueState, but combines the added
>>>>>   value with the existing value using a ReduceFunction
>>>>> - FoldingState: same as above, but with fold
>>>>>
>>>>> I think these are necessary to give the system more knowledge about the
>>>>> semantics of state so that it can handle the state more efficiently.
>>>>> Think of incremental checkpoints, for example: these are easy to do if you
>>>>> know that state is a list to which stuff is only appended.
>>>>>
>>>>>> On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
>>>>>>
>>>>>> A lot of this makes sense, but I am not sure about renaming
>>>>>> "OperatorState". The other name is nicer, but why make users' lives hard
>>>>>> just for a name?
>>>>>>
>>>>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <m...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> Thanks for the informative technical description.
>>>>>>>
>>>>>>>> - function state: this is the state that you get when a user function
>>>>>>>>   implements the Checkpointed interface. It is not partitioned.
>>>>>>>> - operator state: this is the state that a StreamOperator can snapshot.
>>>>>>>>   It is similar to the function state, but for operators. It is not
>>>>>>>>   partitioned.
>>>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>>>>>   element. In Flink, this is (confusingly) called OperatorState and
>>>>>>>>   KvState (internally).
>>>>>>>
>>>>>>> Let's clean that up! Let's rename the OperatorState interface to KvState.
>>>>>>>
>>>>>>>> Both stream operators and user functions can have partitioned state,
>>>>>>>> and the namespace is the same, i.e. the state can clash. The
>>>>>>>> partitioned state will stay indefinitely if not manually cleared.
>>>>>>>
>>>>>>> I suppose operators currently have to take care to use a unique
>>>>>>> identifier for the state such that it doesn't clash with the user
>>>>>>> function. It wouldn't be too hard to introduce scoping here.
>>>>>>>
>>>>>>> Your proposal makes sense. It seems like this is a rather delicate
>>>>>>> change which improves the flexibility of the streaming API. What is
>>>>>>> the motivation behind this? I suppose you are thinking of improvements
>>>>>>> to the session capabilities of the streaming API.
>>>>>>>
>>>>>>>> If we want to also implement the current WindowOperator on top of these
>>>>>>>> generic facilities, we need to have a way to scope state not only by key
>>>>>>>> but also by windows (or better, some generic state scope).
>>>>>>>
>>>>>>> This is currently handled by the WindowOperator itself and would then
>>>>>>> be delegated to the enhanced state interface? Makes sense if we want
>>>>>>> to make use of the new state interface. Again, is it just cleaner or
>>>>>>> does this enable new types of applications?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I want to discuss some ideas about improving the primitives/operations
>>>>>>>> that Flink offers for user state, timers and windows, and how these
>>>>>>>> concepts can be unified.
>>>>>>>>
>>>>>>>> It has come up a lot lately that people have very specific requirements
>>>>>>>> regarding the state that they keep, and it seems necessary to allow users
>>>>>>>> to set their own custom timers (on processing time and watermark time
>>>>>>>> (event time)) to do both expiration of state and implementation of custom
>>>>>>>> windowing semantics. While we’re at this, we might also think about
>>>>>>>> cleaning up the state handling a bit.
>>>>>>>>
>>>>>>>> Let me first describe the status quo, so that we’re all on the same
>>>>>>>> page. There are three types of state:
>>>>>>>> - function state: this is the state that you get when a user function
>>>>>>>>   implements the Checkpointed interface. It is not partitioned.
>>>>>>>> - operator state: this is the state that a StreamOperator can snapshot.
>>>>>>>>   It is similar to the function state, but for operators.
>>>>>>>>   It is not partitioned.
>>>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>>>>>   element. In Flink, this is (confusingly) called OperatorState and
>>>>>>>>   KvState (internally).
>>>>>>>>
>>>>>>>> (Operator is the low-level concept; user functions are usually invoked
>>>>>>>> by the operator. For example, StreamMap is the operator that handles a
>>>>>>>> MapFunction.)
>>>>>>>>
>>>>>>>> Function state and operator state are not partitioned, which makes it
>>>>>>>> difficult to implement dynamic scale-in/scale-out. Partitioned state,
>>>>>>>> in contrast, can be redistributed when changing the degree of
>>>>>>>> parallelism.
>>>>>>>>
>>>>>>>> Both stream operators and user functions can have partitioned state,
>>>>>>>> and the namespace is the same, i.e. the state can clash. The
>>>>>>>> partitioned state will stay indefinitely if not manually cleared.
>>>>>>>>
>>>>>>>> On to timers: operators can register processing-time callbacks, and they
>>>>>>>> can react to watermarks to implement event-time callbacks. They have to
>>>>>>>> implement the logic themselves, however. For example, the WindowOperator
>>>>>>>> has custom code to keep track of watermark timers and for reacting to
>>>>>>>> watermarks. User functions have no way of registering timers. Also,
>>>>>>>> timers are not scoped to any key. So if you register a timer while
>>>>>>>> processing an element of a certain key, when the timer fires you don’t
>>>>>>>> know which key was active when the timer was registered. Knowing that
>>>>>>>> key might be necessary for cleaning up state for certain keys, or for
>>>>>>>> triggering processing for a certain key only, for example with session
>>>>>>>> windows of some kind.
>>>>>>>>
>>>>>>>> Now, on to new stuff. I propose to add a timer facility that can be used
>>>>>>>> by both operators and user functions.
Both partitioned state and >>>> timers >>>>>>> should be aware of keys and if a timer fires the partitioned state >>>>> should >>>>>>> be scoped to the same key that was active when the timer was >>>> registered. >>>>>>>> >>>>>>>> One last bit. If we want to also implement the current >> WindowOperator >>>>> on >>>>>>> top of these generic facilities we need to have a way to scope state >>>> not >>>>>>> only by key but also by windows (or better, some generic state >> scope). >>>>> The >>>>>>> reason is, that one key can have several active windows at one point >>>> in >>>>>>> time and firing timers need to me mapped to the correct window (for >>>>>>> example, for sliding windows, or session windows or what have you…). >>>>>>>> >>>>>>>> Happy discussing. :D >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Aljoscha >>>>>>>> >>>>>>>> >>>>>>> >>>>> >>>>> >>>> >> >>