[
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582079#comment-15582079
]
ASF GitHub Bot commented on FLINK-4844:
---------------------------------------
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/2648
[FLINK-4844] Partitionable Raw Keyed/Operator State
Partitionable operator and keyed state are currently only available by
using backends. However, the serialization code for many operators is built
around reading/writing their state to a stream for checkpointing. We want to
provide partitionable states through streams as well, so that migrating
existing operators becomes easier.
This PR includes the following main changes:
# 1) `KeyedStateCheckpointOutputStream` and
`OperatorStateCheckpointedOutputStream`
These classes allow writing partitionable keyed (and operator) state to a
stream for checkpointing. They extend the basic stream interface with methods
that signal the start of a new partition.
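To illustrate the idea of signaling partition starts, here is a minimal sketch in plain Java. `PartitionedOutputStreamSketch` and its methods are hypothetical names chosen for illustration, not the classes from this PR. The sketch only assumes that a partition-aware stream has to remember the byte offset at which each partition begins, so that the written bytes can later be split and redistributed.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical stand-in for a partition-aware checkpoint stream (not the PR's class).
class PartitionedOutputStreamSketch extends OutputStream {
    private final ByteArrayOutputStream delegate = new ByteArrayOutputStream();
    // Start offset per partition; -1 means the partition was never started.
    private final long[] partitionOffsets;

    PartitionedOutputStreamSketch(int numberOfPartitions) {
        partitionOffsets = new long[numberOfPartitions];
        Arrays.fill(partitionOffsets, -1L);
    }

    // Signals that all bytes written from now on belong to the given partition.
    void startPartition(int partitionId) {
        if (partitionOffsets[partitionId] != -1L) {
            throw new IllegalStateException("Partition already started: " + partitionId);
        }
        partitionOffsets[partitionId] = delegate.size();
    }

    @Override
    public void write(int b) {
        delegate.write(b);
    }

    long[] getPartitionOffsets() {
        return partitionOffsets.clone();
    }

    byte[] toByteArray() {
        return delegate.toByteArray();
    }
}
```

With the recorded offsets, a restore path could hand each partition's byte range to whichever parallel instance becomes responsible for it after rescaling.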
# 2) Changes to `StreamTask` and `AbstractStreamOperator`
The lifecycle of `StreamTask` is slightly modified for the initialization of
operator states. In `AbstractStreamOperator`, two new hooks have been added that
new operators can override:
```
/**
 * Stream operators with state, which want to participate in a snapshot need to override this hook method.
 *
 * @param context context that provides information and means required for taking a snapshot
 */
public void snapshotState(StateSnapshotContext context) throws Exception {
}

/**
 * Stream operators with state which can be restored need to override this hook method.
 *
 * @param context context that allows to register different states.
 */
public void initializeState(StateInitializationContext context) throws Exception {
}
```
Access to partitionable state for snapshot and restore is provided through the
respective context.
# 3) Exposing partitionable states to UDFs
The interface `CheckpointedFunction` must be implemented by stateful UDFs:
```
/**
 * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
 * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
 * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
 *
 * @param context the context for drawing a snapshot of the operator
 * @throws Exception
 */
void snapshotState(FunctionSnapshotContext context) throws Exception;

/**
 * This method is called when an operator is initialized, so that the function can set up its state through
 * the provided context. Initialization typically includes registering user states through the state stores
 * that the context offers.
 *
 * @param context the context for initializing the operator
 * @throws Exception
 */
void initializeState(FunctionInitializationContext context) throws Exception;
```
The contexts for initialization and snapshot expose only the subset of the
functionality of the internal `AbstractStreamOperator` contexts that is safe
to present to user code.
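The interplay of the two hooks can be sketched as follows. To keep the example compilable without Flink on the classpath, every type here (`InitContextSketch`, `SnapshotContextSketch`, `CheckpointedFunctionSketch`, `InMemoryStateStoreSketch`) is a hypothetical stand-in, and their methods are assumptions rather than the API introduced by this PR. The point is only the pattern: register state during initialization, then make sure it is up to date when a snapshot is requested.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the contexts; method names are assumptions.
interface InitContextSketch {
    // Returns the restored state registered under the name, or a fresh empty list.
    List<Long> getListState(String name);
}

interface SnapshotContextSketch {
    long getCheckpointId();
}

interface CheckpointedFunctionSketch {
    void snapshotState(SnapshotContextSketch context) throws Exception;
    void initializeState(InitContextSketch context) throws Exception;
}

// A user function that counts elements and exposes the count as list state.
class CountingFunctionSketch implements CheckpointedFunctionSketch {
    private List<Long> checkpointedCount;
    private long count;

    void process(String element) {
        count++;
    }

    long getCount() {
        return count;
    }

    @Override
    public void initializeState(InitContextSketch context) {
        checkpointedCount = context.getListState("count");
        count = 0;
        // After rescaling, several partial counts may land on one instance.
        for (long partial : checkpointedCount) {
            count += partial;
        }
    }

    @Override
    public void snapshotState(SnapshotContextSketch context) {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }
}

// In-memory store standing in for whatever the runtime would provide.
class InMemoryStateStoreSketch implements InitContextSketch {
    private final Map<String, List<Long>> states = new HashMap<>();

    @Override
    public List<Long> getListState(String name) {
        return states.computeIfAbsent(name, k -> new ArrayList<>());
    }
}
```

Storing the count as a list rather than a single value is a deliberate choice in this sketch: list entries can be split or merged across instances, which is what makes the state partitionable.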
# 4) This PR also introduces several classes that bundle state handles
One example of this would be `TaskStateHandles`. The purpose of this is a)
reducing the number of parameters passed through several methods and b) making
adding/removing state handles simpler.
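As a rough illustration of why bundling helps, the following sketch groups handles into one immutable object. `StateHandleBundleSketch` and `RestoreSketch` are hypothetical, the field names are not those of `TaskStateHandles`, and plain strings stand in for real state handles.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical bundle; strings stand in for real state handle objects.
final class StateHandleBundleSketch {
    private final List<String> keyedStateHandles;
    private final List<String> operatorStateHandles;

    StateHandleBundleSketch(List<String> keyedStateHandles, List<String> operatorStateHandles) {
        // Defensive copies keep the bundle immutable once it is passed around.
        this.keyedStateHandles = Collections.unmodifiableList(new ArrayList<>(keyedStateHandles));
        this.operatorStateHandles = Collections.unmodifiableList(new ArrayList<>(operatorStateHandles));
    }

    List<String> getKeyedStateHandles() {
        return keyedStateHandles;
    }

    List<String> getOperatorStateHandles() {
        return operatorStateHandles;
    }
}

class RestoreSketch {
    // One parameter instead of one per handle type: adding a new kind of
    // handle changes the bundle, not the signature of every method on the path.
    static String describe(StateHandleBundleSketch handles) {
        return "keyed=" + handles.getKeyedStateHandles().size()
                + ", operator=" + handles.getOperatorStateHandles().size();
    }
}
```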
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink stream-keyed-state-checkpointing
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2648.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2648
----
commit 2f72bdfe9e39f76a766c5fad68500caa39e9a624
Author: Stefan Richter <[email protected]>
Date: 2016-10-04T08:59:38Z
StreamCheckpointed operator state WIP
commit f3a1ee0963ac8521cb9581dcb1ab6547c50c304f
Author: Stefan Richter <[email protected]>
Date: 2016-10-14T10:15:07Z
Added to One/TwoInputStreamOperatorTestHarness
commit b3d7b683b6fd3e6a7fb2158e67fe94ac6e28e730
Author: Stefan Richter <[email protected]>
Date: 2016-10-17T09:38:50Z
Small optimizations for GC friendliness.
commit 614592d475486c1a2eff8e9fc24a423ff18a78bf
Author: Stefan Richter <[email protected]>
Date: 2016-10-13T09:32:19Z
Added AbstractUdfStreamOperatorLifecycleTest
commit 886ca3e2d4362db2f5125b0c1ea1f2078042c456
Author: Stefan Richter <[email protected]>
Date: 2016-10-17T11:51:40Z
Rename KeyedStateCheckpointOutputStream
----
> Partitionable Raw Keyed/Operator State
> --------------------------------------
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
> Issue Type: New Feature
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using
> backends. However, the serialization code for many operators is built around
> reading/writing their state to a stream for checkpointing. We want to provide
> partitionable states through streams as well, so that migrating existing
> operators becomes easier.