[FLINK-5456] Resurrect and update parts of the state intro documentation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d27b9fee Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d27b9fee Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d27b9fee Branch: refs/heads/master Commit: d27b9fee5f21997505ad3434f46e5ff1f4e225ed Parents: ec3eb59 Author: David Anderson <da...@alpinegizmo.com> Authored: Wed Jan 18 15:56:18 2017 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Jan 30 17:58:27 2017 +0100 ---------------------------------------------------------------------- docs/dev/stream/state.md | 332 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 314 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d27b9fee/docs/dev/stream/state.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md index 0b38a62..124ce68 100644 --- a/docs/dev/stream/state.md +++ b/docs/dev/stream/state.md @@ -26,11 +26,11 @@ under the License. {:toc} Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for -any type of more elaborate operation. For example: +any type of more elaborate operation. For example: - When an application searches for certain event patterns, the state will store the sequence of events encountered so far. - When aggregating events per minute, the state holds the pending aggregates. - - When training a machine learning model over a stream of data points, the state holds the current verstion of the model parameters. + - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters. In order to make state fault tolerant, Flink needs to be aware of the state and [checkpoint](checkpointing.html) it. 
In many cases, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk @@ -39,40 +39,336 @@ if necessary) to allow applications to hold very large state. This document explains how to use Flink's state abstractions when developing an application. -## Keyed State and Operator state +## Keyed State and Operator State -There are two basic state backends: `Keyed State` and `Operator State`. +There are two basic kinds of state in Flink: `Keyed State` and `Operator State`. -#### Keyed State +### Keyed State *Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`. -Examples of keyed state are the `ValueState` or `ListState` that one can create in a function on a `KeyedStream`, as -well as the state of a keyed window operator. -Keyed State is organized in so called *Key Groups*. Key Groups are the unit by which keyed state can be redistributed and -there are as many key groups as the defined maximum parallelism. -During execution each parallel instance of an operator gets one or more key groups. +You can think of Keyed State as Operator State that has been partitioned, +or sharded, with exactly one state-partition per key. +Each keyed-state is logically bound to a unique +composite of <parallel-operator-instance, key>, and since each key +"belongs" to exactly one parallel instance of a keyed operator, we can +think of this simply as <operator, key>. -#### Operator State +Keyed State is further organized into so-called *Key Groups*. Key Groups are the +atomic unit by which Flink can redistribute Keyed State; +there are exactly as many Key Groups as the defined maximum parallelism. +During execution each parallel instance of a keyed operator works with the keys +for one or more Key Groups. -*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` interface in Flink 1.0 and Flink 1.1. 
-The new `CheckpointedFunction` interface is basically a shortcut (syntactic sugar) for the Operator State. +### Operator State -Operator State needs special re-distribution schemes when parallelism is changed. There can be different variations of such -schemes; the following are currently defined: +With *Operator State* (or *non-keyed state*), each operator state is +bound to one parallel operator instance. +The Kafka source connector is a good motivating example for the use of Operator State +in Flink. Each parallel instance of this Kafka consumer maintains a map +of topic partitions and offsets as its Operator State. + +The Operator State interfaces support redistributing state among +parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution; the following are currently defined: - **List-style redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty, or contain one or more elements. - ## Raw and Managed State *Keyed State* and *Operator State* exist in two forms: *managed* and *raw*. *Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. -Examples are "ValueState", "ListState", etc. Flink's runtime encodes the states and writes them into the checkpoints. +Examples are "ValueState", "ListState", etc. Flink's runtime encodes +the states and writes them into the checkpoints. -*Raw State* is state that users and operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into +*Raw State* is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint. 
Flink knows nothing about the state's data structures and sees only the raw bytes. +All DataStream functions can use managed state, but the raw state interfaces can only be used when implementing operators. +Using managed state (rather than raw state) is recommended, since with +managed state Flink is able to automatically redistribute state when the parallelism is +changed, and can also do better memory management. + +## Using Managed Keyed State + +The managed keyed state interface provides access to different types of state that are all scoped to +the key of the current input element. This means that this type of state can only be used +on a `KeyedStream`, which can be created via `stream.keyBy(…)`. + +We will first look at the different types of state available and then see +how they can be used in a program. The available state primitives are: + +* `ValueState<T>`: This keeps a value that can be updated and +retrieved (scoped to the key of the input element as mentioned above, so there will possibly be one value +for each key that the operation sees). The value can be set using `update(T)` and retrieved using +`T value()`. + +* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable` +over all currently stored elements. Elements are added using `add(T)`, and the Iterable can +be retrieved using `Iterable<T> get()`. + +* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values +added to the state. The interface is the same as for `ListState`, but elements added using +`add(T)` are reduced to an aggregate using a specified `ReduceFunction`. + +All types of state also have a method `clear()` that clears the state for the currently +active key, i.e. the key of the input element. + +It is important to keep in mind that these state objects are only used for interfacing +with state. The state is not necessarily stored inside these objects; it might reside on disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state +depends on the key of the input element. So the value you get in one invocation of your +user function can differ from the value in another invocation if the keys involved are different. + +To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state +(as we will see later, you can create several states, and they have to have unique names so +that you can reference them), the type of the values that the state holds, and possibly +a user-specified function, such as a `ReduceFunction`. Depending on what type of state you +want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor` or +a `ReducingStateDescriptor`. + +State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*. +Please see [here]({{ site.baseurl }}/dev/api_concepts#rich-functions) for +information about that, but we will also see an example shortly. The `RuntimeContext` that +is available in a `RichFunction` has these methods for accessing state: + +* `ValueState<T> getState(ValueStateDescriptor<T>)` +* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)` +* `ListState<T> getListState(ListStateDescriptor<T>)` + +This is an example `FlatMapFunction` that shows how all of the parts fit together: + +{% highlight java %} +public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + + /** + * The ValueState handle. The first field is the count, the second field a running sum. 
+ */ + private transient ValueState<Tuple2<Long, Long>> sum; + + @Override + public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { + + // access the state value + Tuple2<Long, Long> currentSum = sum.value(); + + // update the count + currentSum.f0 += 1; + + // add the second field of the input value + currentSum.f1 += input.f1; + + // update the state + sum.update(currentSum); + + // if the count reaches 2, emit the average and clear the state + if (currentSum.f0 >= 2) { + out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); + sum.clear(); + } + } + + @Override + public void open(Configuration config) { + ValueStateDescriptor<Tuple2<Long, Long>> descriptor = + new ValueStateDescriptor<>( + "average", // the state name + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information + Tuple2.of(0L, 0L)); // default value of the state, if nothing was set + sum = getRuntimeContext().getState(descriptor); + } +} + +// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env) +env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)) + .keyBy(0) + .flatMap(new CountWindowAverage()) + .print(); + +// the printed output will be (1,4) and (1,5) +{% endhighlight %} + +This example implements a poor man's counting window. We key the tuples by the first field +(in the example all have the same key `1`). The function stores the count and a running sum in +a `ValueState`. Once the count reaches 2 it will emit the average and clear the state so that +we start over from `0`. Note that this would keep a different state value for each different input +key if we had tuples with different values in the first field. 
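As a cross-check on the expected output, the per-key logic of `CountWindowAverage` can be mimicked in plain Java without Flink. In this sketch a `HashMap` keyed by the first tuple field stands in for the keyed `ValueState`; the class `CountWindowAverageSketch` and its `process`/`run` methods are hypothetical names for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch (no Flink dependency) of the per-key logic in CountWindowAverage.
 * The HashMap is a hypothetical stand-in for keyed ValueState: one (count, sum) per key.
 */
class CountWindowAverageSketch {

    /** State for one key: the count and the running sum. */
    static final class CountAndSum {
        long count;
        long sum;
    }

    private final Map<Long, CountAndSum> stateByKey = new HashMap<>();

    /** Processes one (key, value) element; returns {key, average} or null if nothing is emitted. */
    long[] process(long key, long value) {
        // like sum.value() with a default of (0, 0): create the entry on first access
        CountAndSum current = stateByKey.computeIfAbsent(key, k -> new CountAndSum());
        current.count += 1;
        current.sum += value;
        if (current.count >= 2) {
            long[] result = {key, current.sum / current.count};
            stateByKey.remove(key); // like sum.clear(): only this key's state is dropped
            return result;
        }
        return null;
    }

    /** Feeds all elements through one function instance and collects the emitted averages. */
    static List<long[]> run(long[][] input) {
        CountWindowAverageSketch fn = new CountWindowAverageSketch();
        List<long[]> out = new ArrayList<>();
        for (long[] element : input) {
            long[] emitted = fn.process(element[0], element[1]);
            if (emitted != null) {
                out.add(emitted);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 5}, {1, 7}, {1, 4}, {1, 2}};
        for (long[] r : run(input)) {
            System.out.println("(" + r[0] + "," + r[1] + ")"); // prints (1,4) then (1,5)
        }
    }
}
```

Because the map holds one entry per key, clearing the state for one key (the analogue of `sum.clear()`) leaves any pending count for other keys untouched.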
+ +### State in the Scala DataStream API + +In addition to the interface described above, the Scala API has shortcuts for stateful +`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. The user function +gets the current value of the `ValueState` in an `Option` and must return an updated value that +will be used to update the state. + +{% highlight scala %} +val stream: DataStream[(String, Int)] = ... + +val counts: DataStream[(String, Int)] = stream + .keyBy(_._1) + .mapWithState((in: (String, Int), count: Option[Int]) => + count match { + case Some(c) => ( (in._1, c), Some(c + in._2) ) + case None => ( (in._1, 0), Some(in._2) ) + }) +{% endhighlight %} + +## Using Managed Operator State + +A stateful function can implement either the more general `CheckpointedFunction` +interface, or the `ListCheckpointed<T extends Serializable>` interface. + +In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent of each other, +and thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which +non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink` (shown below) +contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0, +while `(test2, 2)` will go to task 1. + +##### ListCheckpointed + +The `ListCheckpointed` interface requires the implementation of two methods: + + List<T> snapshotState(long checkpointId, long timestamp) throws Exception; + + void restoreState(List<T> state) throws Exception; + +In `snapshotState()` the operator should return a list of objects to checkpoint, and +`restoreState()` has to handle such a list upon recovery. If the state is not re-partitionable, you can always +return a `Collections.singletonList(MY_STATE)` in `snapshotState()`.
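The list-style redistribution described above can be sketched in plain Java: the lists returned by each instance's `snapshotState()` are concatenated and then split into one sublist per new parallel instance. The class `ListRedistributionSketch` is hypothetical, and the split strategy (contiguous ranges, with earlier instances taking any remainder) is an assumption for illustration, not necessarily what Flink does internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java sketch (no Flink dependency) of list-style redistribution.
 * Assumption: contiguous sublists, earlier instances take any remainder.
 */
class ListRedistributionSketch {

    static <T> List<List<T>> redistribute(List<List<T>> snapshots, int newParallelism) {
        // logically, the whole state is the concatenation of all snapshotted lists
        List<T> all = new ArrayList<>();
        for (List<T> snapshot : snapshots) {
            all.addAll(snapshot);
        }
        // split evenly; a sublist may be empty if there are fewer elements than instances
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // parallelism 1: a single instance checkpointed two elements
        List<List<String>> snapshots = Arrays.asList(Arrays.asList("(test1, 2)", "(test2, 2)"));
        System.out.println(redistribute(snapshots, 2)); // [[(test1, 2)], [(test2, 2)]]
        System.out.println(redistribute(snapshots, 3)); // [[(test1, 2)], [(test2, 2)], []]
    }
}
```

This also shows the cost of `Collections.singletonList(MY_STATE)`: when the new parallelism exceeds the number of snapshotted elements, some instances receive an empty list and must cope with having no state.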
+ +##### CheckpointedFunction + +The `CheckpointedFunction` interface also requires the implementation of two methods: + + void snapshotState(FunctionSnapshotContext context) throws Exception; + + void initializeState(FunctionInitializationContext context) throws Exception; + +Whenever a checkpoint has to be performed, `snapshotState()` is called. Its counterpart, `initializeState()`, is called every time the user-defined function is initialized, both when the function is first initialized +and when it is recovering from an earlier checkpoint. Given this, `initializeState()` is not +only the place where the different types of state are initialized, but also where the state recovery logic belongs. + +This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that +uses state to buffer elements before sending them to the outside world: + + public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, + CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { + + private final int threshold; + + private transient ListState<Tuple2<String, Integer>> checkpointedState; + + private List<Tuple2<String, Integer>> bufferedElements; + + public BufferingSink(int threshold) { + this.threshold = threshold; + this.bufferedElements = new ArrayList<>(); + } + + @Override + public void invoke(Tuple2<String, Integer> value) throws Exception { + bufferedElements.add(value); + if (bufferedElements.size() == threshold) { + for (Tuple2<String, Integer> element: bufferedElements) { + // send it to the sink + } + bufferedElements.clear(); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + checkpointedState.clear(); + for (Tuple2<String, Integer> element : bufferedElements) { + checkpointedState.add(element); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + checkpointedState = 
context.getOperatorStateStore(). + getSerializableListState("buffered-elements"); + + if (context.isRestored()) { + for (Tuple2<String, Integer> element : checkpointedState.get()) { + bufferedElements.add(element); + } + } + } + + @Override + public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { + // this is from the CheckpointedRestoring interface. + this.bufferedElements.addAll(state); + } + } + + +The `initializeState()` method takes a `FunctionInitializationContext` as its argument. This is used to initialize +the non-keyed state "containers". In this example the container is a `ListState`, in which the non-keyed state objects +are stored upon checkpointing: + +`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");` + +After initializing the container, we use the `isRestored()` method of the context to check if we are +recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied. + +As shown in the code of the `BufferingSink`, this `ListState` recovered during state +initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared +of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint. + +As a side note, keyed state can also be initialized in the `initializeState()` method. This can be done +using the provided `FunctionInitializationContext`. + +### Stateful Source Functions + +Stateful sources require a bit more care than other operators. +To make the updates to the state and the output collection atomic (as required for exactly-once semantics +on failure/recovery), the user is required to get a lock from the source's context.
+ +{% highlight java %} +public static class CounterSource + extends RichParallelSourceFunction<Long> + implements ListCheckpointed<Long> { + + /** current offset for exactly-once semantics */ + private Long offset = 0L; + + /** flag for job cancellation */ + private volatile boolean isRunning = true; + + @Override + public void run(SourceContext<Long> ctx) { + final Object lock = ctx.getCheckpointLock(); + + while (isRunning) { + // output and state update are atomic + synchronized (lock) { + ctx.collect(offset); + offset += 1; + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) { + return Collections.singletonList(offset); + } + + @Override + public void restoreState(List<Long> state) { + for (Long s : state) { + offset = s; + } + } +} +{% endhighlight %} + +Some operators might need to know when a checkpoint has been fully acknowledged by Flink, for example to communicate that to the outside world. In this case, see the `org.apache.flink.runtime.state.CheckpointListener` interface. +
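The reason for the lock in `CounterSource` can be illustrated without Flink. In the plain-Java sketch below, a hypothetical `snapshot()` method stands in for a checkpoint and takes the same lock as the emitting loop, so it always observes an offset that matches the number of records emitted so far. The class `CheckpointLockSketch`, its `emittedCount` field, and its methods are illustrative assumptions; in Flink the lock object comes from `ctx.getCheckpointLock()`.

```java
/**
 * Plain-Java sketch (no Flink dependency): "emitting" a record and advancing the
 * offset happen under one lock, so a snapshot taken under that lock is consistent.
 * emittedCount is a hypothetical stand-in for records passed to ctx.collect(...).
 */
class CheckpointLockSketch {

    private final Object lock = new Object(); // plays the role of ctx.getCheckpointLock()
    private long emittedCount = 0L;
    private long offset = 0L;
    private volatile boolean isRunning = true;

    /** Source loop: emission and state update are atomic with respect to snapshot(). */
    void run() {
        while (isRunning) {
            synchronized (lock) {
                emittedCount += 1; // stand-in for ctx.collect(offset)
                offset += 1;
            }
            // not in CounterSource: yield so the snapshotting thread gets a turn at the lock
            Thread.yield();
        }
    }

    void cancel() {
        isRunning = false;
    }

    /** Stand-in for snapshotState(): reads offset and emitted count under the lock. */
    long[] snapshot() {
        synchronized (lock) {
            return new long[] {offset, emittedCount};
        }
    }

    /** Runs the loop in a background thread and checks that every snapshot is consistent. */
    static boolean allSnapshotsConsistent(int snapshots) {
        CheckpointLockSketch source = new CheckpointLockSketch();
        Thread worker = new Thread(source::run);
        worker.start();
        boolean consistent = true;
        for (int i = 0; i < snapshots; i++) {
            long[] s = source.snapshot();
            consistent &= s[0] == s[1];
            Thread.yield(); // let the source make progress between snapshots
        }
        source.cancel();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consistent;
    }

    public static void main(String[] args) {
        System.out.println(allSnapshotsConsistent(20)); // prints true
    }
}
```

Without the `synchronized` blocks, a snapshot could in principle be taken between the emission and the offset update, which on recovery would replay or skip a record.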