GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/2512
Partitionable op state
This pull request introduces rescalable non-partitioned operator state as
described in issue [FLINK-4379] and makes the following major changes:
# 1) Introducing interfaces `PartitionableStateBackend` and `SnapshotProvider`
For rescaling purposes, non-partitioned state is now stored in a
`PartitionableStateBackend`, which provides a method to register operator
states under unique names and store state partitions in a list state:
```
<S> ListState<S> getPartitionableState(
    String name,
    Collection<PartitionableOperatorStateHandle> restoreSnapshots,
    TypeSerializer<S> partitionStateSerializer) throws Exception;
```
Notice that a `TypeSerializer` is provided on a per-state basis so that we
can realize backward compatibility if the serialization format changes.
Furthermore, we introduce `SnapshotProvider` which offers a method to
create snapshots:
```
RunnableFuture<S> snapshot(
    long checkpointId,
    long timestamp,
    CheckpointStreamFactory streamFactory) throws Exception;
```
`DefaultPartitionableStateBackend` implements both
`PartitionableStateBackend` and `SnapshotProvider`. The first interface
is the view that is exposed to user code; the latter interface is only used
by the system to trigger snapshots.
Similar to other state backends (e.g. `KeyedStateBackend`), a
`DefaultPartitionableStateBackend` can be created and restored through the
class `AbstractStateBackend`:
```
public DefaultPartitionableStateBackend createPartitionableStateBackend(
    Environment env,
    String operatorIdentifier)

public DefaultPartitionableStateBackend restorePartitionableStateBackend(
    Environment env,
    String operatorIdentifier,
    Collection<PartitionableOperatorStateHandle> restoreSnapshots)
```
# 2) Interface `PartitionableCheckpointed` as replacement for `Checkpointed`
User code can use rescalable non-partitioned state by implementing the
`PartitionableCheckpointed` interface. This interface is meant to replace the
`Checkpointed` interface, which would become deprecated.
```
void storeOperatorState(
    long checkpointId,
    PartitionableStateBackend backend) throws Exception;

void restoreOperatorState(
    PartitionableStateBackend backend) throws Exception;
```
Through the store/restore methods, user code can interact with
`PartitionableStateBackend` and register/restore states as previously explained.
Methods from this interface are invoked in
`StreamTask::performCheckpoint(...)` and `StreamTask::restoreState()`
respectively.
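To illustrate the store/restore flow, here is a minimal, self-contained sketch. It is not code from this pull request: the nested interfaces are simplified stand-ins (in particular, `getPartitionableState` here takes only a name, without restore handles or a `TypeSerializer`), and `InMemoryBackend` and `OffsetTracker` are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionableCheckpointedSketch {

    // Simplified stand-ins for illustration only, NOT the interfaces of this PR.
    public interface ListState<S> {
        Iterable<S> get();
        void add(S value);
        void clear();
    }

    public interface PartitionableStateBackend {
        // Assumption: reduced signature (no restore handles, no serializer).
        <S> ListState<S> getPartitionableState(String name);
    }

    public interface PartitionableCheckpointed {
        void storeOperatorState(long checkpointId, PartitionableStateBackend backend) throws Exception;
        void restoreOperatorState(PartitionableStateBackend backend) throws Exception;
    }

    /** Hypothetical backend that keeps each named list state in memory. */
    public static class InMemoryBackend implements PartitionableStateBackend {
        private final Map<String, List<Object>> states = new HashMap<>();

        @Override
        @SuppressWarnings("unchecked")
        public <S> ListState<S> getPartitionableState(String name) {
            final List<Object> list = states.computeIfAbsent(name, k -> new ArrayList<>());
            return new ListState<S>() {
                @Override public Iterable<S> get() { return (List<S>) (List<?>) list; }
                @Override public void add(S value) { list.add(value); }
                @Override public void clear() { list.clear(); }
            };
        }
    }

    /** Hypothetical user function: each offset is one independently movable partition. */
    public static class OffsetTracker implements PartitionableCheckpointed {
        public final List<Long> offsets = new ArrayList<>();

        @Override
        public void storeOperatorState(long checkpointId, PartitionableStateBackend backend) {
            ListState<Long> state = backend.getPartitionableState("offsets");
            state.clear();                    // replace what the previous checkpoint stored
            for (Long offset : offsets) {
                state.add(offset);            // one list element per state partition
            }
        }

        @Override
        public void restoreOperatorState(PartitionableStateBackend backend) {
            ListState<Long> state = backend.getPartitionableState("offsets");
            offsets.clear();
            for (Long offset : state.get()) {
                offsets.add(offset);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryBackend backend = new InMemoryBackend();
        OffsetTracker before = new OffsetTracker();
        before.offsets.add(10L);
        before.offsets.add(20L);
        before.storeOperatorState(1L, backend);

        OffsetTracker after = new OffsetTracker();
        after.restoreOperatorState(backend);
        System.out.println(after.offsets);    // prints [10, 20]
    }
}
```

The point of storing state as a list of partitions is that the system can hand a different subset of the list to each subtask on restore, so the user function should not assume it gets back exactly what it stored.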
# 3) Interface `StateRepartitioner`
This interface allows implementing repartitioning strategies that
redistribute non-partitioned state when the parallelism changes. Currently,
there is only one default strategy, implemented in `RoundRobinStatePartitioner`,
but in general it is possible to implement different strategies, e.g. a
`StatePartitioner` that distributes the union of the states of all previous
parallel subtasks to each new parallel subtask. This interface is used in
`CheckpointCoordinator::restoreLatestCheckpointedState()`.
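As a rough illustration of the two strategies mentioned above, the following sketch redistributes plain lists of state partitions. It is not the implementation from this pull request: the class `RepartitionSketch` and its method names are hypothetical, and state handles are modelled as simple list elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RepartitionSketch {

    /**
     * Round-robin: walk over the partitions of all old subtasks in order
     * and deal them out one at a time to the new subtasks.
     */
    public static <T> List<List<T>> roundRobin(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<List<T>> result = emptyLists(newParallelism);
        int next = 0;
        for (List<T> subtaskState : oldSubtaskStates) {
            for (T partition : subtaskState) {
                result.get(next).add(partition);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    /** Union: every new subtask receives all partitions of all old subtasks. */
    public static <T> List<List<T>> union(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<List<T>> result = emptyLists(newParallelism);
        for (List<T> target : result) {
            for (List<T> subtaskState : oldSubtaskStates) {
                target.addAll(subtaskState);
            }
        }
        return result;
    }

    private static <T> List<List<T>> emptyLists(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<List<T>> lists = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<T>());
        }
        return lists;
    }

    public static void main(String[] args) {
        // Two old subtasks holding three and two state partitions.
        List<List<String>> old = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));

        System.out.println(roundRobin(old, 3)); // prints [[a, d], [b, e], [c]]
        System.out.println(union(old, 2));      // prints [[a, b, c, d, e], [a, b, c, d, e]]
    }
}
```

Round-robin keeps the total amount of restored state constant and spreads it evenly, whereas the union strategy multiplies it by the new parallelism and leaves it to each subtask to pick what it needs.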
TODO: After review, a description of the new features must still be added
to the Flink documentation.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink partitionable-op-state
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2512.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2512
----
commit d133b948b8a201178e4ad3afa49e715bebf8eb5f
Author: Stefan Richter <[email protected]>
Date: 2016-08-31T21:59:27Z
State backend infrastructure to support partitionable op state.
Introducing CheckpointStateHandles as container for all checkpoint related
state handles
tmp repartitioning function wip
commit 7213938be4bd706b57128c49cb24b24450ea1c87
Author: Till Rohrmann <[email protected]>
Date: 2016-09-07T16:52:48Z
Add task chain length to TaskState
commit b639c8c62f64f840b95cd6191d7adf514eecddc6
Author: Till Rohrmann <[email protected]>
Date: 2016-09-08T21:11:11Z
Forward partitionable operator state to PartitionableCheckpointed operators
commit 090d27f6daa173b43f1c5d2ee19cc661359b925a
Author: Till Rohrmann <[email protected]>
Date: 2016-09-08T22:05:13Z
Make Kafka sources re-partitionable by using the partitionable non-keyed
state
Allow Kafka sources to use repartitionable state
Remove CheckpointedAsynchronously from FlinkKafkaConsumerBase
commit bc964d7ad6c1608d8bae0db67efdb70c4c278086
Author: Till Rohrmann <[email protected]>
Date: 2016-09-09T13:34:38Z
Make Kafka producer re-partitionable by removing the Checkpointed interface
commit 2366feeff842c3db39625da5a5912a98e83692db
Author: Till Rohrmann <[email protected]>
Date: 2016-09-09T17:02:32Z
Fix HeapKeyedStateBackend.restorePartitionedState
commit e71cd8629ac1f195a53749d3303a7acf3cc4e4dd
Author: Stefan Richter <[email protected]>
Date: 2016-09-15T16:42:55Z
Partitionable State temporary WIP
commit e84998d2702b9461f59df14c5aa1bb8d7438457a
Author: Stefan Richter <[email protected]>
Date: 2016-09-16T12:22:47Z
Introduce partitionable state in savepoint serializer
commit a50629cb8eb876dfcd1af2199e4f4c08bc8562ad
Author: Stefan Richter <[email protected]>
Date: 2016-09-16T12:37:34Z
Using known chain length to replace maps with list of known size
commit 2921b478cbc9cadc2c205d8ee66c9f66c11245f1
Author: Stefan Richter <[email protected]>
Date: 2016-09-16T17:56:02Z
Added test case for rescaling partitionable state
commit 26b48fedc648715e0ed3e5a4642bede91c86ff02
Author: Stefan Richter <[email protected]>
Date: 2016-09-19T10:16:26Z
Code cleanup, minor refinements and documentation
----