GitHub user StefanRRichter opened a pull request:

    Partitionable op state

    This pull request introduces rescalable non-partitioned operator state as 
described in issue [FLINK-4379] and makes the following major changes:
    # 1) Introducing interfaces `PartitionableStateBackend` and 
    For rescaling purposes, non-partitioned state is now stored in a 
`PartitionableStateBackend`, which provides a method to register operator 
states under unique names and store state partitions in a list state:
    <S> ListState<S> getPartitionableState(
        String name,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots,
        TypeSerializer<S> partitionStateSerializer) throws Exception;
    Notice that a `TypeSerializer` is provided on a per-state basis so that we 
can realize backward compatibility if the serialization format changes.
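    As a rough illustration of the registration idea above, here is a minimal, 
self-contained sketch. It does not use the Flink classes from this pull request: 
`SketchSerializer` and `SketchNamedStateBackend` are hypothetical stand-ins, 
partitions are modeled as plain strings, and error handling is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a per-state serializer (decoding half only).
final class SketchSerializer<S> {
    final Function<String, S> decode;

    SketchSerializer(Function<String, S> decode) {
        this.decode = decode;
    }
}

// Hypothetical, simplified backend: each state lives under a unique name and
// consists of a list of partitions that can be redistributed individually.
final class SketchNamedStateBackend {
    private final Map<String, List<?>> statesByName = new HashMap<>();

    @SuppressWarnings("unchecked")
    <S> List<S> getPartitionableState(
            String name,
            List<String> restoredPartitions,
            SketchSerializer<S> serializer) {
        List<S> existing = (List<S>) statesByName.get(name);
        if (existing != null) {
            return existing; // already registered under this name
        }
        List<S> state = new ArrayList<>();
        for (String raw : restoredPartitions) {
            // The serializer supplied for this state decides how old data is read.
            state.add(serializer.decode.apply(raw));
        }
        statesByName.put(name, state);
        return state;
    }
}
```

Because the serializer is passed per state name, a later version of an operator 
could register the same name with a serializer that still understands the old 
format.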
    Furthermore, we introduce `SnapshotProvider` which offers a method to 
create snapshots:
    RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory) throws Exception;
    `DefaultPartitionableStateBackend` implements both 
`PartitionableStateBackend` and `SnapshotProvider`, where the first interface 
gives a view that is exposed to user code and the latter interface is only used 
by the system to trigger snapshots.
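    The split into a user-facing view and a system-facing view can be sketched 
as follows. This is only an illustration under simplifying assumptions: 
`UserStateView`, `SnapshotView` and `SketchTwoViewBackend` are hypothetical 
names, the state is a single list of strings, and the snapshot is an in-memory 
copy rather than a stream written through a `CheckpointStreamFactory`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical user-facing view: only state access is visible.
interface UserStateView {
    List<String> getListState();
}

// Hypothetical system-facing view: only snapshotting is visible.
interface SnapshotView {
    RunnableFuture<List<String>> snapshot(long checkpointId, long timestamp);
}

final class SketchTwoViewBackend implements UserStateView, SnapshotView {
    private final List<String> partitions = new ArrayList<>();

    @Override
    public List<String> getListState() {
        return partitions;
    }

    @Override
    public RunnableFuture<List<String>> snapshot(long checkpointId, long timestamp) {
        // Copy synchronously so that later updates do not leak into this snapshot.
        final List<String> copy = new ArrayList<>(partitions);
        // The caller decides when (and on which thread) the future is run.
        return new FutureTask<List<String>>(() -> copy);
    }
}
```

Handing user code only the `UserStateView` reference keeps it from triggering 
snapshots itself, while the runtime holds the same object as a `SnapshotView`.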
    Similar to other state backends (e.g. `KeyedStateBackend`), a 
`DefaultPartitionableStateBackend` can be created and restored through the 
class `AbstractStateBackend`:
    public DefaultPartitionableStateBackend createPartitionableStateBackend(
        Environment env,
        String operatorIdentifier)
    public DefaultPartitionableStateBackend restorePartitionableStateBackend(
        Environment env,
        String operatorIdentifier,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots)
    # 2) Interface `PartitionableCheckpointed` as replacement for `Checkpointed`
    User code can use rescalable non-partitioned state by implementing the 
`PartitionableCheckpointed` interface. This interface is meant to replace the 
`Checkpointed` interface, which would become deprecated.
    void storeOperatorState(
        long checkpointId,
        PartitionableStateBackend backend) throws Exception;
    void restoreOperatorState(PartitionableStateBackend backend) throws 
    Through the store/restore methods, user code can interact with 
`PartitionableStateBackend` and register/restore states as previously explained.
    Methods from this interface are invoked in 
`StreamTask::performCheckpoint(...)` and `StreamTask::restoreState()` 
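    To make the store/restore interaction more concrete, here is a minimal 
sketch of a user function. All types are hypothetical simplifications and not 
the interfaces from this pull request: `SketchStateBackend` exposes a single 
method, `SketchPartitionableCheckpointed` drops the checked exceptions, and 
`InMemorySketchBackend` keeps everything on the heap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the backend seen by user code.
interface SketchStateBackend {
    List<Long> getListState(String name);
}

// Hypothetical, simplified stand-in for the PartitionableCheckpointed contract.
interface SketchPartitionableCheckpointed {
    void storeOperatorState(long checkpointId, SketchStateBackend backend);

    void restoreOperatorState(SketchStateBackend backend);
}

final class InMemorySketchBackend implements SketchStateBackend {
    private final Map<String, List<Long>> states = new HashMap<>();

    @Override
    public List<Long> getListState(String name) {
        return states.computeIfAbsent(name, k -> new ArrayList<>());
    }
}

// A source that tracks one offset per assigned input partition.
final class OffsetTrackingSource implements SketchPartitionableCheckpointed {
    final List<Long> offsets = new ArrayList<>();

    @Override
    public void storeOperatorState(long checkpointId, SketchStateBackend backend) {
        List<Long> state = backend.getListState("offsets");
        state.clear();
        // One list element per unit that may move to another subtask on rescaling.
        state.addAll(offsets);
    }

    @Override
    public void restoreOperatorState(SketchStateBackend backend) {
        offsets.clear();
        offsets.addAll(backend.getListState("offsets"));
    }
}
```

Writing each offset as its own list element, rather than one opaque blob, is 
what allows the state to be split across a different number of subtasks later.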
    # 3) Interface `StateRepartitioner`
    This interface allows implementing repartitioning strategies that 
redistribute non-partitioned state when the parallelism changes. Currently, 
there is only one default strategy implemented in `RoundRobinStatePartitioner`, 
but in general it is possible to implement different strategies, e.g. a 
`StatePartitioner` that distributes the union of the states of all previous 
parallel subtasks to each new parallel subtask. This interface is used in 
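    The two strategies mentioned above can be sketched in a few lines. This is 
an illustration of the general idea only and not the code of 
`RoundRobinStatePartitioner`; the class `RepartitionSketch` and its method 
names are hypothetical, and state partitions are modeled as plain list elements.

```java
import java.util.ArrayList;
import java.util.List;

final class RepartitionSketch {

    // Deals the partitions of all old subtasks out to the new subtasks in turn.
    static <T> List<List<T>> roundRobin(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<List<T>> result = emptyLists(newParallelism);
        int next = 0;
        for (List<T> subtaskState : oldSubtaskStates) {
            for (T partition : subtaskState) {
                result.get(next).add(partition);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    // Gives every new subtask the union of all partitions of all old subtasks.
    static <T> List<List<T>> union(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : oldSubtaskStates) {
            all.addAll(subtaskState);
        }
        List<List<T>> result = emptyLists(newParallelism);
        for (List<T> target : result) {
            target.addAll(all);
        }
        return result;
    }

    private static <T> List<List<T>> emptyLists(int count) {
        if (count < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<List<T>> lists = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }
}
```

Round-robin keeps every partition on exactly one subtask, whereas the union 
variant replicates state and leaves it to each subtask to pick what it needs.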
    TODO: After review, a description of the new features must still be added 
to the Flink documentation.

You can merge this pull request into a Git repository by running:

    $ git pull partitionable-op-state

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2512

commit d133b948b8a201178e4ad3afa49e715bebf8eb5f
Author: Stefan Richter <>
Date:   2016-08-31T21:59:27Z

    State backend infrastructure to support partitionable op state.
    Introducing CheckpointStateHandles as container for all checkpoint related 
state handles
    tmp repartitioning function wip

commit 7213938be4bd706b57128c49cb24b24450ea1c87
Author: Till Rohrmann <>
Date:   2016-09-07T16:52:48Z

    Add task chain length to TaskState

commit b639c8c62f64f840b95cd6191d7adf514eecddc6
Author: Till Rohrmann <>
Date:   2016-09-08T21:11:11Z

    Forward partitionable operator state to PartitionableCheckpointed operators

commit 090d27f6daa173b43f1c5d2ee19cc661359b925a
Author: Till Rohrmann <>
Date:   2016-09-08T22:05:13Z

    Make Kafka sources re-partitionable by using the partitionable non-keyed 
    Allow Kafka sources to use repartitionable state
    Remove CheckpointedAsynchronously from FlinkKafkaConsumerBase

commit bc964d7ad6c1608d8bae0db67efdb70c4c278086
Author: Till Rohrmann <>
Date:   2016-09-09T13:34:38Z

    Make Kafka producer re-partitionable by removing the Checkpointed interface

commit 2366feeff842c3db39625da5a5912a98e83692db
Author: Till Rohrmann <>
Date:   2016-09-09T17:02:32Z

    Fix HeapKeyedStateBackend.restorePartitionedState

commit e71cd8629ac1f195a53749d3303a7acf3cc4e4dd
Author: Stefan Richter <>
Date:   2016-09-15T16:42:55Z

    Partitionable State temporary WIP

commit e84998d2702b9461f59df14c5aa1bb8d7438457a
Author: Stefan Richter <>
Date:   2016-09-16T12:22:47Z

    Introduce partitionable state in savepoint serializer

commit a50629cb8eb876dfcd1af2199e4f4c08bc8562ad
Author: Stefan Richter <>
Date:   2016-09-16T12:37:34Z

    Using known chain length to replace maps with list of known size

commit 2921b478cbc9cadc2c205d8ee66c9f66c11245f1
Author: Stefan Richter <>
Date:   2016-09-16T17:56:02Z

    Added test case for rescaling partitionable state

commit 26b48fedc648715e0ed3e5a4642bede91c86ff02
Author: Stefan Richter <>
Date:   2016-09-19T10:16:26Z

    Code cleanup, minor refinements and documentation


