GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/2648
[FLINK-4844] Partitionable Raw Keyed/Operator State
Partitionable operator and keyed state are currently only available through
state backends. However, the serialization code for many operators is built
around reading/writing their state to a stream for checkpointing. We want to
provide partitionable states also through streams, so that migrating existing
operators becomes easier.
This PR includes the following main changes:
# 1) `KeyedStateCheckpointOutputStream` and
`OperatorStateCheckpointedOutputStream`
These classes allow writing partitionable keyed (and operator) state to a
stream for checkpointing. They extend the basic stream interface with methods
to signal the start of new partitions.
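To make the idea of partition markers concrete, here is a minimal, self-contained sketch. It is not the code from this PR: the class `PartitionedOutputSketch` and its methods are hypothetical stand-ins, and the sketch assumes that signalling a new partition amounts to recording the current stream offset so the written bytes can later be split and redistributed.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a partition-aware checkpoint stream; not a class from this PR. */
class PartitionedOutputSketch {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final List<Integer> partitionOffsets = new ArrayList<>();

    /** Signals that everything written from now on belongs to a new partition. */
    void startNewPartition() {
        partitionOffsets.add(bytes.size());
    }

    /** Appends one long (8 bytes, big-endian) to the current partition. */
    void writeLong(long value) {
        byte[] encoded = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        bytes.write(encoded, 0, encoded.length);
    }

    /** Start offsets of all partitions, in the order they were opened. */
    List<Integer> getPartitionOffsets() {
        return new ArrayList<>(partitionOffsets);
    }

    /** Returns the bytes of one partition, using the recorded offsets as boundaries. */
    byte[] partition(int index) {
        int start = partitionOffsets.get(index);
        int end = index + 1 < partitionOffsets.size()
                ? partitionOffsets.get(index + 1)
                : bytes.size();
        return Arrays.copyOfRange(bytes.toByteArray(), start, end);
    }
}
```

Because each partition has a known start offset, a restore with a different parallelism could hand out the partitions individually instead of treating the stream as one opaque blob.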
# 2) Changes to `StreamTask` and `AbstractStreamOperator`
The lifecycle of `StreamTask` is slightly modified for the initialization of
operator states. In `AbstractStreamOperator`, two new hooks have been added that
new operators can override:
```
/**
 * Stream operators with state, which want to participate in a snapshot
 * need to override this hook method.
 *
 * @param context context that provides information and means required
 *                for taking a snapshot
 */
public void snapshotState(StateSnapshotContext context) throws Exception {
}

/**
 * Stream operators with state which can be restored need to override
 * this hook method.
 *
 * @param context context that allows to register different states.
 */
public void initializeState(StateInitializationContext context) throws Exception {
}
```
Access for snapshotting and restoring partitionable state is provided through
the respective context.
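As a rough illustration of how an operator might use the two hooks, here is a self-contained sketch. All types in it (`SnapshotContextSketch`, `InitializationContextSketch`, `OperatorSketch`, `CountingOperatorSketch`) are hypothetical simplifications and not the real Flink classes; in particular, the contexts are assumed to expose operator state as a plain list of longs rather than as streams.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-ins for the snapshot/initialization contexts. */
interface SnapshotContextSketch {
    List<Long> rawOperatorStateOutput();
}

interface InitializationContextSketch {
    List<Long> rawOperatorStateInput();
}

/** Hypothetical base class with empty default hooks, mirroring the shape shown above. */
abstract class OperatorSketch {
    public void snapshotState(SnapshotContextSketch context) throws Exception {
    }

    public void initializeState(InitializationContextSketch context) throws Exception {
    }
}

/** Example operator that counts elements and checkpoints the count. */
class CountingOperatorSketch extends OperatorSketch {
    long count;

    void processElement(Object element) {
        count++;
    }

    @Override
    public void snapshotState(SnapshotContextSketch context) {
        // Each snapshot contributes one partition: the current count.
        context.rawOperatorStateOutput().add(count);
    }

    @Override
    public void initializeState(InitializationContextSketch context) {
        // On restore, this instance may receive several partitions; merge them by summing.
        for (long restored : context.rawOperatorStateInput()) {
            count += restored;
        }
    }
}
```

Summing on restore is just one possible merge strategy, chosen here because a counter can be recombined from any redistribution of its partitions.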
# 3) Exposing partitionable states to UDFs
The interface `CheckpointedFunction` must be implemented by stateful UDFs:
```
/**
 * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
 * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
 * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
 *
 * @param context the context for drawing a snapshot of the operator
 * @throws Exception
 */
void snapshotState(FunctionSnapshotContext context) throws Exception;

/**
 * This method is called when an operator is initialized, so that the function can set up its state through
 * the provided context. Initialization typically includes registering user states through the state stores
 * that the context offers.
 *
 * @param context the context for initializing the operator
 * @throws Exception
 */
void initializeState(FunctionInitializationContext context) throws Exception;
```
The contexts for initialization and snapshot expose only a subset of the
functionality of the internal contexts from `AbstractStreamOperator`, namely
the part that is safe to present to user code.
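For illustration, a stateful UDF following this pattern could look roughly like the sketch below. It is self-contained and uses hypothetical stand-in types (`FunctionSnapshotContextSketch`, `FunctionInitializationContextSketch`, `CheckpointedFunctionSketch`), not the real interfaces; the assumption that the initialization context hands out a named, mutable list as state is made only to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical narrowed contexts: only what user code may safely touch. */
interface FunctionSnapshotContextSketch {
    long getCheckpointId();
}

interface FunctionInitializationContextSketch {
    List<String> getListState(String name);
}

/** Hypothetical stand-in mirroring the two methods of the interface shown above. */
interface CheckpointedFunctionSketch {
    void snapshotState(FunctionSnapshotContextSketch context) throws Exception;

    void initializeState(FunctionInitializationContextSketch context) throws Exception;
}

/** Example UDF: buffers elements and exposes the buffer as checkpointed state. */
class BufferingSinkSketch implements CheckpointedFunctionSketch {
    private final List<String> buffer = new ArrayList<>();
    private List<String> checkpointedBuffer;

    void invoke(String value) {
        buffer.add(value);
    }

    @Override
    public void initializeState(FunctionInitializationContextSketch context) {
        // Register the state and restore whatever a previous run left in it.
        checkpointedBuffer = context.getListState("buffered-elements");
        buffer.addAll(checkpointedBuffer);
    }

    @Override
    public void snapshotState(FunctionSnapshotContextSketch context) {
        // Make the registered state reflect the current in-memory buffer.
        checkpointedBuffer.clear();
        checkpointedBuffer.addAll(buffer);
    }

    List<String> bufferedElements() {
        return new ArrayList<>(buffer);
    }
}
```

The function only ever sees the narrow context types, so it cannot reach operator internals even if the runtime object behind the context offers more.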
# 4) This PR also introduces several classes that bundle state handles
One example of this would be `TaskStateHandles`. The purpose of this is a)
reducing the number of parameters passed through several methods and b) making
adding/removing state handles simpler.
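The bundling idea can be sketched as a plain parameter object. The class below is a hypothetical illustration, not the actual `TaskStateHandles`: its name, its two fields, and the use of strings in place of real state handle types are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical bundle of state handles; strings stand in for real handle types. */
final class StateHandlesBundleSketch {
    private final List<String> keyedStateHandles;
    private final List<String> operatorStateHandles;

    StateHandlesBundleSketch(List<String> keyedStateHandles, List<String> operatorStateHandles) {
        this.keyedStateHandles = Collections.unmodifiableList(new ArrayList<>(keyedStateHandles));
        this.operatorStateHandles = Collections.unmodifiableList(new ArrayList<>(operatorStateHandles));
    }

    List<String> getKeyedStateHandles() {
        return keyedStateHandles;
    }

    List<String> getOperatorStateHandles() {
        return operatorStateHandles;
    }
}

class RestoreSketch {
    /**
     * Takes one bundle instead of one parameter per handle kind. Adding a new kind of
     * handle later changes the bundle class, not every method signature on the call path.
     */
    static int countHandles(StateHandlesBundleSketch handles) {
        return handles.getKeyedStateHandles().size() + handles.getOperatorStateHandles().size();
    }
}
```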
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink stream-keyed-state-checkpointing
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2648.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2648
----
commit 2f72bdfe9e39f76a766c5fad68500caa39e9a624
Author: Stefan Richter <[email protected]>
Date: 2016-10-04T08:59:38Z
StreamCheckpointed operator state WIP
commit f3a1ee0963ac8521cb9581dcb1ab6547c50c304f
Author: Stefan Richter <[email protected]>
Date: 2016-10-14T10:15:07Z
Added to One/TwoInputStreamOperatorTestHarness
commit b3d7b683b6fd3e6a7fb2158e67fe94ac6e28e730
Author: Stefan Richter <[email protected]>
Date: 2016-10-17T09:38:50Z
Small optimizations for GC friendliness.
commit 614592d475486c1a2eff8e9fc24a423ff18a78bf
Author: Stefan Richter <[email protected]>
Date: 2016-10-13T09:32:19Z
Added AbstractUdfStreamOperatorLifecycleTest
commit 886ca3e2d4362db2f5125b0c1ea1f2078042c456
Author: Stefan Richter <[email protected]>
Date: 2016-10-17T11:51:40Z
Rename KeyedStateCheckpointOutputStream
----