[ https://issues.apache.org/jira/browse/FLINK-5502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828031#comment-15828031 ]
ASF GitHub Bot commented on FLINK-5502:
---------------------------------------
Github user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/3130#discussion_r96628280
--- Diff: docs/dev/migration.md ---
@@ -25,9 +25,326 @@ under the License.
* This will be replaced by the TOC
{:toc}
-## Flink 1.1 to 1.2
+## Flink Function Migration from 1.1 to 1.2
-### State API
+### Introduction
-### Fast Processing Time Window Operators
+As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
+Both types are available to user-defined functions and this document will guide you through the process
+of migrating your Flink-1.1 function code to Flink-1.2.
+The migration process will serve two goals:
+
+1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
+
+2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
+
+As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
+functions. The first is an example of a function with **keyed** state, while
+the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
+
+    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+        private transient ValueState<Integer> counter;
+
+        private final int numberElements;
+
+        public CountMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            counter = getRuntimeContext().getState(
+                new ValueStateDescriptor<>("counter", Integer.class, 0));
+        }
+
+        @Override
+        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+            int count = counter.value() + 1;
+            counter.update(count);
+
+            if (count % numberElements == 0) {
+                out.collect(Tuple2.of(value.f0, count));
+                counter.update(0); // reset to 0
+            }
+        }
+    }
+
+
+    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private ArrayList<Tuple2<String, Integer>> bufferedElements;
+
+        BufferingSink(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public ArrayList<Tuple2<String, Integer>> snapshotState(
+                long checkpointId, long checkpointTimestamp) throws Exception {
+            return bufferedElements;
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+            bufferedElements.addAll(state);
+        }
+    }
+
+
+The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped by key input stream of the form
+`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if
+the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted
+containing the word itself and the number of occurrences.
+
+The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
+and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
+This is a common way to avoid many expensive calls to a database or an external storage system. To do the
+buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
+periodically checkpointed.
+
+### Migration to Flink-1.2
+
+To leverage the new features of Flink-1.2, the code above should be modified to use the new state abstractions.
+After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you
+are guaranteed that the new version of your job will start from where its predecessor left off.
+
+**Keyed State:** Something to note before delving into the details of the migration process is that if your function
+has **only keyed state**, then the exact same code from Flink-1.1 also works for Flink-1.2 with full support
+for the new features and full backwards compatibility. Changes could be made just for better code organization,
+but this is just a matter of style.
+
+With the above said, the rest of the paragraph focuses on the **non-keyed state**.
--- End diff --
... the rest of this section focuses on ...
> Add documentation about migrating functions from 1.1 to 1.2
> -----------------------------------------------------------
>
> Key: FLINK-5502
> URL: https://issues.apache.org/jira/browse/FLINK-5502
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Affects Versions: 1.2.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Labels: documentation
> Fix For: 1.2.0
>
>