This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5339a8137fb368789808b9e47a9f97896e89b255
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Fri May 29 10:53:14 2020 +0200

    [FLINK-18032] Remove outdated sections in migration guide
---
 docs/dev/migration.md    | 456 +----------------------------------------------
 docs/dev/migration.zh.md | 456 +----------------------------------------------
 2 files changed, 8 insertions(+), 904 deletions(-)

diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index 288e2ef..09556c1 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -25,6 +25,10 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
+See the [older migration
+guide](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/migration.html)
+for information about migrating from older versions than Flink 1.3.
+
 ## Migrating from Flink 1.3+ to Flink 1.7
 
 ### API changes for serializer snapshots
@@ -35,456 +39,4 @@ The old `TypeSerializerConfigSnapshot` abstraction is now deprecated, and will b
 in favor of the new `TypeSerializerSnapshot`. For details and guides on how to migrate, please see
 [Migrating from deprecated serializer snapshot APIs before Flink 1.7]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17).
 
-## Migrating from Flink 1.2 to Flink 1.3
-
-A few APIs have changed since Flink 1.2. Most of the changes are documented in their
-specific documentation. The following is a consolidated list of API changes and links to details for migration
-when upgrading to Flink 1.3.
-
-### `TypeSerializer` interface changes
-
-This is mostly relevant for users implementing a custom `TypeSerializer` for their state.
-
-Since Flink 1.3, two additional methods related to serializer compatibility across savepoint restores have been added. Please see
-[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility)
-for further details on how to implement these methods.
-
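-For orientation, here is a simplified sketch of the two methods added to `TypeSerializer<T>` in Flink 1.3
-(see the linked guide for the authoritative signatures and semantics):
-
-{% highlight java %}
-// Takes a snapshot of the serializer's configuration; the snapshot is written into savepoints.
-public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
-
-// On restore, checks a previous configuration snapshot and decides whether the restored
-// state is directly compatible with this serializer or requires migration.
-public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
-{% endhighlight %}
-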
-### `ProcessFunction` is always a `RichFunction`
-
-In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` were introduced.
-Since Flink 1.3, `RichProcessFunction` has been removed and `ProcessFunction` is now always a `RichFunction` with access to
-the lifecycle methods and the runtime context.
-
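-As a minimal illustration (a hypothetical function, not from the original page), every `ProcessFunction`
-can now use `open()` and `getRuntimeContext()` directly:
-
-{% highlight java %}
-public class CountingProcessFunction extends ProcessFunction<String, String> {
-
-    private transient ValueState<Long> count;
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        // lifecycle method and runtime context, formerly exclusive to RichProcessFunction
-        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class, 0L));
-    }
-
-    @Override
-    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
-        count.update(count.value() + 1);
-        out.collect(value);
-    }
-}
-{% endhighlight %}
-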
-### Flink CEP library API changes
-
-The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API.
-Please visit the [CEP Migration docs]({{ site.baseurl }}/dev/libs/cep.html#migrating-from-an-older-flink-version) for details.
-
-### Logger dependencies removed from Flink core artifacts
-
-In Flink 1.3, to make sure that users can use their own custom logging framework, the core Flink artifacts are
-now free of specific logger dependencies.
-
-Example and quickstart archetypes already have loggers specified and should not be affected.
-For other custom projects, make sure to add logger dependencies. For example, in Maven's `pom.xml`, you can add:
-
-{% highlight xml %}
-<dependency>
-    <groupId>org.slf4j</groupId>
-    <artifactId>slf4j-log4j12</artifactId>
-    <version>1.7.7</version>
-</dependency>
-
-<dependency>
-    <groupId>log4j</groupId>
-    <artifactId>log4j</artifactId>
-    <version>1.2.17</version>
-</dependency>
-{% endhighlight %}
-
-## Migrating from Flink 1.1 to Flink 1.2
-
-As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state:
-**keyed** and **non-keyed** state (also called **operator** state). Both types are available to
-both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1
-function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the
-deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)).
-
-The migration process will serve two goals:
-
-1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
-
-2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its
-Flink 1.1 predecessor.
-
-After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2
-simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to
-your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its
-Flink 1.1 predecessor left off.
-
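-In CLI terms, that handover looks roughly like this (a sketch; the job ID, savepoint path and jar name are placeholders):
-
-{% highlight bash %}
-# Trigger a savepoint for the running Flink 1.1 job
-bin/flink savepoint <jobId>
-
-# Submit the upgraded program to the Flink 1.2 cluster, resuming from that savepoint
-bin/flink run -s <savepointPath> your-job.jar
-{% endhighlight %}
-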
-### Example User Functions
-
-As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
-functions. The first is an example of a function with **keyed** state, while
-the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        counter = getRuntimeContext().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-}
-
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-    Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private ArrayList<Tuple2<String, Integer>> bufferedElements;
-
-    BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public ArrayList<Tuple2<String, Integer>> snapshotState(
-        long checkpointId, long checkpointTimestamp) throws Exception {
-        return bufferedElements;
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-
-The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form
-`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if
-the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted
-containing the word itself and the number of occurrences.
-
-The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
-and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
-This is a common way to avoid many expensive calls to a database or an external storage system. To do the
-buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
-periodically checkpointed.
-
-### State API Migration
-
-To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions.
-After making these changes, you will be able to change the parallelism of your job (scale up or down) and you
-are guaranteed that the new version of your job will start from where its predecessor left off.
-
-**Keyed State:** Before delving into the details of the migration process, note that if your function
-has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2, with full support
-for the new features and full backwards compatibility. Changes could be made just for better code organization,
-but this is just a matter of style.
-
-With the above said, the rest of this section focuses on the **non-keyed state**.
-
-#### Rescaling and new state abstractions
-
-The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface
-to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction`
-interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old
-`Checkpointed` one.
-
-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent of each other,
-and thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-More details on the principles behind rescaling of both keyed state and non-keyed state can be found in
-the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html).
-
-##### ListCheckpointed
-
-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-Their semantics are the same as those of their counterparts in the old `Checkpointed` interface. The only difference
-is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and
-`restoreState()` has to handle this list upon recovery. If the state is not re-partitionable, you can always
-return a `Collections.singletonList(MY_STATE)` from `snapshotState()`. The updated code for `BufferingSink`
-is included below:
-
-{% highlight java %}
-public class BufferingSinkListCheckpointed implements
-        SinkFunction<Tuple2<String, Integer>>,
-        ListCheckpointed<Tuple2<String, Integer>>,
-        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSinkListCheckpointed(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        this.bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public List<Tuple2<String, Integer>> snapshotState(
-            long checkpointId, long timestamp) throws Exception {
-        return this.bufferedElements;
-    }
-
-    @Override
-    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
-        if (!state.isEmpty()) {
-            this.bufferedElements.addAll(state);
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
-compatibility reasons; more details are explained at the end of this section.
-
-##### CheckpointedFunction
-
-The `CheckpointedFunction` interface again requires the implementation of two methods:
-
-{% highlight java %}
-void snapshotState(FunctionSnapshotContext context) throws Exception;
-
-void initializeState(FunctionInitializationContext context) throws Exception;
-{% endhighlight %}
-
-As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (the
-counterpart of `restoreState()`) is called every time the user-defined function is initialized, rather than only
-in the case that we are recovering from a failure. Given this, `initializeState()` is not only the place where different
-types of state are initialized, but also where the state recovery logic is included. An implementation of the
-`CheckpointedFunction` interface for `BufferingSink` is presented below.
-
-{% highlight java %}
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        checkpointedState.clear();
-        for (Tuple2<String, Integer> element : bufferedElements) {
-            checkpointedState.add(element);
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        checkpointedState = context.getOperatorStateStore().
-            getSerializableListState("buffered-elements");
-
-        if (context.isRestored()) {
-            for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                bufferedElements.add(element);
-            }
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-The `initializeState()` method takes a `FunctionInitializationContext` as its argument. This is used to initialize
-the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
-are going to be stored upon checkpointing:
-
-`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
-
-After initializing the container, we use the `isRestored()` method of the context to check if we are
-recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
-
-As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
-initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared
-of all objects included in the previous checkpoint, and is then filled with the new ones we want to checkpoint.
-
-As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
-using the `FunctionInitializationContext` given as an argument, instead of the `RuntimeContext`, as was the case
-in Flink 1.1. If the `CheckpointedFunction` interface were to be used in the `CountMapper` example,
-the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
-would look like this:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-        implements CheckpointedFunction {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        // all managed, nothing to do.
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        counter = context.getKeyedStateStore().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-}
-{% endhighlight %}
-
-Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
-upon checkpointing.
-
-#### Backwards compatibility with Flink 1.1
-
-So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2.
-The question that remains is: "Can I make sure that my modified (Flink 1.2) job will start from where my
-already-running Flink 1.1 job stopped?"
-
-The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you need to do nothing;
-Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to
-implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the
-familiar `restoreState()` from the old `Checkpointed` interface of Flink 1.1. As shown in the modified code of
-the `BufferingSink`, the `restoreState()` method is identical to its predecessor.
-
-### Aligned Processing Time Window Operators
-
-In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger,
-calling `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be
-either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of
-these operators are referred to as *aligned* window operators, as they assume that their input elements arrive in
-order. This is valid when operating in processing time, as elements are assigned the wall-clock time of
-the moment they arrive at the window operator as their timestamp. These operators were restricted to the memory
-state backend, and had optimized data structures for storing the per-window elements which leveraged the
-in-order arrival of the input elements.
-
-In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic
-`WindowOperator`. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently
-read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format
-that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`.
-
-<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators
-in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the
-`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling
-windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to
-resume execution from a Flink 1.1 savepoint while using these operators.
-
-<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities
-and **no backwards compatibility** with Flink 1.1.
-
-The code to use the aligned window operators in Flink 1.2 is presented below:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// for tumbling windows
-DataStream<Tuple2<String, Integer>> window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function);
-
-// for sliding windows
-DataStream<Tuple2<String, Integer>> window2 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function);
-
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// for tumbling windows
-val window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function)
-
-// for sliding windows
-val window2 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function)
-
-{% endhighlight %}
-</div>
-</div>
-
 {% top %}
diff --git a/docs/dev/migration.zh.md b/docs/dev/migration.zh.md
index fb1c559..76b4e8c 100644
--- a/docs/dev/migration.zh.md
+++ b/docs/dev/migration.zh.md
@@ -25,6 +25,10 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
+See the [older migration
+guide](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/migration.html)
+for information about migrating from older versions than Flink 1.3.
+
 ## Migrating from Flink 1.3+ to Flink 1.7
 
 ### API changes for serializer snapshots
@@ -35,456 +39,4 @@ The old `TypeSerializerConfigSnapshot` abstraction is now deprecated, and will b
 in favor of the new `TypeSerializerSnapshot`. For details and guides on how to migrate, please see
 [Migrating from deprecated serializer snapshot APIs before Flink 1.7]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17).
 
-## Migrating from Flink 1.2 to Flink 1.3
-
-A few APIs have changed since Flink 1.2. Most of the changes are documented in their
-specific documentation. The following is a consolidated list of API changes and links to details for migration
-when upgrading to Flink 1.3.
-
-### `TypeSerializer` interface changes
-
-This is mostly relevant for users implementing a custom `TypeSerializer` for their state.
-
-Since Flink 1.3, two additional methods related to serializer compatibility across savepoint restores have been added. Please see
-[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility)
-for further details on how to implement these methods.
-
-### `ProcessFunction` is always a `RichFunction`
-
-In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` were introduced.
-Since Flink 1.3, `RichProcessFunction` has been removed and `ProcessFunction` is now always a `RichFunction` with access to
-the lifecycle methods and the runtime context.
-
-### Flink CEP library API changes
-
-The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API.
-Please visit the [CEP Migration docs]({{ site.baseurl }}/dev/libs/cep.html#migrating-from-an-older-flink-version) for details.
-
-### Logger dependencies removed from Flink core artifacts
-
-In Flink 1.3, to make sure that users can use their own custom logging framework, the core Flink artifacts are
-now free of specific logger dependencies.
-
-Example and quickstart archetypes already have loggers specified and should not be affected.
-For other custom projects, make sure to add logger dependencies. For example, in Maven's `pom.xml`, you can add:
-
-{% highlight xml %}
-<dependency>
-    <groupId>org.slf4j</groupId>
-    <artifactId>slf4j-log4j12</artifactId>
-    <version>1.7.7</version>
-</dependency>
-
-<dependency>
-    <groupId>log4j</groupId>
-    <artifactId>log4j</artifactId>
-    <version>1.2.17</version>
-</dependency>
-{% endhighlight %}
-
-## Migrating from Flink 1.1 to Flink 1.2
-
-As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state:
-**keyed** and **non-keyed** state (also called **operator** state). Both types are available to
-both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1
-function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the
-deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)).
-
-The migration process will serve two goals:
-
-1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
-
-2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its
-Flink 1.1 predecessor.
-
-After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2
-simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to
-your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its
-Flink 1.1 predecessor left off.
-
-### Example User Functions
-
-As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
-functions. The first is an example of a function with **keyed** state, while
-the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        counter = getRuntimeContext().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-}
-
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-    Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private ArrayList<Tuple2<String, Integer>> bufferedElements;
-
-    BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public ArrayList<Tuple2<String, Integer>> snapshotState(
-        long checkpointId, long checkpointTimestamp) throws Exception {
-        return bufferedElements;
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-
-The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form
-`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if
-the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted
-containing the word itself and the number of occurrences.
-
-The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
-and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
-This is a common way to avoid many expensive calls to a database or an external storage system. To do the
-buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
-periodically checkpointed.
-
-### State API Migration
-
-To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions.
-After making these changes, you will be able to change the parallelism of your job (scale up or down) and you
-are guaranteed that the new version of your job will start from where its predecessor left off.
-
-**Keyed State:** Before delving into the details of the migration process, note that if your function
-has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2, with full support
-for the new features and full backwards compatibility. Changes could be made just for better code organization,
-but this is just a matter of style.
-
-With the above said, the rest of this section focuses on the **non-keyed state**.
-
-#### Rescaling and new state abstractions
-
-The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface
-to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction`
-interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old
-`Checkpointed` one.
-
-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent of each other,
-and thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-More details on the principles behind rescaling of both keyed state and non-keyed state can be found in
-the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html).
-
-##### ListCheckpointed
-
-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-Their semantics are the same as those of their counterparts in the old `Checkpointed` interface. The only difference
-is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and
-`restoreState()` has to handle this list upon recovery. If the state is not re-partitionable, you can always
-return a `Collections.singletonList(MY_STATE)` from `snapshotState()`. The updated code for `BufferingSink`
-is included below:
-
-{% highlight java %}
-public class BufferingSinkListCheckpointed implements
-        SinkFunction<Tuple2<String, Integer>>,
-        ListCheckpointed<Tuple2<String, Integer>>,
-        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSinkListCheckpointed(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        this.bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public List<Tuple2<String, Integer>> snapshotState(
-            long checkpointId, long timestamp) throws Exception {
-        return this.bufferedElements;
-    }
-
-    @Override
-    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
-        if (!state.isEmpty()) {
-            this.bufferedElements.addAll(state);
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
-compatibility reasons; more details are explained at the end of this section.
-
-##### CheckpointedFunction
-
-The `CheckpointedFunction` interface again requires the implementation of two methods:
-
-{% highlight java %}
-void snapshotState(FunctionSnapshotContext context) throws Exception;
-
-void initializeState(FunctionInitializationContext context) throws Exception;
-{% endhighlight %}
-
-As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (the
-counterpart of `restoreState()`) is called every time the user-defined function is initialized, rather than only
-in the case that we are recovering from a failure. Given this, `initializeState()` is not only the place where different
-types of state are initialized, but also where the state recovery logic is included. An implementation of the
-`CheckpointedFunction` interface for `BufferingSink` is presented below.
-
-{% highlight java %}
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        checkpointedState.clear();
-        for (Tuple2<String, Integer> element : bufferedElements) {
-            checkpointedState.add(element);
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        checkpointedState = context.getOperatorStateStore().
-            getSerializableListState("buffered-elements");
-
-        if (context.isRestored()) {
-            for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                bufferedElements.add(element);
-            }
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-The `initializeState()` method takes a `FunctionInitializationContext` as its argument. This is used to initialize
-the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
-are going to be stored upon checkpointing:
-
-`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
-
-After initializing the container, we use the `isRestored()` method of the context to check if we are
-recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
-
-As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
-initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared
-of all objects included in the previous checkpoint, and is then filled with the new ones we want to checkpoint.
-
-As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
-using the `FunctionInitializationContext` given as an argument, instead of the `RuntimeContext`, as was the case
-in Flink 1.1. If the `CheckpointedFunction` interface were to be used in the `CountMapper` example,
-the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
-would look like this:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-        implements CheckpointedFunction {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        // all managed, nothing to do.
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        counter = context.getKeyedStateStore().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-}
-{% endhighlight %}
-
-Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
-upon checkpointing.
-
-#### Backwards compatibility with Flink 1.1
-
-So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2.
-The question that remains is: "Can I make sure that my modified (Flink 1.2) job will start from where my
-already-running Flink 1.1 job stopped?"
-
-The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you need to do nothing;
-Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to
-implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the
-familiar `restoreState()` from the old `Checkpointed` interface of Flink 1.1. As shown in the modified code of
-the `BufferingSink`, the `restoreState()` method is identical to its predecessor.
-
-### Aligned Processing Time Window Operators
-
-In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger,
-calling `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be
-either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of
-these operators are referred to as *aligned* window operators, as they assume that their input elements arrive in
-order. This is valid when operating in processing time, as elements are assigned the wall-clock time of
-the moment they arrive at the window operator as their timestamp. These operators were restricted to the memory
-state backend, and had optimized data structures for storing the per-window elements which leveraged the
-in-order arrival of the input elements.
-
-In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic
-`WindowOperator`. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently
-read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format
-that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`.
-
-<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators
-in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the
-`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling
-windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to
-resume execution from a Flink 1.1 savepoint while using these operators.
-
-<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities
-and **no backwards compatibility** with Flink 1.1.
-
-The code to use the aligned window operators in Flink 1.2 is presented below:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// for tumbling windows
-DataStream<Tuple2<String, Integer>> window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function);
-
-// for sliding windows
-DataStream<Tuple2<String, Integer>> window2 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function);
-
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// for tumbling windows
-val window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function)
-
-// for sliding windows
-val window2 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function)
-
-{% endhighlight %}
-</div>
-</div>
-
 {% top %}
