[ https://issues.apache.org/jira/browse/FLINK-3201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15123556#comment-15123556 ]
ASF GitHub Bot commented on FLINK-3201:
---------------------------------------

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1562

Enhance Partitioned State and use it in WindowOperator

The commits in this PR are self-contained. The first one enhances the partitioned state so that it can be used by the WindowOperator. The second commit adds the required changes to WindowOperator. The third commit adds a state backend based on RocksDB. This means that the window operator can now run on different types of state backends transparently.

@StephanEwen has some things that he wants to change; those will be added here (I think). I'm opening this now so that people can have a look at it.

## [FLINK-3201] Enhance Partitioned State Interface with State Types

Add new state types ValueState, ListState and ReducingState, where ListState and ReducingState derive from the interface MergingState. ValueState behaves exactly like OperatorState. MergingState is a stateful list: elements can be added to it, and the elements it contains can be retrieved. A ListState actually keeps the list of elements, while a ReducingState uses a reduce function to combine all added elements into one.

To create a ValueState, the user passes a ValueStateIdentifier to StreamingRuntimeContext.getPartitionedState(); for the other state types, they pass a ListStateIdentifier or a ReducingStateIdentifier.

This change is necessary to give the system more information about the nature of the operator state, which we need in order to do incremental snapshots. For example, if the user kept a List as state, that list would be opaque inside OperatorState and Flink could not create good incremental snapshots.

This also refactors the StateBackend. Before, the logic for partitioned state was spread out over StreamingRuntimeContext, AbstractStreamOperator and StateBackend. Now it is consolidated in StateBackend.
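The state type hierarchy described above can be sketched in Java as follows. This is a minimal in-memory sketch under stated assumptions, not the code from the pull request: the method names (`value()`, `update()`, `add()`, `get()`), the heap-backed classes and the `addAll` helper are hypothetical, and state identifiers and key scoping are left out. It shows why a common MergingState supertype is useful: `addAll` is written once and works whether it is handed a list state or a reducing state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical sketch of the FLINK-3201 state types; not Flink's actual classes. */
public class StateTypesSketch {

    /** One value that can be set and retrieved, like the old OperatorState. */
    interface ValueState<T> {
        T value();
        void update(T value);
    }

    /** Common supertype: values of type IN are added, a result of type OUT is read back. */
    interface MergingState<IN, OUT> {
        void add(IN value);
        OUT get();
    }

    /** Keeps every added element. */
    interface ListState<T> extends MergingState<T, Iterable<T>> {}

    /** Combines every added element into one value with a reduce function. */
    interface ReducingState<T> extends MergingState<T, T> {}

    static final class HeapValueState<T> implements ValueState<T> {
        private T value;
        public T value() { return value; }
        public void update(T value) { this.value = value; }
    }

    static final class HeapListState<T> implements ListState<T> {
        private final List<T> elements = new ArrayList<>();
        public void add(T value) { elements.add(value); }
        public Iterable<T> get() { return elements; }
    }

    static final class HeapReducingState<T> implements ReducingState<T> {
        private final BinaryOperator<T> reduceFunction;
        private T current; // null until the first element is added

        HeapReducingState(BinaryOperator<T> reduceFunction) { this.reduceFunction = reduceFunction; }

        public void add(T value) {
            current = (current == null) ? value : reduceFunction.apply(current, value);
        }

        public T get() { return current; }
    }

    /** Written once against MergingState, so it accepts list and reducing state alike. */
    static <IN, OUT> OUT addAll(MergingState<IN, OUT> state, List<IN> inputs) {
        for (IN input : inputs) {
            state.add(input);
        }
        return state.get();
    }

    public static void main(String[] args) {
        List<Integer> inputs = Arrays.asList(3, 4, 5);
        Iterable<Integer> kept = addAll(new HeapListState<Integer>(), inputs);
        Integer reduced = addAll(new HeapReducingState<Integer>(Integer::sum), inputs);
        ValueState<Integer> counter = new HeapValueState<>();
        counter.update(0);
        counter.update(counter.value() + 1);
        System.out.println(kept + " " + reduced + " " + counter.value()); // [3, 4, 5] 12 1
    }
}
```

The list state pays for keeping every element but lets the reader see all of them; the reducing state keeps only one value, which is what makes its size independent of the number of inputs.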
This also adds support for partitioned state in two-input operators.

## [FLINK-3200] Use Partitioned State in WindowOperator

This changes the window operator to use the new partitioned state abstraction for keeping window contents, instead of custom internal state and the Checkpointed interface. For now, however, timers are still kept as custom checkpointed state.

WindowOperator now expects a StateIdentifier for a MergingState. This can be for either a ReducingState or a ListState; WindowOperator is agnostic to the type of state. The signature of WindowFunction is also changed to include the type of the intermediate input: if a ReducingState is used, the input of the WindowFunction is T (where T is the input type); if a ListState is used, the input is of type Iterable[T].

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink window-on-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1562.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1562

----
commit dc440ba4d97cecd297e432f14342dba5382cab50
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date: 2016-01-25T11:33:51Z

[FLINK-3201] Enhance Partitioned State Interface with State Types

commit 865723e3ff0133ba9d921907c298c3545fdfe32c
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date: 2016-01-25T11:34:05Z

[FLINK-3200] Use Partitioned State in WindowOperator
commit 1034a02380652ef5184e0094a41ef073c7c9b4fd
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date: 2016-01-21T09:56:47Z

[FLINK-3278] Add Partitioned State Backend Based on RocksDB

commit 13d1f541a48f82b1a10854addbdd5ea9bde7b079
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date: 2016-01-28T17:18:26Z

Move RocksDB backup to external processes

----

> Enhance Partitioned State Interface with State Types
> ----------------------------------------------------
>
> Key: FLINK-3201
> URL: https://issues.apache.org/jira/browse/FLINK-3201
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Affects Versions: 1.0.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> We should enhance the partitioned state with different state types, so that the system knows about the semantics of the state. I propose for now:
> - ValueState: behaves like the current OperatorState; the state is one value that can be set and retrieved
> - ListState: the state is a list that can be appended to and iterated over
> - ReducingState: the state is one value that other values can be added to
>
> ListState and ReducingState would share a common superclass so that they can be used in the same places. For example, the WindowOperator would use ReducingState and ListState interchangeably, depending on whether we have a ReduceFunction or not.
> These additions allow the system to be clever about how state is checkpointed in the future. Think managed memory/out-of-core state and incremental checkpoints.
> Also, state should be scoped to both a key and a namespace. This will allow the WindowOperator to use the interface. Right now, WindowOperator has a custom state implementation that uses a two-level Map (by key and by window). In this case, the window would be the namespace.

-- 
This message was sent by Atlassian JIRA
(v6.3.4#6332)
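The key-and-namespace scoping proposed in the issue can be illustrated with a small sketch that replaces the two-level map (key, then window) with one state instance per (key, namespace) pair, using the window start as the namespace. All names here (`ScopedStateBackend`, `TumblingWindowOperator`, `stateFor`, `fire`) are hypothetical, and the sketch assumes tumbling windows over non-negative timestamps; it is not Flink's WindowOperator or state backend. The operator is written once against MergingState, so its window function receives an `Iterable<T>` when backed by a list state and a single pre-reduced `T` when backed by a reducing state.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

/** Hypothetical sketch: partitioned state scoped to (key, namespace); not Flink's API. */
public class KeyedNamespacedStateSketch {

    interface MergingState<IN, OUT> {
        void add(IN value);
        OUT get();
    }

    static final class HeapListState<T> implements MergingState<T, Iterable<T>> {
        private final List<T> elements = new ArrayList<>();
        public void add(T value) { elements.add(value); }
        public Iterable<T> get() { return elements; }
    }

    static final class HeapReducingState<T> implements MergingState<T, T> {
        private final BinaryOperator<T> reduceFunction;
        private T current; // null until the first element arrives
        HeapReducingState(BinaryOperator<T> reduceFunction) { this.reduceFunction = reduceFunction; }
        public void add(T value) { current = (current == null) ? value : reduceFunction.apply(current, value); }
        public T get() { return current; }
    }

    /** One state instance per (key, namespace) pair instead of a two-level map. */
    static final class ScopedStateBackend<K, N, S> {
        private final Map<Map.Entry<K, N>, S> states = new HashMap<>();
        private final Supplier<S> stateFactory;
        private K currentKey;

        ScopedStateBackend(Supplier<S> stateFactory) { this.stateFactory = stateFactory; }

        void setCurrentKey(K key) { currentKey = key; }

        /** State for the current key in the given namespace, created on first access. */
        S stateFor(N namespace) {
            return states.computeIfAbsent(scopeOf(namespace), ignored -> stateFactory.get());
        }

        void clear(N namespace) { states.remove(scopeOf(namespace)); }

        private Map.Entry<K, N> scopeOf(N namespace) {
            return new AbstractMap.SimpleImmutableEntry<>(currentKey, namespace);
        }
    }

    /** Tumbling-window operator written once against MergingState (assumes timestamps >= 0). */
    static final class TumblingWindowOperator<K, IN, OUT, R> {
        private final long windowSize;
        private final ScopedStateBackend<K, Long, MergingState<IN, OUT>> backend;
        private final BiFunction<K, OUT, R> windowFunction;

        TumblingWindowOperator(long windowSize, Supplier<MergingState<IN, OUT>> stateFactory,
                               BiFunction<K, OUT, R> windowFunction) {
            this.windowSize = windowSize;
            this.backend = new ScopedStateBackend<>(stateFactory);
            this.windowFunction = windowFunction;
        }

        void processElement(K key, IN value, long timestamp) {
            long windowStart = timestamp - (timestamp % windowSize); // the window is the namespace
            backend.setCurrentKey(key);
            backend.stateFor(windowStart).add(value);
        }

        /** Applies the window function to one window's contents and drops that window's state. */
        R fire(K key, long windowStart) {
            backend.setCurrentKey(key);
            OUT contents = backend.stateFor(windowStart).get();
            backend.clear(windowStart);
            return windowFunction.apply(key, contents);
        }
    }

    public static void main(String[] args) {
        // List state: the window function sees every element of the window.
        TumblingWindowOperator<String, Integer, Iterable<Integer>, String> listOp =
            new TumblingWindowOperator<String, Integer, Iterable<Integer>, String>(
                10, () -> new HeapListState<Integer>(), (key, elements) -> key + "=" + elements);
        // Reducing state: the window function sees one pre-aggregated value.
        TumblingWindowOperator<String, Integer, Integer, String> sumOp =
            new TumblingWindowOperator<String, Integer, Integer, String>(
                10, () -> new HeapReducingState<Integer>(Integer::sum), (key, sum) -> key + "=" + sum);
        int[][] events = {{1, 3}, {2, 7}, {5, 12}}; // {value, timestamp}
        for (int[] e : events) {
            listOp.processElement("a", e[0], e[1]);
            sumOp.processElement("a", e[0], e[1]);
        }
        System.out.println(listOp.fire("a", 0)); // a=[1, 2]
        System.out.println(sumOp.fire("a", 0));  // a=3
        System.out.println(sumOp.fire("a", 10)); // a=5
    }
}
```

Because the backend owns the (key, namespace) lookup, the operator only sets the current key and names the window; it never manages a nested map of its own, which is the motivation for moving this logic into the state backend.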