Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3217#discussion_r98663201
— Diff: docs/ops/upgrading.md — @@ -25,13 +25,87 @@ under the License.
-
-
-
Upgrading Flink Streaming Applications
+Flink DataStream programs are typically designed to run for long periods of time such as weeks, months, or even years. As with all long-running services, Flink streaming applications need to be maintained, which includes fixing bugs, implementing improvements, or migrating an application to a Flink cluster of a later version.
-
- Savepoint, stop/cancel, start from savepoint
-
- Atomic Savepoint and Stop (link to JIRA issue)
+This document describes how to update a Flink streaming application and how to migrate a running streaming application to a different Flink cluster.
-
- Limitations: Breaking chaining behavior (link to Savepoint section)
-
- Encourage using `uid(...)` explicitly for every operator
+## Restarting Streaming Applications
+
+The line of action for upgrading a streaming application or migrating an application to a different cluster is based on Flink's [Savepoint]({{ site.baseurl }}/setup/savepoints.html) feature. A savepoint is a consistent snapshot of the state of an application at a specific point in time.
+
+There are two ways of taking a savepoint from a running streaming application.
+
+* Taking a savepoint and continuing processing.
+```
+> ./bin/flink savepoint <jobID> [pathToSavepoint]
+```
+It is recommended to periodically take savepoints in order to be able to restart an application from a previous point in time.
+
+* Taking a savepoint and stopping the application as a single action.
+```
+> ./bin/flink cancel -s [pathToSavepoint] <jobID>
+```
+This means that the application is canceled immediately after the savepoint has completed, i.e., no other checkpoints are taken after the savepoint.
+
+Given a savepoint taken from an application, the same or a compatible application (see the [Application State Compatibility](#application-state-compatibility) section below) can be started from that savepoint. Starting an application from a savepoint means that the state of its operators is initialized with the operator state persisted in the savepoint. An application is started from a savepoint as follows:
+```
+> ./bin/flink run -d -s [pathToSavepoint] ~/application.jar
+```
+
+The operators of the started application are initialized with the operator state of the original application (i.e., the application the savepoint was taken from) at the time when the savepoint was taken. The started application continues processing from exactly this point on.
+
+*Note*: Even though Flink consistently restores the state of an application, it cannot revert writes to external systems. This can be an issue if you resume from a savepoint that was taken without stopping the application.
+In this case, the application has probably emitted data after the savepoint was taken. The restarted application might (depending on whether you changed the application logic or not) emit the same data again. The exact effect of this behavior can vary significantly depending on the `SinkFunction` and storage system. Data that is emitted twice might be OK in the case of idempotent writes to a key-value store like Cassandra, but problematic in the case of appends to a durable log such as Kafka. In any case, you should carefully check and test the behavior of a restarted application.
+
+## Application State Compatibility
+
+When upgrading an application in order to fix a bug or to improve the application, usually the goal is to replace the application logic of the running application while preserving its state. We do this by starting the upgraded application from a savepoint which was taken from the original application. However, this only works if both applications are state compatible, meaning that the operators of the upgraded application are able to initialize their state with the state of the operators of the original application.
+
+In this section, we discuss how applications can be modified to remain state compatible.
+
+### Matching Operator State
+
+When an application is restarted from a savepoint, Flink matches the operator state stored in the savepoint to the stateful operators of the started application. The matching is done based on operator IDs, which are also stored in the savepoint. Each operator has a default ID that is derived from the operator's position in the application's operator topology. Hence, an unmodified application can always be restarted from one of its own savepoints. However, the default IDs of operators are likely to change if an application is modified. Therefore, modified applications can only be started from a savepoint if the operator IDs have been explicitly specified.
+Assigning IDs to operators is very simple and done using the `uid(String)` method as follows:
+
+```
+val mappedEvents: DataStream[(Int, Long)] = events
+  .map(new MyStatefulMapFunc()).uid("mapper-1")
+```
+
+*Note:* Since the operator IDs stored in a savepoint and the IDs of operators in the application to start must be equal, it is highly recommended to assign unique IDs to all operators of an application that might be upgraded in the future. This advice applies to all operators, i.e., operators with and without explicitly declared operator state, because some operators have internal state that is not visible to the user. Upgrading an application without assigned operator IDs is significantly more difficult and may only be possible via a low-level workaround using the `setUidHash()` method.
+
+By default, all state stored in a savepoint must be matched to the operators of a starting application. However, users can explicitly agree to skip (and thereby discard) state that cannot be matched to an operator when starting an application from a savepoint. Stateful operators for which no state is found in the savepoint are not initialized with their default state.
Typo (I think the __ not __ is incorrect)? "Stateful operators for which no state is found in the savepoint are __ not __ initialized with their default state."
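To illustrate why explicit `uid(...)` matters, here is a toy sketch of the ID-based matching described in the diff. This is not Flink's actual restore implementation; the savepoint contents, operator IDs, and state strings are all made up for illustration:

```scala
// Toy sketch (NOT Flink's actual implementation): a savepoint keys operator
// state by operator ID, and restore succeeds only for IDs that still match.
object StateMatchingSketch {
  // A hypothetical "savepoint": operator ID -> serialized state.
  val savepoint: Map[String, String] = Map(
    "mapper-1" -> "countsSoFar=42",
    "window-1" -> "pendingWindows=2"
  )

  // Restore: operators with a matching ID get their old state back;
  // operators without a match find nothing.
  def restore(operatorIds: Seq[String]): Map[String, Option[String]] =
    operatorIds.map(id => id -> savepoint.get(id)).toMap

  def main(args: Array[String]): Unit = {
    // The upgraded job kept its explicit uids, so state is found:
    val restored = restore(Seq("mapper-1", "window-1"))
    assert(restored("mapper-1").contains("countsSoFar=42"))

    // An operator whose auto-generated default ID changed finds no state:
    val missing = restore(Seq("mapper-1", "auto-generated-new-id"))
    assert(missing("auto-generated-new-id").isEmpty)
  }
}
```

With default (position-derived) IDs, reordering or inserting operators changes the IDs and the lookup on restore fails, which is exactly why the diff recommends assigning explicit `uid(...)` values up front.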