Repository: flink
Updated Branches:
  refs/heads/release-1.2 96659b0f7 -> da6ac7b5f


[FLINK-5502] [docs] Add migration guide in docs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da6ac7b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da6ac7b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da6ac7b5

Branch: refs/heads/release-1.2
Commit: da6ac7b5f7caf14bb2a8581db2e405e0585f7872
Parents: 96659b0
Author: kl0u <[email protected]>
Authored: Fri Jan 13 15:52:04 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Feb 2 16:36:10 2017 +0100

----------------------------------------------------------------------
 docs/dev/migration.md | 393 ++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 390 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da6ac7b5/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index c74952c..a5910a8 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -25,9 +25,396 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Flink 1.1 to 1.2
+## Migrating from Flink 1.1 to Flink 1.2
 
-### State API
+As mentioned in the [State documentation]({{ site.baseurl 
}}/dev/stream/state.html), Flink has two types of state:
+**keyed** and **non-keyed** state (also called **operator** state). Both types 
are available to
+both operators and user-defined functions. This document will guide you 
through the process of migrating your Flink 1.1
+function code to Flink 1.2 and will present some important internal changes 
introduced in Flink 1.2 that concern the
+deprecation of the aligned window operators from Flink 1.1 (see [Aligned 
Processing Time Window Operators](#aligned-processing-time-window-operators)).
 
-### Fast Processing Time Window Operators
+The migration process will serve two goals:
 
+1. allow your functions to take advantage of the new features introduced in 
Flink 1.2, such as rescaling,
+
+2. make sure that your new Flink 1.2 job will be able to resume execution from 
a savepoint generated by its
+Flink 1.1 predecessor.
+
+After following the steps in this guide, you will be able to migrate your 
running job from Flink 1.1 to Flink 1.2
+simply by taking a [savepoint]({{ site.baseurl }}/setup/savepoints.html) with 
your Flink 1.1 job and giving it to
+your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to 
resume execution from where its
+Flink 1.1 predecessor left off.
+
+### Example User Functions
+
+As running examples for the remainder of this document we will use the 
`CountMapper` and the `BufferingSink`
+functions. The first is an example of a function with **keyed** state, while
+the second has **non-keyed** state. The code for the aforementioned two 
functions in Flink 1.1 is presented below:
+
+    public class CountMapper extends RichFlatMapFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>> {
+
+        private transient ValueState<Integer> counter;
+
+        private final int numberElements;
+
+        public CountMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            counter = getRuntimeContext().getState(
+               new ValueStateDescriptor<>("counter", Integer.class, 0));
+        }
+
+        @Override
+        public void flatMap(Tuple2<String, Integer> value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
+            int count = counter.value() + 1;
+           counter.update(count);
+
+           if (count % numberElements == 0) {
+                   out.collect(Tuple2.of(value.f0, count));
+                   counter.update(0); // reset to 0
+           }
+        }
+    }
+
+
+    public class BufferingSink implements SinkFunction<Tuple2<String, 
Integer>>,
+            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
+
+           private final int threshold;
+
+           private ArrayList<Tuple2<String, Integer>> bufferedElements;
+
+           BufferingSink(int threshold) {
+                   this.threshold = threshold;
+                   this.bufferedElements = new ArrayList<>();
+           }
+
+       @Override
+           public void invoke(Tuple2<String, Integer> value) throws Exception {
+                   bufferedElements.add(value);
+                   if (bufferedElements.size() == threshold) {
+                           for (Tuple2<String, Integer> element: 
bufferedElements) {
+                                   // send it to the sink
+                           }
+                           bufferedElements.clear();
+                   }
+           }
+
+           @Override
+           public ArrayList<Tuple2<String, Integer>> snapshotState(
+                   long checkpointId, long checkpointTimestamp) throws 
Exception {
+                   return bufferedElements;
+           }
+
+           @Override
+           public void restoreState(ArrayList<Tuple2<String, Integer>> state) 
throws Exception {
+               bufferedElements.addAll(state);
+        }
+    }
+
+
+The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped-by-key 
input stream of the form
+`(word, 1)`. The function keeps a counter for each incoming key 
(`ValueState<Integer> counter`) and if
+the number of occurrences of a certain word surpasses the user-provided 
threshold, a tuple is emitted
+containing the word itself and the number of occurrences.
+
+The `BufferingSink` is a `SinkFunction` that receives elements (potentially 
the output of the `CountMapper`)
+and buffers them until a certain user-specified threshold is reached, before 
emitting them to the final sink.
+This is a common way to avoid many expensive calls to a database or an 
external storage system. To do the
+buffering in a fault-tolerant manner, the buffered elements are kept in a list 
(`bufferedElements`) which is
+periodically checkpointed.
+
+### State API Migration
+
+To leverage the new features of Flink 1.2, the code above should be modified 
to use the new state abstractions.
+After doing these changes, you will be able to change the parallelism of your 
job (scale up or down) and you
+are guaranteed that the new version of your job will start from where its 
predecessor left off.
+
+**Keyed State:** Something to note before delving into the details of the 
migration process is that if your function
+has **only keyed state**, then the exact same code from Flink 1.1 also works 
for Flink 1.2 with full support
+for the new features and full backwards compatibility. Changes could be made 
just for better code organization,
+but this is just a matter of style.
+
+With the above said, the rest of this section focuses on the **non-keyed 
state**.
+
+#### Rescaling and new state abstractions
+
+The first modification is the transition from the old `Checkpointed<T extends 
Serializable>` state interface
+to the new ones. In Flink 1.2, a stateful function can implement either the 
more general `CheckpointedFunction`
+interface, or the `ListCheckpointed<T extends Serializable>` interface, which 
is semantically closer to the old
+`Checkpointed` one.
+
+In both cases, the non-keyed state is expected to be a `List` of 
*serializable* objects, independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these objects 
are the finest granularity at which
+non-keyed state can be repartitioned. As an example, if with parallelism 1 the 
checkpointed state of the `BufferingSink`
+contains elements `(test1, 2)` and `(test2, 2)`, when increasing the 
parallelism to 2, `(test1, 2)` may end up in task 0,
+while `(test2, 2)` will go to task 1.
+
+More details on the principles behind rescaling of both keyed state and 
non-keyed state can be found in
+the [State documentation]({{ site.baseurl }}/dev/stream/state.html).
+
+##### ListCheckpointed
+
+The `ListCheckpointed` interface requires the implementation of two methods:
+
+    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+
+    void restoreState(List<T> state) throws Exception;
+
+Their semantics are the same as their counterparts in the old `Checkpointed` 
interface. The only difference
+is that now `snapshotState()` should return a list of objects to checkpoint, 
as stated earlier, and
+`restoreState` has to handle this list upon recovery. If the state is not 
re-partitionable, you can always
+return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The 
updated code for `BufferingSink`
+is included below:
+
+    public class BufferingSinkListCheckpointed implements
+            SinkFunction<Tuple2<String, Integer>>,
+            ListCheckpointed<Tuple2<String, Integer>>,
+            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+
+        private List<Tuple2<String, Integer>> bufferedElements;
+
+        public BufferingSinkListCheckpointed(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            this.bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public List<Tuple2<String, Integer>> snapshotState(
+                long checkpointId, long timestamp) throws Exception {
+            return this.bufferedElements;
+        }
+
+        @Override
+        public void restoreState(List<Tuple2<String, Integer>> state) throws 
Exception {
+            if (!state.isEmpty()) {
+                this.bufferedElements.addAll(state);
+            }
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) 
throws Exception {
+            // this is from the CheckpointedRestoring interface.
+            this.bufferedElements.addAll(state);
+        }
+    }
+
+As shown in the code, the updated function also implements the 
`CheckpointedRestoring` interface. This is for backwards
+compatibility reasons and more details will be explained at the end of this 
section.
+
+##### CheckpointedFunction
+
+The `CheckpointedFunction` interface requires again the implementation of two 
methods:
+
+    void snapshotState(FunctionSnapshotContext context) throws Exception;
+
+    void initializeState(FunctionInitializationContext context) throws 
Exception;
+
+As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is 
performed, but now `initializeState()` (which is
+the counterpart of the `restoreState()`) is called every time the user-defined 
function is initialized, rather than only
+in the case that we are recovering from a failure. Given this, 
`initializeState()` is not only the place where different
+types of state are initialized, but also where state recovery logic is 
included. An implementation of the
+`CheckpointedFunction` interface for `BufferingSink` is presented below.
+
+    public class BufferingSink implements SinkFunction<Tuple2<String, 
Integer>>,
+            CheckpointedFunction, 
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+
+        private List<Tuple2<String, Integer>> bufferedElements;
+
+        public BufferingSink(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+            checkpointedState.clear();
+            for (Tuple2<String, Integer> element : bufferedElements) {
+                checkpointedState.add(element);
+            }
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) 
throws Exception {
+            checkpointedState = context.getOperatorStateStore().
+                getSerializableListState("buffered-elements");
+
+            if (context.isRestored()) {
+                for (Tuple2<String, Integer> element : 
checkpointedState.get()) {
+                    bufferedElements.add(element);
+                }
+            }
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) 
throws Exception {
+            // this is from the CheckpointedRestoring interface.
+            this.bufferedElements.addAll(state);
+        }
+    }
+
+The `initializeState` takes as argument a `FunctionInitializationContext`. 
This is used to initialize
+the non-keyed state "container". This is a container of type `ListState` where 
the non-keyed state objects
+are going to be stored upon checkpointing:
+
+`this.checkpointedState = 
context.getOperatorStateStore().getSerializableListState("buffered-elements");`
+
+After initializing the container, we use the `isRestored()` method of the 
context to check if we are
+recovering after a failure. If this is `true`, *i.e.* we are recovering, the 
restore logic is applied.
+
+As shown in the code of the modified `BufferingSink`, this `ListState` 
recovered during state
+initialization is kept in a class variable for future use in 
`snapshotState()`. There the `ListState` is cleared
+of all objects included by the previous checkpoint, and is then filled with 
the new ones we want to checkpoint.
+
+As a side note, the keyed state can also be initialized in the 
`initializeState()` method. This can be done
+using the `FunctionInitializationContext` given as argument, instead of the 
`RuntimeContext`, which is the case
+for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the 
`CountMapper` example,
+the old `open()` method could be removed and the new `snapshotState()` and 
`initializeState()` methods
+would look like this:
+
+    public class CountMapper extends RichFlatMapFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>>
+            implements CheckpointedFunction {
+
+        private transient ValueState<Integer> counter;
+
+        private final int numberElements;
+
+        public CountMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void flatMap(Tuple2<String, Integer> value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
+            int count = counter.value() + 1;
+            counter.update(count);
+
+            if (count % numberElements == 0) {
+                out.collect(Tuple2.of(value.f0, count));
+               counter.update(0); // reset to 0
+               }
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+            //all managed, nothing to do.
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) 
throws Exception {
+            counter = context.getKeyedStateStore().getState(
+                new ValueStateDescriptor<>("counter", Integer.class, 0));
+        }
+    }
+
+Notice that the `snapshotState()` method is empty as Flink itself takes care 
of snapshotting managed keyed state
+upon checkpointing.
+
+#### Backwards compatibility with Flink 1.1
+
+So far we have seen how to modify our functions to take advantage of the new 
features introduced by Flink 1.2.
+The question that remains is "Can I make sure that my modified (Flink 1.2) job 
will start from where my already
+running job from Flink 1.1 stopped?".
+
+The answer is yes, and the way to do it is pretty straightforward. For the 
keyed state, you have to do nothing.
+Flink will take care of restoring the state from Flink 1.1. For the non-keyed 
state, your new function has to
+implement the `CheckpointedRestoring` interface, as shown in the code above. 
This has a single method, the
+familiar `restoreState()` from the old `Checkpointed` interface from Flink 
1.1. As shown in the modified code of
+the `BufferingSink`, the `restoreState()` method is identical to its 
predecessor.
+
+### Aligned Processing Time Window Operators
+
+In Flink 1.1, and only when operating on *processing time* with no specified 
evictor or trigger,
+the command `timeWindow()` on a keyed stream would instantiate a special type 
of `WindowOperator`. This could be
+either an `AggregatingProcessingTimeWindowOperator` or an 
`AccumulatingProcessingTimeWindowOperator`. Both of
+these operators are referred to as *aligned* window operators as they assume 
their input elements arrive in
+order. This is valid when operating in processing time, as elements get as 
timestamp the wall-clock time at
+the moment they arrive at the window operator. These operators were restricted 
to using the memory state backend, and
+had optimized data structures for storing the per-window elements which 
leveraged the in-order input element arrival.
+
+In Flink 1.2, the aligned window operators are deprecated, and all windowing 
operations go through the generic
+`WindowOperator`. This migration requires no change in the code of your Flink 
1.1 job, as Flink will transparently
+read the state stored by the aligned window operators in your Flink 1.1 
savepoint, translate it into a format
+that is compatible with the generic `WindowOperator`, and resume execution 
using the generic `WindowOperator`.
+
+<span class="label label-info">Note</span> Although deprecated, you can still 
use the aligned window operators
+in Flink 1.2 through special `WindowAssigners` introduced for exactly this 
purpose. These assigners are the
+`SlidingAlignedProcessingTimeWindows` and the 
`TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling
+windows respectively. A Flink 1.2 job that uses aligned windowing has to be a 
new job, as there is no way to
+resume execution from a Flink 1.1 savepoint while using these operators.
+
+<span class="label label-danger">Attention</span> The aligned window operators 
provide **no rescaling** capabilities
+and **no backwards compatibility** with Flink 1.1.
+
+The code to use the aligned window operators in Flink 1.2 is presented below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// for tumbling windows
+DataStream<Tuple2<String, Integer>> window1 = source
+       .keyBy(0)
+       .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, 
TimeUnit.MILLISECONDS)))
+       .apply(your-function)
+
+// for sliding windows
+DataStream<Tuple2<String, Integer>> window1 = source
+       .keyBy(0)
+       .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
+       .apply(your-function)
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// for tumbling windows
+val window1 = source
+    .keyBy(0)
+    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, 
TimeUnit.MILLISECONDS)))
+    .apply(your-function)
+
+// for sliding windows
+val window2 = source
+    .keyBy(0)
+    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
+    .apply(your-function)
+
+{% endhighlight %}
+</div>
+</div>

Reply via email to