[ https://issues.apache.org/jira/browse/FLINK-4379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503567#comment-15503567 ]
ASF GitHub Bot commented on FLINK-4379:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2512

Partitionable op state

This pull request introduces rescalable non-partitioned operator state, as described in issue [FLINK-4379], and makes the following major changes:

# 1) Introducing interfaces `PartitionableStateBackend` and `SnapshotProvider`

For rescaling purposes, non-partitioned state is now stored in a `PartitionableStateBackend`, which provides a method to register operator states under unique names and store state partitions in a list state:

```
<S> ListState<S> getPartitionableState(
        String name,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots,
        TypeSerializer<S> partitionStateSerializer) throws Exception;
```

Notice that a `TypeSerializer` is provided on a per-state basis, so that we can preserve backward compatibility if the serialization format changes.

Furthermore, we introduce `SnapshotProvider`, which offers a method to create snapshots:

```
RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory) throws Exception;
```

`DefaultPartitionableStateBackend` implements both `PartitionableStateBackend` and `SnapshotProvider`: the former interface is the view exposed to user code, while the latter is only used by the system to trigger snapshots.

Similar to other state backends (e.g. `KeyedStateBackend`), a `DefaultPartitionableStateBackend` can be created and restored through the class `AbstractStateBackend`:

```
public DefaultPartitionableStateBackend createPartitionableStateBackend(
        Environment env,
        String operatorIdentifier)

public DefaultPartitionableStateBackend restorePartitionableStateBackend(
        Environment env,
        String operatorIdentifier,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots)
```

# 2) Interface `PartitionableCheckpointed` as replacement for `Checkpointed`

User code can use rescalable non-partitioned state by implementing the `PartitionableCheckpointed` interface. This interface is meant to replace the `Checkpointed` interface, which would then become deprecated.

```
void storeOperatorState(long checkpointId, PartitionableStateBackend backend) throws Exception;

void restoreOperatorState(PartitionableStateBackend backend) throws Exception;
```

Through the store/restore methods, user code can interact with `PartitionableStateBackend` and register/restore states as explained above. These methods are invoked in `StreamTask::performCheckpoint(...)` and `StreamTask::restoreState()`, respectively.

# 3) Interface `StateRepartitioner`

This interface allows implementing repartitioning strategies that redistribute non-partitioned state when the parallelism changes. Currently, only one default strategy is implemented, `RoundRobinStatePartitioner`, but in general it is possible to implement other strategies, e.g. a `StatePartitioner` that gives each new parallel subtask the union of the states of all previous parallel subtasks. This interface is used in `CheckpointCoordinator::restoreLatestCheckpointedState()`.

TODO: After review, a description of the new features must still be added to the Flink documentation.
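To make the two repartitioning strategies mentioned in section 3 concrete, here is a minimal, self-contained Java sketch. It assumes that each previous parallel subtask contributed a list of opaque state partitions (for example, the elements of its list state) and that repartitioning only decides which new subtask restores which partition. The class `RepartitionSketch` and its methods `roundRobin` and `union` are hypothetical illustrations; they are not the `StateRepartitioner` or `RoundRobinStatePartitioner` API from this pull request, whose signatures are not shown in the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Flink's StateRepartitioner API): each element of the
// outer input list holds the state partitions written by one previous parallel
// subtask; each element of the outer output list is what one new subtask restores.
final class RepartitionSketch {

    private RepartitionSketch() {}

    // Round-robin: walk all partitions in order of the old subtask index and
    // deal them out one at a time to new subtasks 0, 1, ..., n-1, 0, 1, ...
    static <S> List<List<S>> roundRobin(List<List<S>> oldSubtaskStates, int newParallelism) {
        List<List<S>> result = emptyAssignment(newParallelism);
        int next = 0;
        for (List<S> subtaskState : oldSubtaskStates) {
            for (S partition : subtaskState) {
                result.get(next).add(partition);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    // Union: every new subtask receives a copy of all previous partitions.
    static <S> List<List<S>> union(List<List<S>> oldSubtaskStates, int newParallelism) {
        List<S> all = new ArrayList<>();
        for (List<S> subtaskState : oldSubtaskStates) {
            all.addAll(subtaskState);
        }
        List<List<S>> result = emptyAssignment(newParallelism);
        for (List<S> assigned : result) {
            assigned.addAll(all);
        }
        return result;
    }

    // Creates one empty, mutable partition list per new subtask.
    private static <S> List<List<S>> emptyAssignment(int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<S>> result = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        return result;
    }

    public static void main(String[] args) {
        // Two previous subtasks; the strings stand in for serialized state
        // partitions such as per-Kafka-partition offsets (an assumption).
        List<List<String>> previous = List.of(List.of("p0", "p1", "p2"), List.of("p3"));
        System.out.println(roundRobin(previous, 3)); // [[p0, p3], [p1], [p2]]
        System.out.println(union(previous, 2));      // [[p0, p1, p2, p3], [p0, p1, p2, p3]]
    }
}
```

With three new subtasks, round-robin deals the four partitions `p0` to `p3` out as `[[p0, p3], [p1], [p2]]`, keeping the total state size unchanged and balancing the partition count per subtask. The union strategy instead copies every partition into each new subtask, which suits cases where each subtask should see the complete previous state and select its own share.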
You can merge this pull request into a Git repository by running:

```
$ git pull https://github.com/StefanRRichter/flink partitionable-op-state
```

Alternatively, you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2512.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

`This closes #2512`

----

commit d133b948b8a201178e4ad3afa49e715bebf8eb5f
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-31T21:59:27Z

State backend infrastructure to support partitionable op state.
Introducing CheckpointStateHandles as container for all checkpoint related state handles
tmp repartitioning function wip

commit 7213938be4bd706b57128c49cb24b24450ea1c87
Author: Till Rohrmann <trohrm...@apache.org>
Date: 2016-09-07T16:52:48Z

Add task chain length to TaskState

commit b639c8c62f64f840b95cd6191d7adf514eecddc6
Author: Till Rohrmann <trohrm...@apache.org>
Date: 2016-09-08T21:11:11Z

Forward partitionable operator state to PartitionableCheckpointed operators

commit 090d27f6daa173b43f1c5d2ee19cc661359b925a
Author: Till Rohrmann <trohrm...@apache.org>
Date: 2016-09-08T22:05:13Z

Make Kafka sources re-partitionable by using the partitionable non-keyed state
Allow Kafka sources to use repartitionable state
Remove CheckpointedAsynchronously from FlinkKafkaConsumerBase

commit bc964d7ad6c1608d8bae0db67efdb70c4c278086
Author: Till Rohrmann <trohrm...@apache.org>
Date: 2016-09-09T13:34:38Z

Make Kafka producer re-partitionable by removing the Checkpointed interface

commit 2366feeff842c3db39625da5a5912a98e83692db
Author: Till Rohrmann <trohrm...@apache.org>
Date: 2016-09-09T17:02:32Z

Fix HeapKeyedStateBackend.restorePartitionedState

commit e71cd8629ac1f195a53749d3303a7acf3cc4e4dd
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-09-15T16:42:55Z

Partitionable State temporary WIP

commit e84998d2702b9461f59df14c5aa1bb8d7438457a
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-09-16T12:22:47Z

Introduce partitionable state in savepoint serializer

commit a50629cb8eb876dfcd1af2199e4f4c08bc8562ad
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-09-16T12:37:34Z

Using known chain length to replace maps with list of known size

commit 2921b478cbc9cadc2c205d8ee66c9f66c11245f1
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-09-16T17:56:02Z

Added test case for rescaling partitionable state

commit 26b48fedc648715e0ed3e5a4642bede91c86ff02
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-09-19T10:16:26Z

Code cleanup, minor refinements and documentation

----

> Add Rescalable Non-Partitioned State
> ------------------------------------
>
>                 Key: FLINK-4379
>                 URL: https://issues.apache.org/jira/browse/FLINK-4379
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Stefan Richter
>
> This issue is associated with [FLIP-8|https://cwiki.apache.org/confluence/display/FLINK/FLIP-8%3A+Rescalable+Non-Partitioned+State].

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)