Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3508#discussion_r109610444
--- Diff: docs/dev/stream/state.md ---
@@ -233,45 +229,44 @@ val counts: DataStream[(String, Int)] = stream
## Using Managed Operator State
-A stateful function can implement either the more general `CheckpointedFunction`
+To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
interface, or the `ListCheckpointed<T extends Serializable>` interface.
-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
-thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-##### ListCheckpointed
+#### CheckpointedFunction
-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-On `snapshotState()` the operator should return a list of objects to checkpoint and
-`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
-return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
-
-##### CheckpointedFunction
-
-The `CheckpointedFunction` interface also requires the implementation of two methods:
+The `CheckpointedFunction` interface provides access to non-keyed state with different
+redistribution schemes. It requires the implementation of two methods:
{% highlight java %}
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
{% endhighlight %}
-Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
-or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
+Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
+is called every time the user-defined function is initialized, be that when the function is first initialized
+or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
only the place where different types of state are initialized, but also where state recovery logic is included.
-This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
-uses state to buffer elements before sending them to the outside world:
+Currently, list-style managed operator state is supported. The state
+is expected to be a `List` of *serializable* objects, independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
+non-keyed state can be redistributed. Depending on the state accessing method,
+the following redistribution schemes are defined:
+
+ - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
--- End diff ---
Could use "Round-Robin redistribution". Maybe...
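The even-split scheme described in the diff, and the round-robin name suggested above, can be compared with a small plain-Java simulation. This is a sketch under stated assumptions, not Flink's repartitioning code: the class `Redistribution` and its helpers `concatenate`, `evenSplit` and `roundRobin` are hypothetical, and the sketch assumes that even-split hands each new task a contiguous chunk of the concatenated state list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical plain-Java simulation (no Flink dependency) of how list-style
 * operator state could be redistributed when the parallelism changes.
 * Not Flink's actual implementation.
 */
public class Redistribution {

    /** Concatenates the per-task state lists into one logical list. */
    public static <T> List<T> concatenate(List<List<T>> perTaskState) {
        List<T> all = new ArrayList<>();
        for (List<T> taskState : perTaskState) {
            all.addAll(taskState);
        }
        return all;
    }

    /** Assumed even split: contiguous chunks whose sizes differ by at most one. */
    public static <T> List<List<T>> evenSplit(List<T> all, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int task = 0; task < newParallelism; task++) {
            // The first `remainder` tasks receive one extra element.
            int size = base + (task < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    /** Round-robin alternative: element i goes to task i % newParallelism. */
    public static <T> List<List<T>> roundRobin(List<T> all, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int task = 0; task < newParallelism; task++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Parallelism 1 -> 2, using the elements from the removed BufferingSink text.
        List<String> sink = concatenate(
                Collections.singletonList(Arrays.asList("(test1, 2)", "(test2, 2)")));
        System.out.println(evenSplit(sink, 2));  // [[(test1, 2)], [(test2, 2)]]
        System.out.println(roundRobin(sink, 2)); // [[(test1, 2)], [(test2, 2)]]

        // Parallelism 2 -> 3 with five elements: the two schemes now differ.
        List<String> five = concatenate(
                Arrays.asList(Arrays.asList("a", "b", "c"), Arrays.asList("d", "e")));
        System.out.println(evenSplit(five, 3));  // [[a, b], [c, d], [e]]
        System.out.println(roundRobin(five, 3)); // [[a, d], [b, e], [c]]
    }
}
```

For the two-element `BufferingSink` example both helpers agree; they diverge once a task receives more than one element, so the name is only accurate if it matches the actual assignment order. In this simulation, a state returned as `Collections.singletonList(MY_STATE)` lands whole in one task while the others receive empty lists.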