[
https://issues.apache.org/jira/browse/FLINK-5502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828019#comment-15828019
]
ASF GitHub Bot commented on FLINK-5502:
---------------------------------------
Github user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/3130#discussion_r96614280
--- Diff: docs/dev/migration.md ---
@@ -25,9 +25,326 @@ under the License.
* This will be replaced by the TOC
{:toc}
-## Flink 1.1 to 1.2
+## Flink Function Migration from 1.1 to 1.2
-### State API
+### Introduction
-### Fast Processing Time Window Operators
+As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
+Both types are available to user-defined functions and this document will guide you through the process
+of migrating your Flink-1.1 function code to Flink-1.2.
+The migration process will serve two goals:
+
+1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
+
+2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
+
+As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
+functions. The first is an example of a function with **keyed** state, while
+the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
+
+    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+        private transient ValueState<Integer> counter;
+
+        private final int numberElements;
+
+        public CountMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            counter = getRuntimeContext().getState(
+                new ValueStateDescriptor<>("counter", Integer.class, 0));
+        }
+
+        @Override
+        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+            int count = counter.value() + 1;
+            counter.update(count);
+
+            if (count % numberElements == 0) {
+                out.collect(Tuple2.of(value.f0, count));
+                counter.update(0); // reset to 0
+            }
+        }
+    }
+
+
+    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private ArrayList<Tuple2<String, Integer>> bufferedElements;
+
+        BufferingSink(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public ArrayList<Tuple2<String, Integer>> snapshotState(
+                long checkpointId, long checkpointTimestamp) throws Exception {
+            return bufferedElements;
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+            bufferedElements.addAll(state);
+        }
+    }
+
+
+The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped by key input stream of the form
--- End diff --
I suggest either
> a grouped-by-key input stream
or
> a keyed input stream
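
For context on why the wording matters: with a keyed input stream, `CountMapper` effectively keeps one counter per key. Below is a minimal plain-Java sketch of that behaviour, with no Flink dependency. The class `KeyedCountSketch` and the method `countPerKey` are hypothetical names used only for illustration, and the `HashMap` is an assumed stand-in for Flink's keyed `ValueState`, not how Flink implements it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: mimics the per-key counting of CountMapper
// without using any Flink API.
public class KeyedCountSketch {

    // Returns "key=count" each time a key's counter reaches a multiple of numberElements.
    public static List<String> countPerKey(List<String> keys, int numberElements) {
        Map<String, Integer> counters = new HashMap<>(); // assumed stand-in for keyed ValueState
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            int count = counters.getOrDefault(key, 0) + 1;
            counters.put(key, count);
            if (count % numberElements == 0) {
                emitted.add(key + "=" + count);
                counters.put(key, 0); // reset to 0, as in CountMapper
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [a=2, b=2]
        System.out.println(countPerKey(Arrays.asList("a", "b", "a", "a", "b"), 2));
    }
}
```

Each key advances and resets its own counter independently, which is what "keyed" is meant to convey in the sentence under review.
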
> Add documentation about migrating functions from 1.1 to 1.2
> -----------------------------------------------------------
>
> Key: FLINK-5502
> URL: https://issues.apache.org/jira/browse/FLINK-5502
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Affects Versions: 1.2.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Labels: documentation
> Fix For: 1.2.0
>
>