Repository: flink
Updated Branches:
  refs/heads/release-1.2 96659b0f7 -> da6ac7b5f

[FLINK-5502] [docs] Add migration guide in docs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da6ac7b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da6ac7b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da6ac7b5

Branch: refs/heads/release-1.2
Commit: da6ac7b5f7caf14bb2a8581db2e405e0585f7872
Parents: 96659b0
Author: kl0u <[email protected]>
Authored: Fri Jan 13 15:52:04 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Feb 2 16:36:10 2017 +0100

----------------------------------------------------------------------
 docs/dev/migration.md | 393 ++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 390 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da6ac7b5/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index c74952c..a5910a8 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -25,9 +25,396 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Flink 1.1 to 1.2
+## Migrating from Flink 1.1 to Flink 1.2
 
-### State API
+As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state.html), Flink has two types of state:
+**keyed** and **non-keyed** state (also called **operator** state). Both types are available to
+operators and user-defined functions alike. This document will guide you through the process of migrating your Flink 1.1
+function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the
+deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)).
 
-### Fast Processing Time Window Operators
+The migration process will serve two goals:
+
+1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling, and
+
+2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its
+Flink 1.1 predecessor.
+
+After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2
+simply by taking a [savepoint]({{ site.baseurl }}/setup/savepoints.html) with your Flink 1.1 job and giving it to
+your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its
+Flink 1.1 predecessor left off.
+
+### Example User Functions
+
+As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
+functions. The first is an example of a function with **keyed** state, while
+the second has **non-keyed** state. The code for these two functions in Flink 1.1 is presented below:
+
+    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+        private transient ValueState<Integer> counter;
+
+        private final int numberElements;
+
+        public CountMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            counter = getRuntimeContext().getState(
+                new ValueStateDescriptor<>("counter", Integer.class, 0));
+        }
+
+        @Override
+        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+            int count = counter.value() + 1;
+            counter.update(count);
+
+            if (count % numberElements == 0) {
+                out.collect(Tuple2.of(value.f0, count));
+                counter.update(0); // reset to 0
+            }
+        }
+    }
+
+
+    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private ArrayList<Tuple2<String, Integer>> bufferedElements;
+
+        BufferingSink(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public ArrayList<Tuple2<String, Integer>> snapshotState(
+                long checkpointId, long checkpointTimestamp) throws Exception {
+            return bufferedElements;
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+            bufferedElements.addAll(state);
+        }
+    }
+
+
+The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form
+`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`). Whenever
+the counter of a word reaches the user-provided threshold (`numberElements`), the function emits a tuple
+containing the word itself and the number of occurrences, and resets the counter to 0.
+
+The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
+and buffers them until a user-specified threshold is reached, before emitting them to the final sink.
+This is a common way to avoid many expensive calls to a database or an external storage system. To do the
+buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
+periodically checkpointed.
+
+### State API Migration
+
+To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions.
+After making these changes, you will be able to change the parallelism of your job (scale up or down), and you
+are guaranteed that the new version of your job will start from where its predecessor left off.
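Changing the parallelism means that the non-keyed state checkpointed by N parallel tasks has to be handed over to M tasks. The plain-Java sketch below illustrates this idea. It is not Flink's implementation and has no Flink dependency. It assumes the state lists of all old tasks are merged and then split into M contiguous, near-equal chunks. `ListStateRedistribution` and `redistribute` are hypothetical names, and Flink's actual repartitioning strategy may differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; this is NOT a Flink API.
final class ListStateRedistribution {

    private ListStateRedistribution() {}

    // Merges the list states of all old parallel tasks, then splits the merged
    // list into `newParallelism` contiguous chunks whose sizes differ by at most one.
    static <T> List<List<T>> redistribute(List<List<T>> oldTaskStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<T> merged = new ArrayList<>();
        for (List<T> taskState : oldTaskStates) {
            merged.addAll(taskState);
        }
        List<List<T>> newTaskStates = new ArrayList<>();
        int base = merged.size() / newParallelism;
        int remainder = merged.size() % newParallelism;
        int from = 0;
        for (int task = 0; task < newParallelism; task++) {
            // the first `remainder` tasks receive one extra element
            int size = base + (task < remainder ? 1 : 0);
            newTaskStates.add(new ArrayList<>(merged.subList(from, from + size)));
            from += size;
        }
        return newTaskStates;
    }
}

class RescaleDemo {
    public static void main(String[] args) {
        // two old tasks (parallelism 2) scaled up to three new tasks
        List<List<String>> oldStates = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));
        System.out.println(ListStateRedistribution.redistribute(oldStates, 3));
        // prints [[a, b], [c, d], [e]]
    }
}
```

This only works because each list entry can be handed to any task on its own. When there are more new tasks than entries, some tasks receive an empty list, so restore logic should tolerate empty input.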
+
+**Keyed State:** Something to note before delving into the details of the migration process is that if your function
+has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support
+for the new features and full backwards compatibility. Changes could be made just for better code organization,
+but this is just a matter of style.
+
+With the above said, the rest of this section focuses on the **non-keyed state**.
+
+#### Rescaling and new state abstractions
+
+The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface
+to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction`
+interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old
+`Checkpointed` one.
+
+In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent of each other,
+and thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
+non-keyed state can be repartitioned. For example, if with parallelism 1 the checkpointed state of the `BufferingSink`
+contains the elements `(test1, 2)` and `(test2, 2)`, then when increasing the parallelism to 2, `(test1, 2)` may end up
+in task 0, while `(test2, 2)` goes to task 1.
+
+More details on the principles behind rescaling of both keyed state and non-keyed state can be found in
+the [State documentation]({{ site.baseurl }}/dev/stream/state.html).
+
+##### ListCheckpointed
+
+The `ListCheckpointed` interface requires the implementation of two methods:
+
+    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+
+    void restoreState(List<T> state) throws Exception;
+
+Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
+is that `snapshotState()` now returns a list of objects to checkpoint, as stated earlier, and
+`restoreState()` has to handle this list upon recovery. If the state is not re-partitionable, you can always
+return a `Collections.singletonList(MY_STATE)` in `snapshotState()`. The updated code for `BufferingSink`
+is included below:
+
+    public class BufferingSinkListCheckpointed implements
+            SinkFunction<Tuple2<String, Integer>>,
+            ListCheckpointed<Tuple2<String, Integer>>,
+            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private List<Tuple2<String, Integer>> bufferedElements;
+
+        public BufferingSinkListCheckpointed(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            this.bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public List<Tuple2<String, Integer>> snapshotState(
+                long checkpointId, long timestamp) throws Exception {
+            return this.bufferedElements;
+        }
+
+        @Override
+        public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
+            if (!state.isEmpty()) {
+                this.bufferedElements.addAll(state);
+            }
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+            // this is from the CheckpointedRestoring interface.
+            this.bufferedElements.addAll(state);
+        }
+    }
+
+As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is needed for
+backwards compatibility, as explained at the end of this section.
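The following minimal sketch makes the `snapshotState()`/`restoreState()` contract concrete without a Flink dependency. `ListSnapshotting` is a hypothetical stand-in that mirrors the two `ListCheckpointed` methods. It is not the Flink interface. `BufferSketch` keeps re-partitionable state, with one list entry per buffered element. `CounterSketch` shows the `Collections.singletonList(...)` pattern for state that cannot be split. After rescaling, a task may receive no entries or several of them. The merge rule in `CounterSketch.restoreState()` (summing) is an assumption that only makes sense for counters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Flink's ListCheckpointed<T>: the same two method
// shapes, but without the Flink dependency, so this sketch runs on its own.
interface ListSnapshotting<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

// Re-partitionable state: each buffered element is its own list entry.
final class BufferSketch implements ListSnapshotting<String> {
    final List<String> buffered = new ArrayList<>();

    @Override
    public List<String> snapshotState(long checkpointId, long timestamp) {
        // copy, so that clearing the live buffer later does not alter the snapshot
        return new ArrayList<>(buffered);
    }

    @Override
    public void restoreState(List<String> state) {
        // after rescaling, this may be any subset of the checkpointed elements (even none)
        buffered.addAll(state);
    }
}

// Non-re-partitionable state: a single value wrapped in a singleton list.
final class CounterSketch implements ListSnapshotting<Long> {
    long count;

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // assumption: partial counts are merged by summing; no entries means 0
        count = 0;
        for (long partial : state) {
            count += partial;
        }
    }
}

class ListSnapshotDemo {
    public static void main(String[] args) {
        BufferSketch sink = new BufferSketch();
        sink.buffered.add("(test1, 2)");
        sink.buffered.add("(test2, 2)");
        List<String> snapshot = sink.snapshotState(1L, System.currentTimeMillis());
        sink.buffered.clear(); // e.g. the buffer is flushed after the checkpoint

        BufferSketch restored = new BufferSketch();
        restored.restoreState(snapshot);
        System.out.println(restored.buffered); // prints [(test1, 2), (test2, 2)]
    }
}
```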
+
+##### CheckpointedFunction
+
+The `CheckpointedFunction` interface also requires the implementation of two methods:
+
+    void snapshotState(FunctionSnapshotContext context) throws Exception;
+
+    void initializeState(FunctionInitializationContext context) throws Exception;
+
+As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
+the counterpart of `restoreState()`) is called every time the user-defined function is initialized, rather than only
+when recovering from a failure. Given this, `initializeState()` is not only the place where different
+types of state are initialized, but also where the state recovery logic lives. An implementation of the
+`CheckpointedFunction` interface for `BufferingSink` is presented below.
+
+    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+
+        private List<Tuple2<String, Integer>> bufferedElements;
+
+        public BufferingSink(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            checkpointedState.clear();
+            for (Tuple2<String, Integer> element : bufferedElements) {
+                checkpointedState.add(element);
+            }
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            checkpointedState = context.getOperatorStateStore()
+                .getSerializableListState("buffered-elements");
+
+            if (context.isRestored()) {
+                for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                    bufferedElements.add(element);
+                }
+            }
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+            // this is from the CheckpointedRestoring interface.
+            this.bufferedElements.addAll(state);
+        }
+    }
+
+The `initializeState()` method takes a `FunctionInitializationContext` as its argument. This context is used to
+initialize the non-keyed state "container", a `ListState` in which the non-keyed state objects
+are stored upon checkpointing:
+
+`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
+
+After initializing the container, we use the `isRestored()` method of the context to check if we are
+recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
+
+As shown in the code of the modified `BufferingSink`, the `ListState` obtained during state
+initialization is kept in a class variable for later use in `snapshotState()`. There, the `ListState` is cleared
+of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
+
+As a side note, keyed state can also be initialized in the `initializeState()` method. This is done
+using the `FunctionInitializationContext` given as argument, instead of the `RuntimeContext` used in
+Flink 1.1.
+If the `CheckpointedFunction` interface were to be used in the `CountMapper` example,
+the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
+would look like this:
+
+    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
+            implements CheckpointedFunction {
+
+        private transient ValueState<Integer> counter;
+
+        private final int numberElements;
+
+        public CountMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+            int count = counter.value() + 1;
+            counter.update(count);
+
+            if (count % numberElements == 0) {
+                out.collect(Tuple2.of(value.f0, count));
+                counter.update(0); // reset to 0
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            // all state is managed keyed state, so there is nothing to do here
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            counter = context.getKeyedStateStore().getState(
+                new ValueStateDescriptor<>("counter", Integer.class, 0));
+        }
+    }
+
+Notice that the `snapshotState()` method is empty, as Flink itself takes care of snapshotting managed keyed state
+upon checkpointing.
+
+#### Backwards compatibility with Flink 1.1
+
+So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2.
+The question that remains is: "Can I make sure that my modified (Flink 1.2) job will start from where my already
+running job from Flink 1.1 stopped?"
+
+The answer is yes, and the way to do it is straightforward. For keyed state, you have to do nothing:
+Flink will take care of restoring the state from Flink 1.1. For non-keyed state, your new function has to
+implement the `CheckpointedRestoring` interface, as shown in the code above. This interface has a single method, the
+familiar `restoreState()` from the old Flink 1.1 `Checkpointed` interface. As shown in the modified code of
+the `BufferingSink`, the `restoreState()` method is identical to its predecessor.
+
+### Aligned Processing Time Window Operators
+
+In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger,
+calling `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be
+either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of
+these operators are referred to as *aligned* window operators, as they assume their input elements arrive in
+order. This assumption holds in processing time, because each element is assigned the wall-clock time at which it
+arrives at the window operator as its timestamp. These operators were restricted to using the memory state backend, and
+had optimized data structures for storing the per-window elements which leveraged the in-order arrival of input elements.
+
+In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic
+`WindowOperator`. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently
+read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format
+that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`.
+
+<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators
+in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the
+`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling
+windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to
+resume execution from a Flink 1.1 savepoint while using these operators.
+
+<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities
+and **no backwards compatibility** with Flink 1.1.
+
+The code to use the aligned window operators in Flink 1.2 is presented below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// yourFunction is a placeholder for your own window function
+
+// for tumbling windows
+DataStream<Tuple2<String, Integer>> window1 = source
+    .keyBy(0)
+    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
+    .apply(yourFunction);
+
+// for sliding windows
+DataStream<Tuple2<String, Integer>> window2 = source
+    .keyBy(0)
+    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+    .apply(yourFunction);
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// yourFunction is a placeholder for your own window function
+
+// for tumbling windows
+val window1 = source
+    .keyBy(0)
+    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
+    .apply(yourFunction)
+
+// for sliding windows
+val window2 = source
+    .keyBy(0)
+    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+    .apply(yourFunction)
+
+{% endhighlight %}
+</div>
+</div>
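The assigners in the snippet above take a window size and, for sliding windows, a slide. The plain-Java sketch below illustrates what "aligned" means for processing-time windows. It is not Flink's operator code and assumes windows are aligned to the epoch with no offset. Under that assumption, the windows an element belongs to follow from its arrival timestamp alone. `AlignedWindowMath` and its methods are hypothetical names. The parameters mirror the example: 1 s tumbling windows, and 1 s windows sliding every 100 ms.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of epoch-aligned processing-time window
// assignment; windows are half-open intervals [start, end) in milliseconds.
final class AlignedWindowMath {

    private AlignedWindowMath() {}

    // Start of the aligned interval of length `length` containing `timestamp`
    // (assumes timestamp >= 0, i.e. wall-clock millis since the epoch).
    static long windowStart(long timestamp, long length) {
        return timestamp - (timestamp % length);
    }

    // Tumbling windows: each element belongs to exactly one window.
    static long[] tumblingWindow(long timestamp, long size) {
        long start = windowStart(timestamp, size);
        return new long[] {start, start + size};
    }

    // Sliding windows: every window [start, start + size) that covers the timestamp,
    // starting from the latest aligned start and stepping back by `slide`.
    static List<long[]> slidingWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = windowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long now = 1_234_567L; // hypothetical arrival time in ms
        long[] t = tumblingWindow(now, 1000L);
        System.out.println("tumbling: [" + t[0] + ", " + t[1] + ")"); // tumbling: [1234000, 1235000)
        List<long[]> s = slidingWindows(now, 1000L, 100L);
        System.out.println("sliding windows containing it: " + s.size()); // sliding windows containing it: 10
    }
}
```

With a 1 s size and a 100 ms slide, every element falls into size / slide = 10 overlapping windows.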
