[ 
https://issues.apache.org/jira/browse/FLINK-5502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828020#comment-15828020
 ] 

ASF GitHub Bot commented on FLINK-5502:
---------------------------------------

Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96628840
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the 
**keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will 
guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced 
in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from 
where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the 
`CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, 
while 
    +the second has **non-keyed** state. The code for the aforementioned two 
functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +                   new ValueStateDescriptor<>("counter", Integer.class, 
0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +               counter.update(count);
    +      
    +               if (count % numberElements == 0) {
    +                       out.collect(Tuple2.of(value.f0, count));
    +                       counter.update(0); // reset to 0
    +               }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, 
Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +       private final int threshold;
    +
    +       private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +       BufferingSink(int threshold) {
    +               this.threshold = threshold;
    +               this.bufferedElements = new ArrayList<>();
    +       }
    +
    +           @Override
    +       public void invoke(Tuple2<String, Integer> value) throws Exception {
    +               bufferedElements.add(value);
    +               if (bufferedElements.size() == threshold) {
    +                       for (Tuple2<String, Integer> element: 
bufferedElements) {
    +                               // send it to the sink
    +                       }
    +                       bufferedElements.clear();
    +               }
    +       }
    +
    +       @Override
    +       public ArrayList<Tuple2<String, Integer>> snapshotState(
    +               long checkpointId, long checkpointTimestamp) throws 
Exception {
    +               return bufferedElements;
    +       }
    +
    +       @Override
    +       public void restoreState(ArrayList<Tuple2<String, Integer>> state) 
throws Exception {
    +           bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped by key 
input stream of the form 
    +`(word, 1)`. The function keeps a counter for each incoming key 
(`ValueState<Integer> counter`) and if 
    +the number of occurrences of a certain word surpasses the user-provided 
threshold, a tuple is emitted 
    +containing the word itself and the number of occurrences.
    +
    +The `BufferingSink` is a `SinkFunction` that receives elements 
(potentially the output of the `CountMapper`)
    +and buffers them until a certain user-specified threshold is reached, 
before emitting them to the final sink. 
    +This is a common way to avoid many expensive calls to a database or an 
external storage system. To do the 
    +buffering in a fault-tolerant manner, the buffered elements are kept in a 
list (`bufferedElements`) which is 
    +periodically checkpointed.
    +
    +### Migration to Flink-1.2
    +
    +To leverage the new features of Flink-1.2, the code above should be 
modified to use the new state abstractions. 
    +After doing these changes, you will be able to change the parallelism of 
your job (scale up or down) and you 
    +are guaranteed that the new version of your job will start from where its 
predecessor left off.
    +
    +**Keyed State:** Something to note before delving into the details of the 
migration process is that if your function 
    +has **only keyed state**, then the exact same code from Flink-1.1 also 
works for Flink-1.2 with full support 
    +for the new features and full backwards compatibility. Changes could be 
made just for better code organization, 
    +but this is just a matter of style.
    +
    +With the above said, the rest of the paragraph focuses on the **non-keyed 
state**.
    +
    +#### Rescaling and new state abstractions
    +
    +The first modification is the transition from the old `Checkpointed<T 
extends Serializable>` state interface 
    +to the new ones. In Flink-1.2, a stateful function can implement either 
the more general `CheckpointedFunction` 
    +interface, or the `ListCheckpointed<T extends Serializable>`, which is 
semantically closer to the old 
    +`Checkpointed` one.
    +
    +In both cases, the non-keyed state is expected to be a `List` of 
*serializable* objects, independent from each other, 
    +thus eligible for redistribution upon rescaling. In other words, these 
objects are the finest granularity at which 
    +non-keyed state can be repartitioned. As an example, if with parallelism 1 
the checkpointed state of the `BufferingSink`
    +contains elements `(test1, 2)` and `(test2, 2)` when increasing the 
parallelism to 2, `(test1, 2)` may end up in task 0,
    +while `(test2, 2)` will go to task 1. 
    +
    +More details on the principles behind rescaling of both keyed state and 
non-keyed state is done can be found 
    +[here](link here).
    +
    +##### ListCheckpointed
    +
    +The `ListCheckpointed` interface requires the implementation of two 
methods: 
    +
    +    List<T> snapshotState(long checkpointId, long timestamp) throws 
Exception;
    +
    +    void restoreState(List<T> state) throws Exception;
    +
    +Their semantics are the same as their counterparts in the old 
`Checkpointed` interface. The only difference 
    +is that now the `snapshotState()` should return a list of objects to 
checkpoint, as stated earlier, and the 
    +`restoreState` has to handle this list upon recovery. If the state is not 
re-partitionable, you can always 
    +return a `Collections.singletonList(MY_STATE)` in the `snaphshotState()`. 
The updated code for the `BufferingSink` 
    +is included below:
    +
    +    public class BufferingSinkListCheckpointed implements 
    +            SinkFunction<Tuple2<String, Integer>>,
    +            ListCheckpointed<Tuple2<String, Integer>>, 
    +            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threashold;
    +
    +        private transient ListState<Tuple2<String, Integer>> 
checkpointedState;
    +
    +        private List<Tuple2<String, Integer>> bufferedElements;
    +
    +        public BufferingSinkListCheckpointed(int threashold) {
    +            this.threashold = threashold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception 
{
    +            this.bufferedElements.add(value);
    +            if (bufferedElements.size() == threashold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +
    +        @Override
    +        public List<Tuple2<String, Integer>> snapshotState(
    +                long checkpointId, long timestamp) throws Exception {
    +            return this.bufferedElements;
    +        }
    +
    +        @Override
    +        public void restoreState(List<Tuple2<String, Integer>> state) 
throws Exception {
    +            if (!state.isEmpty()) {
    +                this.bufferedElements.addAll(state);
    +            }
    +        }
    +        
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) 
throws Exception {
    +            // this is from the CheckpointedRestoring interface.
    +            this.bufferedElements.addAll(state);
    +        }
    +    }
    +
    +As shown in the code, the updated function also implements the 
`CheckpointedRestoring` interface. This is for backwards 
    +compatibility reasons and more details will be explained in the end of 
this section.
    --- End diff --
    
    ... will be explained at the end ...


> Add documentation about migrating functions from 1.1 to 1.2
> -----------------------------------------------------------
>
>                 Key: FLINK-5502
>                 URL: https://issues.apache.org/jira/browse/FLINK-5502
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>    Affects Versions: 1.2.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>              Labels: documentation
>             Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to