[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582079#comment-15582079
 ] 

ASF GitHub Bot commented on FLINK-4844:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2648

    [FLINK-4844] Partitionable Raw Keyed/Operator State

    Partitionable operator and keyed state are currently only available by
    using backends. However, the serialization code for many operators is built
    around reading/writing their state to a stream for checkpointing. We want to
    provide partitionable states also through streams, so that migrating existing
    operators becomes easier.
    
    This PR includes the following main changes:
    
    # 1) `KeyedStateCheckpointOutputStream` and `OperatorStateCheckpointedOutputStream`
    These classes allow writing partitionable keyed (and operator) state to a
    stream for checkpointing. They extend the basic stream interface with methods
    that signal the start of new partitions.
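
To illustrate what "signaling the start of a new partition" means, here is a minimal sketch. It is not the code from this PR: `PartitionedOutputStream`, `startNewPartition()` and `PartitionedStreamExample` are hypothetical names, and the stream is backed by an in-memory buffer rather than a checkpoint stream. The idea it shows is only that the stream remembers the byte offset at which each partition begins, so that the written bytes can later be split per partition.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for a partition-aware checkpoint stream.
 * It is NOT the class from the PR; it only records partition offsets.
 */
class PartitionedOutputStream extends OutputStream {
    private final ByteArrayOutputStream delegate = new ByteArrayOutputStream();
    private final List<Integer> partitionOffsets = new ArrayList<>();

    /** Marks the current write position as the start of a new partition. */
    void startNewPartition() {
        partitionOffsets.add(delegate.size());
    }

    @Override
    public void write(int b) {
        delegate.write(b);
    }

    /** Returns a copy of the recorded start offsets, one per partition. */
    List<Integer> getPartitionOffsets() {
        return new ArrayList<>(partitionOffsets);
    }

    byte[] toByteArray() {
        return delegate.toByteArray();
    }
}

class PartitionedStreamExample {
    /** Writes one 4-byte counter per partition and returns the stream. */
    static PartitionedOutputStream writeCounters(int[] counters) {
        PartitionedOutputStream out = new PartitionedOutputStream();
        DataOutputStream data = new DataOutputStream(out);
        try {
            for (int counter : counters) {
                out.startNewPartition(); // a new partition begins here
                data.writeInt(counter);
            }
            data.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```

With three counters, the recorded offsets are 0, 4 and 8, because each `writeInt` call emits four bytes.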
    
    # 2) Changes to `StreamTask` and `AbstractStreamOperator`
    
    The lifecycle of `StreamTask` is slightly modified for the initialization of
    operator states. In `AbstractStreamOperator`, two new hooks have been added
    that new operators can override:
    
    ```
        /**
         * Stream operators with state, which want to participate in a snapshot
         * need to override this hook method.
         *
         * @param context context that provides information and means required
         *                for taking a snapshot
         */
        public void snapshotState(StateSnapshotContext context) throws Exception {

        }

        /**
         * Stream operators with state which can be restored need to override
         * this hook method.
         *
         * @param context context that allows to register different states.
         */
        public void initializeState(StateInitializationContext context) throws Exception {

        }
    ```
    
    Access for snapshotting and restoring partitionable state is provided
    through the respective context.
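
As a rough illustration of how an operator might use the two hooks, the sketch below writes a counter to a stream during the snapshot and reads it back during initialization. It does not compile against Flink: `SnapshotContextSketch`, `InitializationContextSketch` and their methods are simplified, hypothetical stand-ins for `StateSnapshotContext` and `StateInitializationContext`, whose real methods are not listed in this description.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical, heavily simplified stand-ins for the contexts named in the PR.
interface SnapshotContextSketch {
    OutputStream rawOperatorStateOutput();
}

interface InitializationContextSketch {
    boolean isRestored();

    InputStream rawOperatorStateInput();
}

/** Operator-like class that persists a running count through the two hooks. */
class CountingOperatorSketch {
    private long count;

    void processElement(String element) {
        count++;
    }

    long getCount() {
        return count;
    }

    // Same shape as the snapshotState hook: write the state to the stream.
    void snapshotState(SnapshotContextSketch context) throws Exception {
        DataOutputStream out = new DataOutputStream(context.rawOperatorStateOutput());
        out.writeLong(count);
        out.flush();
    }

    // Same shape as the initializeState hook: read the state back if there is any.
    void initializeState(InitializationContextSketch context) throws Exception {
        if (context.isRestored()) {
            count = new DataInputStream(context.rawOperatorStateInput()).readLong();
        }
    }
}

class OperatorHooksExample {
    /** Snapshots one operator and restores a fresh instance from the written bytes. */
    static long snapshotAndRestore(int elements) {
        try {
            CountingOperatorSketch original = new CountingOperatorSketch();
            for (int i = 0; i < elements; i++) {
                original.processElement("e" + i);
            }
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.snapshotState(() -> bytes);

            CountingOperatorSketch restored = new CountingOperatorSketch();
            restored.initializeState(new InitializationContextSketch() {
                public boolean isRestored() {
                    return true;
                }

                public InputStream rawOperatorStateInput() {
                    return new ByteArrayInputStream(bytes.toByteArray());
                }
            });
            return restored.getCount();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The point of the sketch is the symmetry: whatever `snapshotState` writes, `initializeState` must be able to read in the same order.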
    
    # 3) Exposing partitionable states to UDFs
    
    The interface `CheckpointedFunction` must be implemented by stateful UDFs:
    
    ```
        /**
         * This method is called when a snapshot for a checkpoint is requested.
         * This acts as a hook to the function to ensure that all state is exposed
         * by means previously offered through {@link FunctionInitializationContext}
         * when the Function was initialized, or offered now by
         * {@link FunctionSnapshotContext} itself.
         *
         * @param context the context for drawing a snapshot of the operator
         * @throws Exception
         */
        void snapshotState(FunctionSnapshotContext context) throws Exception;

        /**
         * This method is called when an operator is initialized, so that the
         * function can set up its state through the provided context.
         * Initialization typically includes registering user states through the
         * state stores that the context offers.
         *
         * @param context the context for initializing the operator
         * @throws Exception
         */
        void initializeState(FunctionInitializationContext context) throws Exception;
    ```
    
    The contexts for initialization and snapshot expose only the subset of the
    internal `AbstractStreamOperator` context functionality that is safe to
    present to user code.
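
The following sketch shows the intended usage pattern for a stateful UDF: register state during initialization, then make sure that state is up to date when a snapshot is requested. It is an assumption-laden illustration, not Flink code. `StateStoreSketch`, `CheckpointedFunctionSketch` and `BufferingSinkSketch` are hypothetical, and the hook parameters are reduced to a plain store and a checkpoint id instead of `FunctionInitializationContext` and `FunctionSnapshotContext`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a state store offered by the initialization context.
class StateStoreSketch {
    private final Map<String, List<String>> states = new HashMap<>();

    /** Returns the list registered under the name, creating it on first use. */
    List<String> getListState(String name) {
        return states.computeIfAbsent(name, k -> new ArrayList<>());
    }
}

// Hypothetical, simplified counterpart of the two hooks quoted above.
interface CheckpointedFunctionSketch {
    void snapshotState(long checkpointId) throws Exception;

    void initializeState(StateStoreSketch store) throws Exception;
}

/** A sink-like function that buffers elements and checkpoints the buffer. */
class BufferingSinkSketch implements CheckpointedFunctionSketch {
    private final List<String> buffer = new ArrayList<>();
    private List<String> checkpointedBuffer;

    void invoke(String value) {
        buffer.add(value);
    }

    @Override
    public void initializeState(StateStoreSketch store) {
        // Register the state; on restore it already holds the old elements.
        checkpointedBuffer = store.getListState("buffered-elements");
        buffer.addAll(checkpointedBuffer);
    }

    @Override
    public void snapshotState(long checkpointId) {
        // Expose the current buffer through the registered state.
        checkpointedBuffer.clear();
        checkpointedBuffer.addAll(buffer);
    }

    List<String> getBuffer() {
        return new ArrayList<>(buffer);
    }
}

class CheckpointedFunctionExample {
    /** Buffers three elements, snapshots after two, and restores a new instance. */
    static List<String> restoreAfterSnapshot() {
        StateStoreSketch store = new StateStoreSketch();
        BufferingSinkSketch first = new BufferingSinkSketch();
        first.initializeState(store);
        first.invoke("a");
        first.invoke("b");
        first.snapshotState(1L);
        first.invoke("c"); // arrives after the snapshot, so it is not restored

        BufferingSinkSketch second = new BufferingSinkSketch();
        second.initializeState(store);
        return second.getBuffer();
    }
}
```

Only the elements that were buffered before the snapshot reappear in the restored instance.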
    
    # 4) This PR also introduces several classes that bundle state handles
    One example is `TaskStateHandles`. The purpose is a) to reduce the number of
    parameters passed through several methods and b) to make adding or removing
    state handles simpler.
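
The bundling idea can be sketched as a plain parameter object. The class below is hypothetical: its name, its fields and the use of `String` as a handle type are illustrative choices and do not reflect the actual contents of `TaskStateHandles`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical bundle; field names are illustrative, not those of TaskStateHandles.
final class StateHandlesBundleSketch {
    private final List<String> keyedStateHandles;
    private final List<String> operatorStateHandles;

    StateHandlesBundleSketch(List<String> keyedStateHandles, List<String> operatorStateHandles) {
        this.keyedStateHandles = Collections.unmodifiableList(new ArrayList<>(keyedStateHandles));
        this.operatorStateHandles = Collections.unmodifiableList(new ArrayList<>(operatorStateHandles));
    }

    List<String> getKeyedStateHandles() {
        return keyedStateHandles;
    }

    List<String> getOperatorStateHandles() {
        return operatorStateHandles;
    }
}

class TaskSketch {
    // One parameter instead of one per handle type. Adding a new kind of
    // handle later changes the bundle, not every method signature on the way.
    static int countHandles(StateHandlesBundleSketch handles) {
        return handles.getKeyedStateHandles().size() + handles.getOperatorStateHandles().size();
    }

    static int example() {
        StateHandlesBundleSketch handles = new StateHandlesBundleSketch(
                Arrays.asList("keyed-0", "keyed-1"),
                Arrays.asList("operator-0"));
        return countHandles(handles);
    }
}
```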


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink stream-keyed-state-checkpointing

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2648
    
----
commit 2f72bdfe9e39f76a766c5fad68500caa39e9a624
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-10-04T08:59:38Z

    StreamCheckpointed operator state WIP

commit f3a1ee0963ac8521cb9581dcb1ab6547c50c304f
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-10-14T10:15:07Z

    Added to One/TwoInputStreamOperatorTestHarness

commit b3d7b683b6fd3e6a7fb2158e67fe94ac6e28e730
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-10-17T09:38:50Z

    Small optimizations for GC friendliness.

commit 614592d475486c1a2eff8e9fc24a423ff18a78bf
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-10-13T09:32:19Z

    Added AbstractUdfStreamOperatorLifecycleTest

commit 886ca3e2d4362db2f5125b0c1ea1f2078042c456
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-10-17T11:51:40Z

    Rename KeyedStateCheckpointOutputStream

----


> Partitionable Raw Keyed/Operator State
> --------------------------------------
>
>                 Key: FLINK-4844
>                 URL: https://issues.apache.org/jira/browse/FLINK-4844
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> backends. However, the serialization code for many operators is built around 
> reading/writing their state to a stream for checkpointing. We want to provide 
> partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
