[ 
https://issues.apache.org/jira/browse/FLINK-4379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503567#comment-15503567
 ] 

ASF GitHub Bot commented on FLINK-4379:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2512

    Partitionable op state

    This pull request introduces rescalable non-partitioned operator state as 
described in issue [FLINK-4379] and makes the following major changes:
    
    # 1) Introducing interfaces `PartitionableStateBackend` and 
`SnapshotProvider`
    
    For rescaling purposes, non-partitioned state is now stored in a 
`PartitionableStateBackend`, which provides a method to register operator 
states under unique names and store state partitions in a list state:
    
    ```
    <S> ListState<S> getPartitionableState(
        String name,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots,
        TypeSerializer<S> partitionStateSerializer) throws Exception;
    ```
    
    Notice that a `TypeSerializer` is provided on a per-state basis so that we 
can realize backward compatibility if the serialization format changes.
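    
    As an illustration only, here is a minimal, self-contained sketch in plain 
    Java. It is not the code from this pull request: `NamedListStates` and 
    `PartitionSerializer` are hypothetical stand-ins for 
    `PartitionableStateBackend` and `TypeSerializer`, and the restore-handle 
    parameter is left out. It shows states registered under unique names, with 
    each list element written separately by the serializer supplied for that 
    state:
    
    ```java
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical stand-in for Flink's TypeSerializer (write side only). */
    interface PartitionSerializer<S> {
        void write(S value, DataOutputStream out) throws IOException;
    }

    /** Hypothetical stand-in for a partitionable state backend. */
    final class NamedListStates {

        /** One registered state: its partitions plus the serializer chosen for it. */
        private static final class RegisteredState<S> {
            final List<S> partitions = new ArrayList<>();
            final PartitionSerializer<S> serializer;

            RegisteredState(PartitionSerializer<S> serializer) {
                this.serializer = serializer;
            }

            /** Serializes each partition on its own so it can be moved independently. */
            List<byte[]> serializePartitions() {
                List<byte[]> result = new ArrayList<>();
                for (S partition : partitions) {
                    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                    try (DataOutputStream out = new DataOutputStream(bytes)) {
                        serializer.write(partition, out);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                    result.add(bytes.toByteArray());
                }
                return result;
            }
        }

        private final Map<String, RegisteredState<?>> states = new LinkedHashMap<>();

        /** Returns the list registered under the name, creating it on first access. */
        @SuppressWarnings("unchecked")
        <S> List<S> getPartitionableState(String name, PartitionSerializer<S> serializer) {
            RegisteredState<S> state = (RegisteredState<S>) states.get(name);
            if (state == null) {
                state = new RegisteredState<>(serializer);
                states.put(name, state);
            }
            return state.partitions;
        }

        /** Snapshot: state name -> individually serialized partitions. */
        Map<String, List<byte[]>> snapshot() {
            Map<String, List<byte[]>> result = new LinkedHashMap<>();
            for (Map.Entry<String, RegisteredState<?>> e : states.entrySet()) {
                result.put(e.getKey(), e.getValue().serializePartitions());
            }
            return result;
        }
    }
    ```
    
    Because every partition ends up as its own byte array, a later restore 
    could hand individual partitions to different subtasks, and a state whose 
    format changes only needs a new serializer under its own name.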
    
    Furthermore, we introduce `SnapshotProvider` which offers a method to 
create snapshots:
    
    ```
    RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory) throws Exception;
    ```
    
    `DefaultPartitionableStateBackend` implements both 
`PartitionableStateBackend` and `SnapshotProvider`. The first interface is the 
view exposed to user code; the latter is used only by the system to trigger 
snapshots.
    
    Similar to other state backends (e.g. `KeyedStateBackend`), a 
`DefaultPartitionableStateBackend` can be created and restored through the 
class `AbstractStateBackend`:
    
    ```
    public DefaultPartitionableStateBackend createPartitionableStateBackend(
        Environment env,
        String operatorIdentifier)
    
    public DefaultPartitionableStateBackend restorePartitionableStateBackend(
        Environment env,
        String operatorIdentifier,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots)
    ```
    
    # 2) Interface `PartitionableCheckpointed` as replacement for `Checkpointed`
    
    User code can use rescalable non-partitioned state by implementing the 
`PartitionableCheckpointed` interface. This interface is meant to replace the 
`Checkpointed` interface, which would become deprecated.
    
    ```
    void storeOperatorState(
        long checkpointId,
        PartitionableStateBackend backend) throws Exception;
    
    void restoreOperatorState(
        PartitionableStateBackend backend) throws Exception;
    ```
    
    Through the store/restore methods, user code can interact with 
`PartitionableStateBackend` and register/restore states as previously explained.
    
    Methods from this interface are invoked in 
`StreamTask::performCheckpoint(...)` and `StreamTask::restoreState()` 
respectively.
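    
    A rough sketch of what a user function could look like follows. The types 
    `SketchStateBackend`, `SketchPartitionableCheckpointed` and 
    `InMemoryBackend` are hypothetical, simplified stand-ins so that the 
    example compiles without Flink; in particular, the real 
    `getPartitionableState(...)` takes additional arguments that are omitted 
    here.
    
    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical, simplified stand-in for PartitionableStateBackend. */
    interface SketchStateBackend {
        <S> List<S> getListState(String name);
    }

    /** Mirrors the two methods quoted above, using the stand-in backend type. */
    interface SketchPartitionableCheckpointed {
        void storeOperatorState(long checkpointId, SketchStateBackend backend) throws Exception;

        void restoreOperatorState(SketchStateBackend backend) throws Exception;
    }

    /** In-memory backend used only to exercise the sketch. */
    final class InMemoryBackend implements SketchStateBackend {
        private final Map<String, List<?>> states = new HashMap<>();

        @Override
        @SuppressWarnings("unchecked")
        public <S> List<S> getListState(String name) {
            return (List<S>) states.computeIfAbsent(name, k -> new ArrayList<Object>());
        }
    }

    /** Example user function: keeps every buffered element as its own partition. */
    final class BufferingFunction implements SketchPartitionableCheckpointed {
        private static final String STATE_NAME = "buffered-elements";

        final List<String> buffer = new ArrayList<>();

        @Override
        public void storeOperatorState(long checkpointId, SketchStateBackend backend) {
            List<String> state = backend.getListState(STATE_NAME);
            state.clear();
            // One list entry per element, so each can be redistributed on its own.
            state.addAll(buffer);
        }

        @Override
        public void restoreOperatorState(SketchStateBackend backend) {
            List<String> state = backend.getListState(STATE_NAME);
            buffer.clear();
            buffer.addAll(state);
        }
    }
    ```
    
    The function never decides how its entries are split across subtasks; it 
    only exposes them as a list and accepts whatever list it is handed on 
    restore.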
    
    # 3) Interface `StateRepartitioner`
    
    This interface allows implementing repartitioning strategies that 
redistribute non-partitioned state when the parallelism changes. Currently, 
there is only one default strategy, implemented in `RoundRobinStatePartitioner`, 
but other strategies are possible, e.g. a `StateRepartitioner` that hands the 
union of the states of all previous parallel subtasks to each new parallel 
subtask. This interface is used in 
`CheckpointCoordinator::restoreLatestCheckpointedState()`.
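    
    The two strategies mentioned above can be sketched in plain Java as 
    follows. This is an assumption-laden illustration, not the code of 
    `RoundRobinStatePartitioner`: the class `RepartitionSketch` is 
    hypothetical, and state partitions are modelled as plain list elements 
    rather than state handles.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical sketch; partitions are modelled as plain list elements. */
    final class RepartitionSketch {

        private RepartitionSketch() {}

        /** Deals the partitions of all old subtasks out to the new subtasks in turn. */
        static <T> List<List<T>> roundRobin(List<List<T>> oldSubtaskStates, int newParallelism) {
            List<List<T>> result = emptyLists(newParallelism);
            int next = 0;
            for (List<T> subtaskState : oldSubtaskStates) {
                for (T partition : subtaskState) {
                    result.get(next).add(partition);
                    next = (next + 1) % newParallelism;
                }
            }
            return result;
        }

        /** Gives every new subtask the union of all old partitions. */
        static <T> List<List<T>> union(List<List<T>> oldSubtaskStates, int newParallelism) {
            List<T> all = new ArrayList<>();
            for (List<T> subtaskState : oldSubtaskStates) {
                all.addAll(subtaskState);
            }
            List<List<T>> result = emptyLists(newParallelism);
            for (List<T> target : result) {
                target.addAll(all);
            }
            return result;
        }

        private static <T> List<List<T>> emptyLists(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("parallelism must be positive");
            }
            List<List<T>> lists = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                lists.add(new ArrayList<>());
            }
            return lists;
        }
    }
    ```
    
    For example, with old subtask states `[a, b]`, `[c]` and `[d, e]` and a 
    new parallelism of 2, `roundRobin` yields `[a, c, e]` and `[b, d]`, 
    whereas `union` gives both new subtasks `[a, b, c, d, e]`.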
    
    TODO: After review, a description of the new features must still be added 
to the Flink documentation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink partitionable-op-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2512
    
----
commit d133b948b8a201178e4ad3afa49e715bebf8eb5f
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-31T21:59:27Z

    State backend infrastructure to support partitionable op state.
    
    Introducing CheckpointStateHandles as container for all checkpoint related 
state handles
    
    tmp repartitioning function wip

commit 7213938be4bd706b57128c49cb24b24450ea1c87
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-07T16:52:48Z

    Add task chain length to TaskState

commit b639c8c62f64f840b95cd6191d7adf514eecddc6
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-08T21:11:11Z

    Forward partitionable operator state to PartitionableCheckpointed operators

commit 090d27f6daa173b43f1c5d2ee19cc661359b925a
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-08T22:05:13Z

    Make Kafka sources re-partitionable by using the partitionable non-keyed 
state
    
    Allow Kafka sources to use repartitionable state
    
    Remove CheckpointedAsynchronously from FlinkKafkaConsumerBase

commit bc964d7ad6c1608d8bae0db67efdb70c4c278086
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-09T13:34:38Z

    Make Kafka producer re-partitionable by removing the Checkpointed interface

commit 2366feeff842c3db39625da5a5912a98e83692db
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-09T17:02:32Z

    Fix HeapKeyedStateBackend.restorePartitionedState

commit e71cd8629ac1f195a53749d3303a7acf3cc4e4dd
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-15T16:42:55Z

    Partitionable State temporary WIP

commit e84998d2702b9461f59df14c5aa1bb8d7438457a
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-16T12:22:47Z

    Introduce partitionable state in savepoint serializer

commit a50629cb8eb876dfcd1af2199e4f4c08bc8562ad
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-16T12:37:34Z

    Using known chain length to replace maps with list of known size

commit 2921b478cbc9cadc2c205d8ee66c9f66c11245f1
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-16T17:56:02Z

    Added test case for rescaling partitionable state

commit 26b48fedc648715e0ed3e5a4642bede91c86ff02
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-19T10:16:26Z

    Code cleanup, minor refinements and documentation

----


> Add Rescalable Non-Partitioned State
> ------------------------------------
>
>                 Key: FLINK-4379
>                 URL: https://issues.apache.org/jira/browse/FLINK-4379
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Stefan Richter
>
> This issue is associated with [FLIP-8| 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-8%3A+Rescalable+Non-Partitioned+State].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
