Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96630636
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the 
**keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will 
guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced 
in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from 
where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the 
`CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, 
while 
    +the second has **non-keyed** state. The code for the aforementioned two 
functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +                   new ValueStateDescriptor<>("counter", Integer.class, 
0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +               counter.update(count);
    +      
    +               if (count % numberElements == 0) {
    +                       out.collect(Tuple2.of(value.f0, count));
    +                       counter.update(0); // reset to 0
    +               }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, 
Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +       private final int threshold;
    +
    +       private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +       BufferingSink(int threshold) {
    +               this.threshold = threshold;
    +               this.bufferedElements = new ArrayList<>();
    +       }
    +
    +           @Override
    +       public void invoke(Tuple2<String, Integer> value) throws Exception {
    +               bufferedElements.add(value);
    +               if (bufferedElements.size() == threshold) {
    +                       for (Tuple2<String, Integer> element: 
bufferedElements) {
    +                               // send it to the sink
    +                       }
    +                       bufferedElements.clear();
    +               }
    +       }
    +
    +       @Override
    +       public ArrayList<Tuple2<String, Integer>> snapshotState(
    +               long checkpointId, long checkpointTimestamp) throws 
Exception {
    +               return bufferedElements;
    +       }
    +
    +       @Override
    +       public void restoreState(ArrayList<Tuple2<String, Integer>> state) 
throws Exception {
    +           bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped by key 
input stream of the form 
    +`(word, 1)`. The function keeps a counter for each incoming key 
(`ValueState<Integer> counter`) and if 
    +the number of occurrences of a certain word surpasses the user-provided 
threshold, a tuple is emitted 
    +containing the word itself and the number of occurrences.
    +
    +The `BufferingSink` is a `SinkFunction` that receives elements 
(potentially the output of the `CountMapper`)
    +and buffers them until a certain user-specified threshold is reached, 
before emitting them to the final sink. 
    +This is a common way to avoid many expensive calls to a database or an 
external storage system. To do the 
    +buffering in a fault-tolerant manner, the buffered elements are kept in a 
list (`bufferedElements`) which is 
    +periodically checkpointed.
    +
    +### Migration to Flink-1.2
    +
    +To leverage the new features of Flink-1.2, the code above should be 
modified to use the new state abstractions. 
    +After doing these changes, you will be able to change the parallelism of 
your job (scale up or down) and you 
    +are guaranteed that the new version of your job will start from where its 
predecessor left off.
    +
    +**Keyed State:** Something to note before delving into the details of the 
migration process is that if your function 
    +has **only keyed state**, then the exact same code from Flink-1.1 also 
works for Flink-1.2 with full support 
    +for the new features and full backwards compatibility. Changes could be 
made just for better code organization, 
    +but this is just a matter of style.
    +
    +With the above said, the rest of the paragraph focuses on the **non-keyed 
state**.
    +
    +#### Rescaling and new state abstractions
    +
    +The first modification is the transition from the old `Checkpointed<T 
extends Serializable>` state interface 
    +to the new ones. In Flink-1.2, a stateful function can implement either 
the more general `CheckpointedFunction` 
    +interface, or the `ListCheckpointed<T extends Serializable>`, which is 
semantically closer to the old 
    +`Checkpointed` one.
    +
    +In both cases, the non-keyed state is expected to be a `List` of 
*serializable* objects, independent from each other, 
    +thus eligible for redistribution upon rescaling. In other words, these 
objects are the finest granularity at which 
    +non-keyed state can be repartitioned. As an example, if with parallelism 1 
the checkpointed state of the `BufferingSink`
    +contains elements `(test1, 2)` and `(test2, 2)` when increasing the 
parallelism to 2, `(test1, 2)` may end up in task 0,
    +while `(test2, 2)` will go to task 1. 
    +
    +More details on the principles behind rescaling of both keyed state and 
non-keyed state is done can be found 
    +[here](link here).
    +
    +##### ListCheckpointed
    +
    +The `ListCheckpointed` interface requires the implementation of two 
methods: 
    +
    +    List<T> snapshotState(long checkpointId, long timestamp) throws 
Exception;
    +
    +    void restoreState(List<T> state) throws Exception;
    +
    +Their semantics are the same as their counterparts in the old 
`Checkpointed` interface. The only difference 
    +is that now the `snapshotState()` should return a list of objects to 
checkpoint, as stated earlier, and the 
    +`restoreState` has to handle this list upon recovery. If the state is not 
re-partitionable, you can always 
    +return a `Collections.singletonList(MY_STATE)` in the `snaphshotState()`. 
The updated code for the `BufferingSink` 
    +is included below:
    +
    +    public class BufferingSinkListCheckpointed implements 
    +            SinkFunction<Tuple2<String, Integer>>,
    +            ListCheckpointed<Tuple2<String, Integer>>, 
    +            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threashold;
    --- End diff --
    
    threshold is frequently misspelled in this example


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to