[
https://issues.apache.org/jira/browse/FLINK-5502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828020#comment-15828020
]
ASF GitHub Bot commented on FLINK-5502:
---------------------------------------
Github user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/3130#discussion_r96628840
--- Diff: docs/dev/migration.md ---
@@ -25,9 +25,326 @@ under the License.
* This will be replaced by the TOC
{:toc}
-## Flink 1.1 to 1.2
+## Flink Function Migration from 1.1 to 1.2
-### State API
+### Introduction
-### Fast Processing Time Window Operators
+As mentioned [here](link here), Flink has two types of state, namely the
**keyed** and the **non-keyed** one.
+Both types are available to user-defined functions and this document will
guide you through the process
+of migrating your Flink-1.1 function code to Flink-1.2.
+The migration process will serve two goals:
+
+1. allow your functions to take advantage of the new features introduced
in Flink-1.2, such as rescaling,
+
+2. make sure that your new Flink-1.2 job will be able to continue from
where its Flink-1.1 predecessor stopped.
+
+As running examples for the remainder of this document we will use the
`CountMapper` and the `BufferingSink`
+functions. The first is an example of a function with **keyed** state,
while
+the second has **non-keyed** state. The code for the aforementioned two
functions in Flink-1.1 is presented below:
+
+ public class CountMapper extends RichFlatMapFunction<Tuple2<String,
Integer>, Tuple2<String, Integer>> {
+
+ private transient ValueState<Integer> counter;
+
+ private final int numberElements;
+
+ public CountMapper(int numberElements) {
+ this.numberElements = numberElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ counter = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("counter", Integer.class,
0));
+ }
+
+ @Override
+ public void flatMap(Tuple2<String, Integer> value,
Collector<Tuple2<String, Integer>> out) throws Exception {
+ int count = counter.value() + 1;
+ counter.update(count);
+
+ if (count % numberElements == 0) {
+ out.collect(Tuple2.of(value.f0, count));
+ counter.update(0); // reset to 0
+ }
+ }
+ }
+
+
+ public class BufferingSink implements SinkFunction<Tuple2<String,
Integer>>,
+ Checkpointed<ArrayList<Tuple2<String, Integer>>> {
+
+ private final int threshold;
+
+ private ArrayList<Tuple2<String, Integer>> bufferedElements;
+
+ BufferingSink(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
+
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception {
+ bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element:
bufferedElements) {
+ // send it to the sink
+ }
+ bufferedElements.clear();
+ }
+ }
+
+ @Override
+ public ArrayList<Tuple2<String, Integer>> snapshotState(
+ long checkpointId, long checkpointTimestamp) throws
Exception {
+ return bufferedElements;
+ }
+
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state)
throws Exception {
+ bufferedElements.addAll(state);
+ }
+ }
+
+
+The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key
input stream of the form
+`(word, 1)`. The function keeps a counter for each incoming key
(`ValueState<Integer> counter`) and if
+the number of occurrences of a certain word reaches the user-provided
threshold, a tuple is emitted
+containing the word itself and the number of occurrences.
+
+The `BufferingSink` is a `SinkFunction` that receives elements
(potentially the output of the `CountMapper`)
+and buffers them until a certain user-specified threshold is reached,
before emitting them to the final sink.
+This is a common way to avoid many expensive calls to a database or an
external storage system. To do the
+buffering in a fault-tolerant manner, the buffered elements are kept in a
list (`bufferedElements`) which is
+periodically checkpointed.
+
+### Migration to Flink-1.2
+
+To leverage the new features of Flink-1.2, the code above should be
modified to use the new state abstractions.
+After doing these changes, you will be able to change the parallelism of
your job (scale up or down) and you
+are guaranteed that the new version of your job will start from where its
predecessor left off.
+
+**Keyed State:** Something to note before delving into the details of the
migration process is that if your function
+has **only keyed state**, then the exact same code from Flink-1.1 also
works for Flink-1.2 with full support
+for the new features and full backwards compatibility. Changes could be
made just for better code organization,
+but this is just a matter of style.
+
+With that said, the rest of this section focuses on the **non-keyed
state**.
+
+#### Rescaling and new state abstractions
+
+The first modification is the transition from the old `Checkpointed<T
extends Serializable>` state interface
+to the new ones. In Flink-1.2, a stateful function can implement either
the more general `CheckpointedFunction`
+interface, or the `ListCheckpointed<T extends Serializable>`, which is
semantically closer to the old
+`Checkpointed` one.
+
+In both cases, the non-keyed state is expected to be a `List` of
*serializable* objects, independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these
objects are the finest granularity at which
+non-keyed state can be repartitioned. As an example, if with parallelism 1
the checkpointed state of the `BufferingSink`
+contains elements `(test1, 2)` and `(test2, 2)`, when increasing the
parallelism to 2, `(test1, 2)` may end up in task 0,
+while `(test2, 2)` will go to task 1.
+
+More details on the principles behind rescaling of both keyed state and
non-keyed state can be found
+[here](link here).
+
+##### ListCheckpointed
+
+The `ListCheckpointed` interface requires the implementation of two
methods:
+
+ List<T> snapshotState(long checkpointId, long timestamp) throws
Exception;
+
+ void restoreState(List<T> state) throws Exception;
+
+Their semantics are the same as their counterparts in the old
`Checkpointed` interface. The only difference
+is that now the `snapshotState()` should return a list of objects to
checkpoint, as stated earlier, and the
+`restoreState` has to handle this list upon recovery. If the state is not
re-partitionable, you can always
+return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
The updated code for the `BufferingSink`
+is included below:
+
+ public class BufferingSinkListCheckpointed implements
+ SinkFunction<Tuple2<String, Integer>>,
+ ListCheckpointed<Tuple2<String, Integer>>,
+ CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+ private final int threshold;
+
+ private transient ListState<Tuple2<String, Integer>>
checkpointedState;
+
+ private List<Tuple2<String, Integer>> bufferedElements;
+
+ public BufferingSinkListCheckpointed(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
+
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception
{
+ this.bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element: bufferedElements) {
+ // send it to the sink
+ }
+ bufferedElements.clear();
+ }
+ }
+
+ @Override
+ public List<Tuple2<String, Integer>> snapshotState(
+ long checkpointId, long timestamp) throws Exception {
+ return this.bufferedElements;
+ }
+
+ @Override
+ public void restoreState(List<Tuple2<String, Integer>> state)
throws Exception {
+ if (!state.isEmpty()) {
+ this.bufferedElements.addAll(state);
+ }
+ }
+
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state)
throws Exception {
+ // this is from the CheckpointedRestoring interface.
+ this.bufferedElements.addAll(state);
+ }
+ }
+
+As shown in the code, the updated function also implements the
`CheckpointedRestoring` interface. This is for backwards
+compatibility reasons and more details will be explained in the end of
this section.
--- End diff --
... will be explained at the end ...
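On the rescaling paragraph in the diff (the `(test1, 2)` / `(test2, 2)` example): a small example might make the redistribution of non-keyed list state more concrete. The following is only a minimal sketch in plain Java, assuming a simple round-robin assignment. The class `RescaleSketch` and the method `redistribute` are hypothetical names made up for illustration, and nothing here is a claim about how Flink actually assigns list entries to subtasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not part of Flink. */
public class RescaleSketch {

    /**
     * Splits the checkpointed list entries across newParallelism subtasks.
     * Round-robin is an assumed policy, chosen here only for simplicity.
     */
    public static <T> List<List<T>> redistribute(List<T> checkpointed, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        // One (initially empty) state list per subtask.
        List<List<T>> perTask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perTask.add(new ArrayList<T>());
        }
        // Each list entry is an indivisible unit: it moves as a whole.
        for (int i = 0; i < checkpointed.size(); i++) {
            perTask.get(i % newParallelism).add(checkpointed.get(i));
        }
        return perTask;
    }

    public static void main(String[] args) {
        // State checkpointed with parallelism 1, as in the example in the diff.
        List<String> state = Arrays.asList("(test1, 2)", "(test2, 2)");
        List<List<String>> tasks = redistribute(state, 2);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + " -> " + tasks.get(i));
        }
    }
}
```

Under this assumed policy, each of the two subtasks ends up with one buffered element. A state returned as `Collections.singletonList(MY_STATE)` has only one entry, so it always lands on a single subtask as a whole, which is why that form suits state that is not re-partitionable.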
> Add documentation about migrating functions from 1.1 to 1.2
> -----------------------------------------------------------
>
> Key: FLINK-5502
> URL: https://issues.apache.org/jira/browse/FLINK-5502
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Affects Versions: 1.2.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Labels: documentation
> Fix For: 1.2.0
>
>