Github user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/3130#discussion_r96630446
--- Diff: docs/dev/migration.md ---
@@ -25,9 +25,326 @@ under the License.
* This will be replaced by the TOC
{:toc}
-## Flink 1.1 to 1.2
+## Flink Function Migration from 1.1 to 1.2
-### State API
+### Introduction
-### Fast Processing Time Window Operators
+As mentioned [here](link here), Flink has two types of state: **keyed** and
**non-keyed**.
+Both types are available to user-defined functions and this document will
guide you through the process
+of migrating your Flink-1.1 function code to Flink-1.2.
+The migration process will serve two goals:
+
+1. allow your functions to take advantage of the new features introduced
in Flink-1.2, such as rescaling,
+
+2. make sure that your new Flink-1.2 job will be able to continue from
where its Flink-1.1 predecessor stopped.
+
+As running examples for the remainder of this document we will use the
`CountMapper` and the `BufferingSink`
+functions. The first is an example of a function with **keyed** state,
while
+the second has **non-keyed** state. The code for the aforementioned two
functions in Flink-1.1 is presented below:
+
+ public class CountMapper extends RichFlatMapFunction<Tuple2<String,
Integer>, Tuple2<String, Integer>> {
+
+ private transient ValueState<Integer> counter;
+
+ private final int numberElements;
+
+ public CountMapper(int numberElements) {
+ this.numberElements = numberElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ counter = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("counter", Integer.class,
0));
+ }
+
+ @Override
+ public void flatMap(Tuple2<String, Integer> value,
Collector<Tuple2<String, Integer>> out) throws Exception {
+ int count = counter.value() + 1;
+ counter.update(count);
+
+ if (count % numberElements == 0) {
+ out.collect(Tuple2.of(value.f0, count));
+ counter.update(0); // reset to 0
+ }
+ }
+ }
+
+
+ public class BufferingSink implements SinkFunction<Tuple2<String,
Integer>>,
+ Checkpointed<ArrayList<Tuple2<String, Integer>>> {
+
+ private final int threshold;
+
+ private ArrayList<Tuple2<String, Integer>> bufferedElements;
+
+ BufferingSink(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
+
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception {
+ bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element:
bufferedElements) {
+ // send it to the sink
+ }
+ bufferedElements.clear();
+ }
+ }
+
+ @Override
+ public ArrayList<Tuple2<String, Integer>> snapshotState(
+ long checkpointId, long checkpointTimestamp) throws
Exception {
+ return bufferedElements;
+ }
+
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state)
throws Exception {
+ bufferedElements.addAll(state);
+ }
+ }
+
+
+The `CountMapper` is a `RichFlatMapFunction` which assumes a keyed input
stream of the form
+`(word, 1)`. The function keeps a counter for each incoming key
(`ValueState<Integer> counter`) and, when
+the number of occurrences of a word reaches the user-provided
threshold, it emits a tuple
+containing the word itself and the number of occurrences, then resets the counter.
+
+The `BufferingSink` is a `SinkFunction` that receives elements
(potentially the output of the `CountMapper`)
+and buffers them until a certain user-specified threshold is reached,
before emitting them to the final sink.
+This is a common way to avoid many expensive calls to a database or an
external storage system. To do the
+buffering in a fault-tolerant manner, the buffered elements are kept in a
list (`bufferedElements`) which is
+periodically checkpointed.
+
+### Migration to Flink-1.2
+
+To leverage the new features of Flink-1.2, the code above should be
modified to use the new state abstractions.
+After making these changes, you will be able to change the parallelism of
your job (scale up or down) and you
+are guaranteed that the new version of your job will start from where its
predecessor left off.
+
+**Keyed State:** Something to note before delving into the details of the
migration process is that if your function
+has **only keyed state**, then the exact same code from Flink-1.1 also
works for Flink-1.2 with full support
+for the new features and full backwards compatibility. Changes could be
made just for better code organization,
+but this is just a matter of style.
+
+With that said, the rest of this section focuses on the **non-keyed
state**.
+
+#### Rescaling and new state abstractions
+
+The first modification is the transition from the old `Checkpointed<T
extends Serializable>` state interface
+to the new ones. In Flink-1.2, a stateful function can implement either
the more general `CheckpointedFunction`
+interface, or the `ListCheckpointed<T extends Serializable>`, which is
semantically closer to the old
+`Checkpointed` one.
+
+In both cases, the non-keyed state is expected to be a `List` of
*serializable* objects, independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these
objects are the finest granularity at which
non-keyed state can be repartitioned. As an example, if with parallelism 1
the checkpointed state of the `BufferingSink`
+contains the elements `(test1, 2)` and `(test2, 2)`, then when increasing the
parallelism to 2, `(test1, 2)` may end up in task 0,
+while `(test2, 2)` goes to task 1.
+
+More details on the principles behind rescaling of both keyed and
non-keyed state can be found
+[here](link here).
+
+##### ListCheckpointed
+
+The `ListCheckpointed` interface requires the implementation of two
methods:
+
+ List<T> snapshotState(long checkpointId, long timestamp) throws
Exception;
+
+ void restoreState(List<T> state) throws Exception;
+
+Their semantics are the same as their counterparts in the old
`Checkpointed` interface. The only difference
+is that `snapshotState()` should now return a list of objects to
checkpoint, as stated earlier, and
+`restoreState()` has to handle this list upon recovery. If the state is not
re-partitionable, you can always
+return a `Collections.singletonList(MY_STATE)` in `snapshotState()`.
The updated code for the `BufferingSink`
+is included below:
+
+ public class BufferingSinkListCheckpointed implements
+ SinkFunction<Tuple2<String, Integer>>,
+ ListCheckpointed<Tuple2<String, Integer>>,
+ CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+ private final int threshold;
+
+ private List<Tuple2<String, Integer>> bufferedElements;
+
+ public BufferingSinkListCheckpointed(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
+
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception
{
+ this.bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element: bufferedElements) {
+ // send it to the sink
+ }
+ bufferedElements.clear();
+ }
+ }
+
+ @Override
+ public List<Tuple2<String, Integer>> snapshotState(
+ long checkpointId, long timestamp) throws Exception {
+ return this.bufferedElements;
+ }
+
+ @Override
+ public void restoreState(List<Tuple2<String, Integer>> state)
throws Exception {
+ if (!state.isEmpty()) {
+ this.bufferedElements.addAll(state);
+ }
+ }
+
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state)
throws Exception {
+ // this is from the CheckpointedRestoring interface.
+ this.bufferedElements.addAll(state);
+ }
+ }
+
+As shown in the code, the updated function also implements the
`CheckpointedRestoring` interface. This is for backwards
+compatibility reasons; more details are given at the end of
this section.
+
+##### CheckpointedFunction
+
+The `CheckpointedFunction` interface requires again the implementation of
two methods:
+
+ void snapshotState(FunctionSnapshotContext context) throws Exception;
+
+ void initializeState(FunctionInitializationContext context) throws
Exception;
+
+As in Flink-1.1, `snapshotState()` is called whenever a checkpoint
+is performed. The difference is that `initializeState()`, the counterpart of
`restoreState()`, is called every time
+the user-defined function is initialized, and not only when
recovering from a failure.
+Given this, `initializeState()` is the place where the different types
of state are initialized,
+and also where the state recovery logic belongs. The code for the
`BufferingSink` implementing the
+`CheckpointedFunction` interface is presented below.
+
+ public class BufferingSink implements SinkFunction<Tuple2<String,
Integer>>,
+ CheckpointedFunction,
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+ private final int threshold;
+
+ private transient ListState<Tuple2<String, Integer>>
checkpointedState;
+
+ private List<Tuple2<String, Integer>> bufferedElements;
+
+ public BufferingSink(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
+
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception
{
+ bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element: bufferedElements) {
+ // send it to the sink
+ }
+ bufferedElements.clear();
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws
Exception {
+ checkpointedState.clear();
+ for (Tuple2<String, Integer> element : bufferedElements) {
+ checkpointedState.add(element);
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context)
throws Exception {
+ checkpointedState = context.getOperatorStateStore().
+ getSerializableListState("buffered-elements");
+
+ if (context.isRestored()) {
+ for (Tuple2<String, Integer> element :
checkpointedState.get()) {
+ bufferedElements.add(element);
+ }
+ }
+ }
+
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state)
throws Exception {
+ // this is from the CheckpointedRestoring interface.
+ this.bufferedElements.addAll(state);
+ }
+ }
+
+The `initializeState` takes as argument a `FunctionInitializationContext`.
This is used to initialize
+the non-keyed state "container". This is a container of type `ListState`
where the non-keyed state objects
+are going to be stored upon checkpointing:
+
+`this.checkpointedState =
context.getOperatorStateStore().getSerializableListState("buffered-elements");`
+
+After initializing the container, we check if we are recovering after
failure using the `isRestored()` method
+of the context. If this is `true`, *i.e.* we are recovering, the restore
logic is applied.
+
+As shown in the code of the modified `BufferingSink`, this `ListState`
recovered during state
+initialization is kept in a class variable for future use in the
`snapshotState()`. There,
+the `ListState` is initially cleared from all the objects included there
from the previous checkpoint
+and then it is filled with the new ones we want to checkpoint.
+
+As a side note, the keyed state can also be initialized in the
`initializeState()` method. This can be done
+using the `FunctionInitializationContext` given as argument, instead of
the `RuntimeContext`, as was the case
+in Flink-1.1. If the `CheckpointedFunction` interface were used in
the case of the `CountMapper`,
+the old `open()` method could be removed and the new `snapshotState()`
and `initializeState()` methods
+would look like the following:
+
+ public class CountMapper extends RichFlatMapFunction<Tuple2<String,
Integer>, Tuple2<String, Integer>>
+ implements CheckpointedFunction {
+
+ private transient ValueState<Integer> counter;
+
+ private final int numberElements;
+
+ public CountMapper(int numberElements) {
+ this.numberElements = numberElements;
+ }
+
+ @Override
+ public void flatMap(Tuple2<String, Integer> value,
Collector<Tuple2<String, Integer>> out) throws Exception {
+ int count = counter.value() + 1;
+ counter.update(count);
+
+ if (count % numberElements == 0) {
+ out.collect(Tuple2.of(value.f0, count));
+ counter.update(0); // reset to 0
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws
Exception {
+ //all managed, nothing to do.
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context)
throws Exception {
+ counter = context.getKeyedStateStore().getState(
+ new ValueStateDescriptor<>("counter", Integer.class, 0));
+ }
+ }
+
+Notice that the `snapshotState()` method is empty, as Flink itself takes
care of snapshotting managed keyed state
+upon checkpointing.
+
+#### Backwards compatibility with Flink-1.1
+
+So far we have seen how to modify our functions to take advantage of the
new features introduced by Flink-1.2.
+The question that remains is "Can I make sure that my modified (Flink-1.2)
job will start from where my already
+running job from Flink-1.1 stopped?".
+
+The answer is yes and the way to do it is pretty straight-forward. For the
keyed state, you have to do nothing.
--- End diff --
straightforward is one word, no hyphen
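
Separately, on the rescaling paragraph in the diff above: the way list-style non-keyed state is split across subtasks may be easier to picture with a small plain-Java sketch. It uses no Flink classes. `RescaleSketch` and `redistribute` are hypothetical names, and the round-robin assignment is an assumption made purely for illustration, not a statement of how Flink actually assigns elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RescaleSketch {

    // Hypothetical helper, NOT a Flink API: spreads the checkpointed list
    // entries over newParallelism subtasks. Round-robin is an assumption.
    static <T> List<List<T>> redistribute(List<T> checkpointed, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        // Each list entry is an independent unit, so it can move on its own.
        for (int i = 0; i < checkpointed.size(); i++) {
            perSubtask.get(i % newParallelism).add(checkpointed.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // State snapshotted by a single subtask (parallelism 1).
        List<String> snapshot = Arrays.asList("(test1, 2)", "(test2, 2)");
        // Restore with parallelism 2: each subtask receives its own sublist.
        List<List<String>> restored = redistribute(snapshot, 2);
        for (int subtask = 0; subtask < restored.size(); subtask++) {
            System.out.println("subtask " + subtask + " restores " + restored.get(subtask));
        }
    }
}
```

The point of the sketch is only that each list element is the smallest unit that can move between subtasks, which is why state wrapped in a `Collections.singletonList(...)` always ends up in exactly one of them.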