This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5339a8137fb368789808b9e47a9f97896e89b255 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Fri May 29 10:53:14 2020 +0200 [FLINK-18032] Remove outdated sections in migration guide --- docs/dev/migration.md | 456 +---------------------------------------------- docs/dev/migration.zh.md | 456 +---------------------------------------------- 2 files changed, 8 insertions(+), 904 deletions(-) diff --git a/docs/dev/migration.md b/docs/dev/migration.md index 288e2ef..09556c1 100644 --- a/docs/dev/migration.md +++ b/docs/dev/migration.md @@ -25,6 +25,10 @@ under the License. * This will be replaced by the TOC {:toc} +See the [older migration +guide](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/migration.html) +for information about migrating from older versions than Flink 1.3. + ## Migrating from Flink 1.3+ to Flink 1.7 ### API changes for serializer snapshots @@ -35,456 +39,4 @@ The old `TypeSerializerConfigSnapshot` abstraction is now deprecated, and will b in favor of the new `TypeSerializerSnapshot`. For details and guides on how to migrate, please see [Migrating from deprecated serializer snapshot APIs before Flink 1.7]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17). -## Migrating from Flink 1.2 to Flink 1.3 - -There are a few APIs that have been changed since Flink 1.2. Most of the changes are documented in their -specific documentations. The following is a consolidated list of API changes and links to details for migration when -upgrading to Flink 1.3. - -### `TypeSerializer` interface changes - -This would be relevant mostly for users implementing custom `TypeSerializer`s for their state. - -Since Flink 1.3, two additional methods have been added that are related to serializer compatibility -across savepoint restores. Please see -[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility) -for further details on how to implement these methods. - -### `ProcessFunction` is always a `RichFunction` - -In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` was introduced. -Since Flink 1.3, `RichProcessFunction` was removed and `ProcessFunction` is now always a `RichFunction` with access to -the lifecycle methods and runtime context. - -### Flink CEP library API changes - -The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API. -Please visit the [CEP Migration docs]({{ site.baseurl }}/dev/libs/cep.html#migrating-from-an-older-flink-version) for details. - -### Logger dependencies removed from Flink core artifacts - -In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are -now clean of specific logger dependencies. - -Example and quickstart archetypes already have loggers specified and should not be affected. -For other custom projects, make sure to add logger dependencies. 
For example, in Maven's `pom.xml`, you can add: - -{% highlight xml %} -<dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.7.7</version> -</dependency> - -<dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>1.2.17</version> -</dependency> -{% endhighlight %} - -## Migrating from Flink 1.1 to Flink 1.2 - -As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state: -**keyed** and **non-keyed** state (also called **operator** state). Both types are available to -both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1 -function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the -deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)). - -The migration process will serve two goals: - -1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling, - -2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its -Flink 1.1 predecessor. - -After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2 -simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to -your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its -Flink 1.1 predecessor left off. - -### Example User Functions - -As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink` -functions. The first is an example of a function with **keyed** state, while -the second has **non-keyed** state. 
The code for the aforementioned two functions in Flink 1.1 is presented below: - -{% highlight java %} -public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { - - private transient ValueState<Integer> counter; - - private final int numberElements; - - public CountMapper(int numberElements) { - this.numberElements = numberElements; - } - - @Override - public void open(Configuration parameters) throws Exception { - counter = getRuntimeContext().getState( - new ValueStateDescriptor<>("counter", Integer.class, 0)); - } - - @Override - public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception { - int count = counter.value() + 1; - counter.update(count); - - if (count % numberElements == 0) { - out.collect(Tuple2.of(value.f0, count)); - counter.update(0); // reset to 0 - } - } -} - -public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, - Checkpointed<ArrayList<Tuple2<String, Integer>>> { - - private final int threshold; - - private ArrayList<Tuple2<String, Integer>> bufferedElements; - - BufferingSink(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } - - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); - } - } - - @Override - public ArrayList<Tuple2<String, Integer>> snapshotState( - long checkpointId, long checkpointTimestamp) throws Exception { - return bufferedElements; - } - - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - bufferedElements.addAll(state); - } -} -{% endhighlight %} - - -The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form -`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if -the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted -containing the word itself and the number of occurrences. - -The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`) -and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. -This is a common way to avoid many expensive calls to a database or an external storage system. To do the -buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is -periodically checkpointed. - -### State API Migration - -To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions. -After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you -are guaranteed that the new version of your job will start from where its predecessor left off. - -**Keyed State:** Something to note before delving into the details of the migration process is that if your function -has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support -for the new features and full backwards compatibility. Changes could be made just for better code organization, -but this is just a matter of style. - -With the above said, the rest of this section focuses on the **non-keyed state**. 
- -#### Rescaling and new state abstractions - -The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface -to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction` -interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old -`Checkpointed` one. - -In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, -thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which -non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink` -contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0, -while `(test2, 2)` will go to task 1. - -More details on the principles behind rescaling of both keyed state and non-keyed state can be found in -the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html). - -##### ListCheckpointed - -The `ListCheckpointed` interface requires the implementation of two methods: - -{% highlight java %} -List<T> snapshotState(long checkpointId, long timestamp) throws Exception; - -void restoreState(List<T> state) throws Exception; -{% endhighlight %} - -Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference -is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and -`restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always -return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink` -is included below: - -{% highlight java %} -public class BufferingSinkListCheckpointed implements - SinkFunction<Tuple2<String, Integer>>, - ListCheckpointed<Tuple2<String, Integer>>, - CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { - - private final int threshold; - - private transient ListState<Tuple2<String, Integer>> checkpointedState; - - private List<Tuple2<String, Integer>> bufferedElements; - - public BufferingSinkListCheckpointed(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } - - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - this.bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); - } - } - - @Override - public List<Tuple2<String, Integer>> snapshotState( - long checkpointId, long timestamp) throws Exception { - return this.bufferedElements; - } - - @Override - public void restoreState(List<Tuple2<String, Integer>> state) throws Exception { - if (!state.isEmpty()) { - this.bufferedElements.addAll(state); - } - } - - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - // this is from the CheckpointedRestoring interface. - this.bufferedElements.addAll(state); - } -} -{% endhighlight %} - -As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards -compatibility reasons and more details will be explained at the end of this section. 
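To make the `Collections.singletonList()` remark above concrete, here is a minimal sketch of a function whose non-keyed state must not be split across parallel instances. The class and field names are illustrative only and are not taken from the guide; the sketch assumes the Flink 1.2 `ListCheckpointed` interface described above together with the standard `SourceFunction` interface.

{% highlight java %}
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical example: a single-parallelism source emitting an increasing sequence number.
// The counter is one indivisible value, so it is snapshotted as a singleton list.
public class SequenceSource implements SourceFunction<Long>, ListCheckpointed<Long> {

    private volatile boolean running = true;

    private long counter;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // emit and advance the counter under the checkpoint lock
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter);
                counter++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
        // state that cannot be re-partitioned: return it as a single-element list
        return Collections.singletonList(counter);
    }

    @Override
    public void restoreState(List<Long> state) throws Exception {
        // after a possible scale-down, merge whatever entries this subtask receives
        for (Long restored : state) {
            counter = Math.max(counter, restored);
        }
    }
}
{% endhighlight %}

Note that the singleton list only prevents the value itself from being split; if the function runs with parallelism greater than one, each parallel instance still snapshots its own entry.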
- -##### CheckpointedFunction - -The `CheckpointedFunction` interface requires again the implementation of two methods: - -{% highlight java %} -void snapshotState(FunctionSnapshotContext context) throws Exception; - -void initializeState(FunctionInitializationContext context) throws Exception; -{% endhighlight %} - -As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is -the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only -in the case that we are recovering from a failure. Given this, `initializeState()` is not only the place where different -types of state are initialized, but also where state recovery logic is included. An implementation of the -`CheckpointedFunction` interface for `BufferingSink` is presented below. - -{% highlight java %} -public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, - CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { - - private final int threshold; - - private transient ListState<Tuple2<String, Integer>> checkpointedState; - - private List<Tuple2<String, Integer>> bufferedElements; - - public BufferingSink(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } - - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - checkpointedState.clear(); - for (Tuple2<String, Integer> element : bufferedElements) { - checkpointedState.add(element); - } - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - checkpointedState = context.getOperatorStateStore(). - getSerializableListState("buffered-elements"); - - if (context.isRestored()) { - for (Tuple2<String, Integer> element : checkpointedState.get()) { - bufferedElements.add(element); - } - } - } - - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - // this is from the CheckpointedRestoring interface. - this.bufferedElements.addAll(state); - } -} -{% endhighlight %} - -The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize -the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects -are going to be stored upon checkpointing: - -`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");` - -After initializing the container, we use the `isRestored()` method of the context to check if we are -recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied. - -As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state -initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared -of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint. - -As a side note, the keyed state can also be initialized in the `initializeState()` method. 
This can be done -using the `FunctionInitializationContext` given as argument, instead of the `RuntimeContext`, which is the case -for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `CountMapper` example, -the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods -would look like this: - -{% highlight java %} -public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> - implements CheckpointedFunction { - - private transient ValueState<Integer> counter; - - private final int numberElements; - - public CountMapper(int numberElements) { - this.numberElements = numberElements; - } - - @Override - public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception { - int count = counter.value() + 1; - counter.update(count); - - if (count % numberElements == 0) { - out.collect(Tuple2.of(value.f0, count)); - counter.update(0); // reset to 0 - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - // all managed, nothing to do. - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - counter = context.getKeyedStateStore().getState( - new ValueStateDescriptor<>("counter", Integer.class, 0)); - } -} -{% endhighlight %} - -Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state -upon checkpointing. - -#### Backwards compatibility with Flink 1.1 - -So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2. -The question that remains is "Can I make sure that my modified (Flink 1.2) job will start from where my already -running job from Flink 1.1 stopped?". - -The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing. -Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to -implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the -familiar `restoreState()` from the old `Checkpointed` interface from Flink 1.1. As shown in the modified code of -the `BufferingSink`, the `restoreState()` method is identical to its predecessor. - -### Aligned Processing Time Window Operators - -In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger, -the command `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be -either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of -these operators are referred to as *aligned* window operators as they assume their input elements arrive in -order. This is valid when operating in processing time, as elements get as timestamp the wall-clock time at -the moment they arrive at the window operator. These operators were restricted to using the memory state backend, and -had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival. - -In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic -`WindowOperator`. 
This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently -read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format -that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`. - -<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators -in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the -`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling -windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to -resume execution from a Flink 1.1 savepoint while using these operators. - -<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities -and **no backwards compatibility** with Flink 1.1. - -The code to use the aligned window operators in Flink 1.2 is presented below: - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} - -// for tumbling windows -DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(0) - .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS))) - .apply(your-function) - -// for sliding windows -DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(0) - .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) - .apply(your-function) - -{% endhighlight %} -</div> - -<div data-lang="scala" markdown="1"> -{% highlight scala %} - -// for tumbling windows -val window1 = source - .keyBy(0) - .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS))) - .apply(your-function) - -// for sliding windows -val window2 = source - .keyBy(0) - .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) - .apply(your-function) - -{% endhighlight %} -</div> -</div> - {% top %} diff --git a/docs/dev/migration.zh.md b/docs/dev/migration.zh.md index fb1c559..76b4e8c 100644 --- a/docs/dev/migration.zh.md +++ b/docs/dev/migration.zh.md @@ -25,6 +25,10 @@ under the License. * This will be replaced by the TOC {:toc} +See the [older migration +guide](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/migration.html) +for information about migrating from older versions than Flink 1.3. + ## Migrating from Flink 1.3+ to Flink 1.7 ### API changes for serializer snapshots @@ -35,456 +39,4 @@ The old `TypeSerializerConfigSnapshot` abstraction is now deprecated, and will b in favor of the new `TypeSerializerSnapshot`. For details and guides on how to migrate, please see [Migrating from deprecated serializer snapshot APIs before Flink 1.7]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17). -## Migrating from Flink 1.2 to Flink 1.3 - -There are a few APIs that have been changed since Flink 1.2. Most of the changes are documented in their -specific documentations. The following is a consolidated list of API changes and links to details for migration when -upgrading to Flink 1.3. - -### `TypeSerializer` interface changes - -This would be relevant mostly for users implementing custom `TypeSerializer`s for their state. 
- -Since Flink 1.3, two additional methods have been added that are related to serializer compatibility -across savepoint restores. Please see -[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility) -for further details on how to implement these methods. - -### `ProcessFunction` is always a `RichFunction` - -In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` was introduced. -Since Flink 1.3, `RichProcessFunction` was removed and `ProcessFunction` is now always a `RichFunction` with access to -the lifecycle methods and runtime context. - -### Flink CEP library API changes - -The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API. -Please visit the [CEP Migration docs]({{ site.baseurl }}/dev/libs/cep.html#migrating-from-an-older-flink-version) for details. - -### Logger dependencies removed from Flink core artifacts - -In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are -now clean of specific logger dependencies. - -Example and quickstart archetypes already have loggers specified and should not be affected. -For other custom projects, make sure to add logger dependencies. For example, in Maven's `pom.xml`, you can add: - -{% highlight xml %} -<dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.7.7</version> -</dependency> - -<dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>1.2.17</version> -</dependency> -{% endhighlight %} - -## Migrating from Flink 1.1 to Flink 1.2 - -As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state: -**keyed** and **non-keyed** state (also called **operator** state). Both types are available to -both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1 -function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the -deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)). - -The migration process will serve two goals: - -1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling, - -2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its -Flink 1.1 predecessor. - -After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2 -simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to -your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its -Flink 1.1 predecessor left off. - -### Example User Functions - -As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink` -functions. The first is an example of a function with **keyed** state, while -the second has **non-keyed** state. 
The code for the aforementioned two functions in Flink 1.1 is presented below: - -{% highlight java %} -public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { - - private transient ValueState<Integer> counter; - - private final int numberElements; - - public CountMapper(int numberElements) { - this.numberElements = numberElements; - } - - @Override - public void open(Configuration parameters) throws Exception { - counter = getRuntimeContext().getState( - new ValueStateDescriptor<>("counter", Integer.class, 0)); - } - - @Override - public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception { - int count = counter.value() + 1; - counter.update(count); - - if (count % numberElements == 0) { - out.collect(Tuple2.of(value.f0, count)); - counter.update(0); // reset to 0 - } - } -} - -public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, - Checkpointed<ArrayList<Tuple2<String, Integer>>> { - - private final int threshold; - - private ArrayList<Tuple2<String, Integer>> bufferedElements; - - BufferingSink(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } - - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); - } - } - - @Override - public ArrayList<Tuple2<String, Integer>> snapshotState( - long checkpointId, long checkpointTimestamp) throws Exception { - return bufferedElements; - } - - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - bufferedElements.addAll(state); - } -} -{% endhighlight %} - - -The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form -`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if -the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted -containing the word itself and the number of occurrences. - -The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`) -and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. -This is a common way to avoid many expensive calls to a database or an external storage system. To do the -buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is -periodically checkpointed. - -### State API Migration - -To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions. -After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you -are guaranteed that the new version of your job will start from where its predecessor left off. - -**Keyed State:** Something to note before delving into the details of the migration process is that if your function -has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support -for the new features and full backwards compatibility. Changes could be made just for better code organization, -but this is just a matter of style. - -With the above said, the rest of this section focuses on the **non-keyed state**. 
- -#### Rescaling and new state abstractions - -The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface -to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction` -interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old -`Checkpointed` one. - -In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, -thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which -non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink` -contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0, -while `(test2, 2)` will go to task 1. - -More details on the principles behind rescaling of both keyed state and non-keyed state can be found in -the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html). - -##### ListCheckpointed - -The `ListCheckpointed` interface requires the implementation of two methods: - -{% highlight java %} -List<T> snapshotState(long checkpointId, long timestamp) throws Exception; - -void restoreState(List<T> state) throws Exception; -{% endhighlight %} - -Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference -is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and -`restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always -return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink` -is included below: - -{% highlight java %} -public class BufferingSinkListCheckpointed implements - SinkFunction<Tuple2<String, Integer>>, - ListCheckpointed<Tuple2<String, Integer>>, - CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { - - private final int threshold; - - private transient ListState<Tuple2<String, Integer>> checkpointedState; - - private List<Tuple2<String, Integer>> bufferedElements; - - public BufferingSinkListCheckpointed(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } - - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - this.bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); - } - } - - @Override - public List<Tuple2<String, Integer>> snapshotState( - long checkpointId, long timestamp) throws Exception { - return this.bufferedElements; - } - - @Override - public void restoreState(List<Tuple2<String, Integer>> state) throws Exception { - if (!state.isEmpty()) { - this.bufferedElements.addAll(state); - } - } - - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - // this is from the CheckpointedRestoring interface. - this.bufferedElements.addAll(state); - } -} -{% endhighlight %} - -As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards -compatibility reasons and more details will be explained at the end of this section. 
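As a complementary, hypothetical sketch of non-keyed state that *is* re-partitionable, the function below keeps one list entry per tracked bucket, so individual entries can be redistributed across subtasks when the parallelism changes. Class and field names are illustrative only; the sketch assumes the Flink 1.2 `ListCheckpointed` and `SinkFunction` interfaces shown above.

{% highlight java %}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Hypothetical example: a sink that remembers how many records it has written per bucket.
// Each (bucketId, count) pair is an independent list entry and can be redistributed freely.
public class BucketCountingSink
        implements SinkFunction<Tuple2<Integer, String>>, ListCheckpointed<Tuple2<Integer, Long>> {

    private final Map<Integer, Long> countsPerBucket = new HashMap<>();

    @Override
    public void invoke(Tuple2<Integer, String> value) throws Exception {
        // write value.f1 to bucket value.f0 (omitted) and remember how much was written
        Long current = countsPerBucket.get(value.f0);
        countsPerBucket.put(value.f0, current == null ? 1L : current + 1L);
    }

    @Override
    public List<Tuple2<Integer, Long>> snapshotState(long checkpointId, long timestamp) throws Exception {
        List<Tuple2<Integer, Long>> state = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : countsPerBucket.entrySet()) {
            state.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }
        return state;
    }

    @Override
    public void restoreState(List<Tuple2<Integer, Long>> state) throws Exception {
        // after rescaling, this subtask may receive any subset of the previously snapshotted entries
        for (Tuple2<Integer, Long> entry : state) {
            Long current = countsPerBucket.get(entry.f0);
            countsPerBucket.put(entry.f0, current == null ? entry.f1 : current + entry.f1);
        }
    }
}
{% endhighlight %}

Here each `(bucketId, count)` pair is the unit of redistribution, which is exactly what makes this non-keyed state rescalable.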
- -##### CheckpointedFunction - -The `CheckpointedFunction` interface requires again the implementation of two methods: - -{% highlight java %} -void snapshotState(FunctionSnapshotContext context) throws Exception; - -void initializeState(FunctionInitializationContext context) throws Exception; -{% endhighlight %} - -As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is -the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only -in the case that we are recovering from a failure. Given this, `initializeState()` is not only the place where different -types of state are initialized, but also where state recovery logic is included. An implementation of the -`CheckpointedFunction` interface for `BufferingSink` is presented below. - -{% highlight java %} -public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, - CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { - - private final int threshold; - - private transient ListState<Tuple2<String, Integer>> checkpointedState; - - private List<Tuple2<String, Integer>> bufferedElements; - - public BufferingSink(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } - - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - checkpointedState.clear(); - for (Tuple2<String, Integer> element : bufferedElements) { - checkpointedState.add(element); - } - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - checkpointedState = context.getOperatorStateStore(). - getSerializableListState("buffered-elements"); - - if (context.isRestored()) { - for (Tuple2<String, Integer> element : checkpointedState.get()) { - bufferedElements.add(element); - } - } - } - - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - // this is from the CheckpointedRestoring interface. - this.bufferedElements.addAll(state); - } -} -{% endhighlight %} - -The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize -the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects -are going to be stored upon checkpointing: - -`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");` - -After initializing the container, we use the `isRestored()` method of the context to check if we are -recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied. - -As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state -initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared -of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint. - -As a side note, the keyed state can also be initialized in the `initializeState()` method. 
This can be done -using the `FunctionInitializationContext` given as argument, instead of the `RuntimeContext`, which is the case -for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `CountMapper` example, -the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods -would look like this: - -{% highlight java %} -public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> - implements CheckpointedFunction { - - private transient ValueState<Integer> counter; - - private final int numberElements; - - public CountMapper(int numberElements) { - this.numberElements = numberElements; - } - - @Override - public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception { - int count = counter.value() + 1; - counter.update(count); - - if (count % numberElements == 0) { - out.collect(Tuple2.of(value.f0, count)); - counter.update(0); // reset to 0 - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - // all managed, nothing to do. - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - counter = context.getKeyedStateStore().getState( - new ValueStateDescriptor<>("counter", Integer.class, 0)); - } -} -{% endhighlight %} - -Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state -upon checkpointing. - -#### Backwards compatibility with Flink 1.1 - -So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2. -The question that remains is "Can I make sure that my modified (Flink 1.2) job will start from where my already -running job from Flink 1.1 stopped?". - -The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing. -Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to -implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the -familiar `restoreState()` from the old `Checkpointed` interface from Flink 1.1. As shown in the modified code of -the `BufferingSink`, the `restoreState()` method is identical to its predecessor. - -### Aligned Processing Time Window Operators - -In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger, -the command `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be -either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of -these operators are referred to as *aligned* window operators as they assume their input elements arrive in -order. This is valid when operating in processing time, as elements get as timestamp the wall-clock time at -the moment they arrive at the window operator. These operators were restricted to using the memory state backend, and -had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival. - -In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic -`WindowOperator`. 
This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently -read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format -that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`. - -<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators -in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the -`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling -windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to -resume execution from a Flink 1.1 savepoint while using these operators. - -<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities -and **no backwards compatibility** with Flink 1.1. - -The code to use the aligned window operators in Flink 1.2 is presented below: - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} - -// for tumbling windows -DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(0) - .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS))) - .apply(your-function) - -// for sliding windows -DataStream<Tuple2<String, Integer>> window1 = source - .keyBy(0) - .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) - .apply(your-function) - -{% endhighlight %} -</div> - -<div data-lang="scala" markdown="1"> -{% highlight scala %} - -// for tumbling windows -val window1 = source - .keyBy(0) - .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS))) - .apply(your-function) - -// for sliding windows -val window2 = source - .keyBy(0) - .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))) - .apply(your-function) - -{% endhighlight %} -</div> -</div> - {% top %}