Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96614280
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while 
    +the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +                   new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +               counter.update(count);
    +      
    +               if (count % numberElements == 0) {
    +                       out.collect(Tuple2.of(value.f0, count));
    +                       counter.update(0); // reset to 0
    +               }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +       private final int threshold;
    +
    +       private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +       BufferingSink(int threshold) {
    +               this.threshold = threshold;
    +               this.bufferedElements = new ArrayList<>();
    +       }
    +
    +           @Override
    +       public void invoke(Tuple2<String, Integer> value) throws Exception {
    +               bufferedElements.add(value);
    +               if (bufferedElements.size() == threshold) {
    +                       for (Tuple2<String, Integer> element: bufferedElements) {
    +                               // send it to the sink
    +                       }
    +                       bufferedElements.clear();
    +               }
    +       }
    +
    +       @Override
    +       public ArrayList<Tuple2<String, Integer>> snapshotState(
    +               long checkpointId, long checkpointTimestamp) throws Exception {
    +               return bufferedElements;
    +       }
    +
    +       @Override
    +       public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +           bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped by key input stream of the form 
    --- End diff --
    
    I suggest either
    
    > a grouped-by-key input stream
    
    or
    
    > a keyed input stream


