GitHub user StefanRRichter opened a pull request:

    [FLINK-4844] Partitionable Raw Keyed/Operator State

    Partitionable operator and keyed state are currently only available through state backends. However, the serialization code of many operators is built around reading and writing their state to a stream for checkpointing. We want to provide partitionable state through streams as well, so that migrating existing operators becomes easier.
    This PR includes the following main changes:
    # 1) `KeyedStateCheckpointOutputStream` and 
    These classes allow writing partitionable keyed (and operator) state to a stream for checkpointing. They extend the basic stream interface with methods that signal the start of a new partition.
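    A minimal sketch of the idea behind such streams, assuming nothing about Flink's actual classes: the hypothetical `PartitionedOutputStreamSketch` records the byte offset at which each partition starts, so that a restore could later locate individual partitions (for example, one partition per key group). The class and its `startNewPartition()` / `partitionBytes()` methods are illustrative names and may differ from the API added by this PR.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch (not a Flink class): a checkpoint stream that records where each
 * partition starts, so that partitions can later be located and read individually.
 */
public class PartitionedOutputStreamSketch extends ByteArrayOutputStream {

    // Byte offset at which each partition begins, in the order the partitions were started.
    private final List<Integer> partitionOffsets = new ArrayList<>();

    /** Signals that all bytes written from now on belong to a new partition. */
    public void startNewPartition() {
        partitionOffsets.add(size());
    }

    /** Returns a copy of the recorded start offsets. */
    public List<Integer> getPartitionOffsets() {
        return new ArrayList<>(partitionOffsets);
    }

    /** Returns the bytes of one partition, bounded by its start and the next partition's start. */
    public byte[] partitionBytes(int index) {
        int start = partitionOffsets.get(index);
        int end = index + 1 < partitionOffsets.size() ? partitionOffsets.get(index + 1) : size();
        return Arrays.copyOfRange(toByteArray(), start, end);
    }

    public static void main(String[] args) {
        PartitionedOutputStreamSketch out = new PartitionedOutputStreamSketch();
        byte[] first = "key-group-0".getBytes(StandardCharsets.UTF_8);
        byte[] second = "key-group-1".getBytes(StandardCharsets.UTF_8);

        out.startNewPartition();
        out.write(first, 0, first.length);   // this overload does not throw IOException
        out.startNewPartition();
        out.write(second, 0, second.length);

        System.out.println(out.getPartitionOffsets());  // [0, 11]
        System.out.println(new String(out.partitionBytes(1), StandardCharsets.UTF_8));  // key-group-1
    }
}
```

    Recording only offsets keeps the write path a plain byte stream; the partition boundaries are metadata that a restore can use to hand different partitions to different parallel instances.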
    # 2) Changes to `StreamTask` and `AbstractStreamOperator`
    The lifecycle of `StreamTask` is slightly modified for the initialization of operator state. In `AbstractStreamOperator`, two new hooks have been added that new operators can override:
        /**
         * Stream operators with state, which want to participate in a snapshot need to override this hook method.
         *
         * @param context context that provides information and means required for taking a snapshot
         */
        public void snapshotState(StateSnapshotContext context) throws Exception {
            // ...
        }

        /**
         * Stream operators with state which can be restored need to override this hook method.
         *
         * @param context context that allows to register different states.
         */
        public void initializeState(StateInitializationContext context) throws Exception {
            // ...
        }
    Access to partitionable state for snapshotting and restoring is provided through the respective context.
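    To illustrate how the two hooks fit into an operator's lifecycle, here is a heavily simplified, self-contained model. `InitContext`, `SnapshotContext`, `BaseOperator`, and `runTask` are hypothetical stand-ins rather than Flink types, and the order in `runTask` (initialize state, open, process, snapshot) is an assumption about the modified `StreamTask` lifecycle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, heavily simplified model of the two operator hooks. None of these
 * types are Flink classes; the real hooks also declare "throws Exception".
 */
public class LifecycleSketch {

    /** Stand-in initialization context: carries whatever state was restored (empty on a fresh start). */
    static final class InitContext {
        final List<String> restored;
        InitContext(List<String> restored) { this.restored = restored; }
    }

    /** Stand-in snapshot context: collects what the operator writes during a checkpoint. */
    static final class SnapshotContext {
        final List<String> written = new ArrayList<>();
    }

    /** Base class with no-op hooks: only stateful operators need to override them. */
    abstract static class BaseOperator {
        final List<String> events = new ArrayList<>();
        void initializeState(InitContext context) { }
        void open() { events.add("open"); }
        void snapshotState(SnapshotContext context) { }
        abstract void processElement(String element);
    }

    /** A stateful operator that buffers elements and overrides both hooks. */
    static final class BufferingOperator extends BaseOperator {
        final List<String> buffer = new ArrayList<>();

        @Override
        void initializeState(InitContext context) {
            events.add("initializeState");
            buffer.addAll(context.restored);   // re-populate from the restored state
        }

        @Override
        void snapshotState(SnapshotContext context) {
            events.add("snapshotState");
            context.written.addAll(buffer);    // expose the current buffer to the checkpoint
        }

        @Override
        void processElement(String element) { buffer.add(element); }
    }

    /** Assumed task order: restore state, open, process input, then snapshot on a checkpoint. */
    static List<String> runTask(BaseOperator op, List<String> restored, List<String> input) {
        op.initializeState(new InitContext(restored));
        op.open();
        for (String element : input) {
            op.processElement(element);
        }
        SnapshotContext snapshot = new SnapshotContext();
        op.snapshotState(snapshot);
        return snapshot.written;
    }

    public static void main(String[] args) {
        BufferingOperator op = new BufferingOperator();
        List<String> snapshot = runTask(op, Arrays.asList("a"), Arrays.asList("b", "c"));
        System.out.println(op.events);  // [initializeState, open, snapshotState]
        System.out.println(snapshot);   // [a, b, c]
    }
}
```

    Giving the base class no-op hooks means stateless operators need no changes; only operators that actually hold state override `initializeState` and `snapshotState`.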
    # 3) Exposing partitionable states to UDFs
    The interface `CheckpointedFunction` must be implemented by stateful UDFs:
        public interface CheckpointedFunction {

            /**
             * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
             * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
             * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
             *
             * @param context the context for drawing a snapshot of the operator
             * @throws Exception
             */
            void snapshotState(FunctionSnapshotContext context) throws Exception;

            /**
             * This method is called when an operator is initialized, so that the function can set up its state through
             * the provided context. Initialization typically includes registering user states through the state stores
             * that the context offers.
             *
             * @param context the context for initializing the operator
             * @throws Exception
             */
            void initializeState(FunctionInitializationContext context) throws Exception;
        }
    The contexts for initialization and snapshotting expose the subset of the functionality of the internal `AbstractStreamOperator` contexts that is safe to present to user code.
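    The pattern a stateful UDF is expected to follow can be sketched with hypothetical stand-in types (`ListStateStandIn`, `InitContextStandIn`), since the real context interfaces are not spelled out here: register state in `initializeState`, keep a working value in a field, and expose it through the registered state in `snapshotState`. Summing several restored entries is an assumption about how list-shaped, partitionable operator state might arrive after redistribution.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the CheckpointedFunction pattern using simplified stand-in types (not Flink's API). */
public class CheckpointedFunctionSketch {

    /** Stand-in for list-shaped operator state: each entry could be redistributed independently. */
    static final class ListStateStandIn {
        final List<Long> entries = new ArrayList<>();
        void clear() { entries.clear(); }
        void add(long value) { entries.add(value); }
        Iterable<Long> get() { return entries; }
    }

    /** Stand-in for the user-facing initialization context: hands out (possibly restored) state. */
    static final class InitContextStandIn {
        private final ListStateStandIn state;
        InitContextStandIn(ListStateStandIn state) { this.state = state; }
        ListStateStandIn getListState(String name) { return state; }   // name ignored in this sketch
    }

    /** A counting UDF: works on a plain field, publishes it into the registered state on snapshot. */
    static final class CountingFunction {
        private ListStateStandIn countState;
        private long count;

        void initializeState(InitContextStandIn context) {
            countState = context.getListState("count");
            // Assumption: after redistribution the state may hold several partial counts; sum them.
            for (long partial : countState.get()) {
                count += partial;
            }
        }

        void map(String value) { count++; }

        void snapshotState() {
            // Expose the current value through the state registered during initialization.
            countState.clear();
            countState.add(count);
        }

        long getCount() { return count; }
    }

    public static void main(String[] args) {
        ListStateStandIn restored = new ListStateStandIn();
        restored.add(3);
        restored.add(4);
        CountingFunction f = new CountingFunction();
        f.initializeState(new InitContextStandIn(restored));
        f.map("x");
        f.snapshotState();
        System.out.println(restored.entries);  // [8]
    }
}
```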
    # 4) This PR also introduces several classes that bundle state handles
    One example is `TaskStateHandles`. The purpose is a) to reduce the number of parameters passed through several methods and b) to make adding or removing state handles simpler.
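    A sketch of the bundling idea as a plain immutable parameter object. `StateHandlesBundleSketch`, its four fields (`managed*` for state held by a backend, `raw*` for state written through a stream), and the use of `String` as a handle type are hypothetical simplifications, not the actual contents of `TaskStateHandles`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch of bundling state handles: one immutable object replaces
 * several handle parameters. Handles are plain Strings only to stay self-contained.
 */
public final class StateHandlesBundleSketch {

    private final List<String> managedKeyedState;     // keyed state held by a state backend
    private final List<String> rawKeyedState;         // keyed state written through a checkpoint stream
    private final List<String> managedOperatorState;  // operator state held by a state backend
    private final List<String> rawOperatorState;      // operator state written through a checkpoint stream

    public StateHandlesBundleSketch(List<String> managedKeyedState, List<String> rawKeyedState,
                                    List<String> managedOperatorState, List<String> rawOperatorState) {
        this.managedKeyedState = copyOrEmpty(managedKeyedState);
        this.rawKeyedState = copyOrEmpty(rawKeyedState);
        this.managedOperatorState = copyOrEmpty(managedOperatorState);
        this.rawOperatorState = copyOrEmpty(rawOperatorState);
    }

    // Treat a missing kind of state as "no handles" so callers never need null checks.
    private static List<String> copyOrEmpty(List<String> handles) {
        return handles == null
                ? Collections.<String>emptyList()
                : Collections.unmodifiableList(new ArrayList<>(handles));
    }

    public List<String> getManagedKeyedState() { return managedKeyedState; }
    public List<String> getRawKeyedState() { return rawKeyedState; }
    public List<String> getManagedOperatorState() { return managedOperatorState; }
    public List<String> getRawOperatorState() { return rawOperatorState; }

    // A method deep in the call chain takes one bundle instead of four parameters; adding
    // another kind of handle later changes this class, not every signature on the path.
    static int countHandles(StateHandlesBundleSketch handles) {
        return handles.getManagedKeyedState().size() + handles.getRawKeyedState().size()
                + handles.getManagedOperatorState().size() + handles.getRawOperatorState().size();
    }

    public static void main(String[] args) {
        StateHandlesBundleSketch handles = new StateHandlesBundleSketch(
                Collections.singletonList("keyed-0"), null, null, Collections.singletonList("raw-op-0"));
        System.out.println(countHandles(handles));  // 2
    }
}
```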

You can merge this pull request into a Git repository by running:

    $ git pull 

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2648
commit 2f72bdfe9e39f76a766c5fad68500caa39e9a624
Author: Stefan Richter <>
Date:   2016-10-04T08:59:38Z

    StreamCheckpointed operator state WIP

commit f3a1ee0963ac8521cb9581dcb1ab6547c50c304f
Author: Stefan Richter <>
Date:   2016-10-14T10:15:07Z

    Added to One/TwoInputStreamOperatorTestHarness

commit b3d7b683b6fd3e6a7fb2158e67fe94ac6e28e730
Author: Stefan Richter <>
Date:   2016-10-17T09:38:50Z

    Small optimizations for GC friendliness.

commit 614592d475486c1a2eff8e9fc24a423ff18a78bf
Author: Stefan Richter <>
Date:   2016-10-13T09:32:19Z

    Added AbstractUdfStreamOperatorLifecycleTest

commit 886ca3e2d4362db2f5125b0c1ea1f2078042c456
Author: Stefan Richter <>
Date:   2016-10-17T11:51:40Z

    Rename KeyedStateCheckpointOutputStream
