This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f17e33fd81644155bee60faef94587f1922a7d6
Author: Timo Walther <[email protected]>
AuthorDate: Mon Sep 13 14:07:53 2021 +0200

    [FLINK-21589][docs] Document table pipeline upgrades
    
    This closes #17260
---
 docs/content/docs/dev/table/concepts/overview.md | 76 ++++++++++++++++++++++++
 docs/content/docs/ops/upgrading.md               | 21 ++++++-
 2 files changed, 94 insertions(+), 3 deletions(-)

diff --git a/docs/content/docs/dev/table/concepts/overview.md 
b/docs/content/docs/dev/table/concepts/overview.md
index bf7d441..acfb873 100644
--- a/docs/content/docs/dev/table/concepts/overview.md
+++ b/docs/content/docs/dev/table/concepts/overview.md
@@ -32,6 +32,82 @@ This means that Table API and SQL queries have the same 
semantics regardless whe
 
 The following pages explain concepts, practical limitations, and 
stream-specific configuration parameters of Flink's relational APIs on 
streaming data.
 
+State Management
+----------------
+
+Table programs that run in streaming mode leverage all capabilities of Flink 
as a stateful stream
+processor.
+
+In particular, a table program can be configured with a [state backend]({{< 
ref "docs/ops/state/state_backends" >}})
+and various [checkpointing options]({{< ref 
"docs/dev/datastream/fault-tolerance/checkpointing" >}})
+for handling different requirements regarding state size and fault tolerance. 
It is possible to take
+a savepoint of a running Table API & SQL pipeline and to restore the 
application's state at a later
+point in time.
+
+### State Usage
+
+Due to the declarative nature of Table API & SQL programs, it is not always 
obvious where and how much
+state is used within a pipeline. The planner decides whether state is 
necessary to compute a correct
+result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
+rules.
+
+{{< hint info >}}
+Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
+(i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
+depend on the used operations.
+{{< /hint >}}
+
+Queries such as `SELECT ... FROM ... WHERE`, which consist only of field projections or filters,
+are usually stateless pipelines. However, operations such as joins, aggregations, or deduplications
+require keeping intermediate results in fault-tolerant storage, for which Flink's state
+abstractions are used.
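+
+As an illustrative sketch (the `Orders` table and its columns are hypothetical), the first query
+below can run stateless, while the second must keep a running counter per key in state:
+
+```sql
+-- Stateless: only a projection and a filter, no intermediate results to keep.
+SELECT order_id, price FROM Orders WHERE price > 10;
+
+-- Stateful: the aggregation maintains one counter per customer_id in state.
+SELECT customer_id, COUNT(*) FROM Orders GROUP BY customer_id;
+```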
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
+
+For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
+entirely. For correct SQL semantics, the runtime needs to assume that a match could occur at any
+point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
+that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
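+
+As a sketch of such an optimized join (the `Orders` and `Shipments` tables and their watermarked
+`ts` time attributes are hypothetical), an interval join bounds how long rows must be kept, so
+state older than the interval can be dropped:
+
+```sql
+-- Interval join: each order is only matched against shipments within a
+-- bounded time range, so the operator can expire old state via watermarks.
+SELECT o.order_id, s.ship_id
+FROM Orders o, Shipments s
+WHERE o.order_id = s.order_id
+  AND s.ts BETWEEN o.ts AND o.ts + INTERVAL '4' HOUR;
+```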
+
+### Stateful Upgrades and Evolution
+
+Table programs that are executed in streaming mode are intended as *standing queries*, which means
+they are defined once and continuously evaluated as static end-to-end pipelines.
+
+In case of stateful pipelines, any change to either the query or Flink's planner might lead to a
+completely different execution plan. This makes stateful upgrades and the evolution of table
+programs challenging at the moment. The community is working on improving those shortcomings.
+
+For example, by adding a filter predicate, the optimizer might decide to 
reorder joins or change the
+schema of an intermediate operator. This prevents restoring from a savepoint 
due to either changed
+topology or different column layout within the state of an operator.
+
+The query implementer must ensure that the optimized plans before and after 
the change are compatible.
+Use the `EXPLAIN` command in SQL or `table.explain()` in Table API to [get 
insights]({{< ref "docs/dev/table/common" >}}#explaining-a-table).
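+
+For instance, running the plan inspection before and after a query change (the `Orders` table is
+again hypothetical) makes it possible to compare the resulting operator topologies:
+
+```sql
+-- Print the optimized execution plan instead of executing the query.
+EXPLAIN PLAN FOR
+SELECT customer_id, COUNT(*) FROM Orders GROUP BY customer_id;
+```
+
+If the printed operators or their order differ between the two plans, restoring the new pipeline
+from the old savepoint is unlikely to succeed.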
+
+Since new optimizer rules are continuously added and operators become more efficient and
+specialized, upgrading to a newer Flink version could also lead to incompatible plans.
+
+{{< hint warning >}}
+Currently, the framework cannot guarantee that state can be mapped from a 
savepoint to a new table
+operator topology.
+
+In other words: Savepoints are only supported if both the query and the Flink 
version remain constant.
+{{< /hint >}}
+
+Since the community rejects contributions that modify the optimized plan and 
the operator topology
+in a patch version (e.g. from `1.13.1` to `1.13.2`), it should be safe to 
upgrade a Table API & SQL
+pipeline to a newer bug fix release. However, major-minor upgrades (e.g. from `1.12` to `1.13`)
+are not supported.
+
+For both shortcomings (i.e. modified query and modified Flink version), we recommend investigating
+whether the state of an updated table program can be "warmed up" (i.e. 
initialized) with historical
+data again before switching to real-time data. The Flink community is working 
on a [hybrid source]({{< ref "docs/connectors/datastream/hybridsource" >}})
+to make this switching as convenient as possible.
+
+
 Where to go next?
 -----------------
 
diff --git a/docs/content/docs/ops/upgrading.md 
b/docs/content/docs/ops/upgrading.md
index ad2e997..8b1480d 100644
--- a/docs/content/docs/ops/upgrading.md
+++ b/docs/content/docs/ops/upgrading.md
@@ -63,7 +63,9 @@ When upgrading an application in order to fix a bug or to 
improve the applicatio
 
 In this section, we discuss how applications can be modified to remain state 
compatible.
 
-### Matching Operator State
+### DataStream API
+
+#### Matching Operator State
 
 When an application is restarted from a savepoint, Flink matches the operator 
state stored in the savepoint to stateful operators of the started application. 
The matching is done based on operator IDs, which are also stored in the 
savepoint. Each operator has a default ID that is derived from the operator's 
position in the application's operator topology. Hence, an unmodified 
application can always be restarted from one of its own savepoints. However, 
the default IDs of operators are lik [...]
 
@@ -78,7 +80,7 @@ val mappedEvents: DataStream[(Int, Long)] = events
 
 By default all state stored in a savepoint must be matched to the operators of 
a starting application. However, users can explicitly agree to skip (and 
thereby discard) state that cannot be matched to an operator when starting a 
application from a savepoint. Stateful operators for which no state is found in 
the savepoint are initialized with their default state. Users may enforce best 
practices by calling `ExecutionConfig#disableAutoGeneratedUIDs` which will fail 
the job submission if an [...]
 
-### Stateful Operators and User Functions
+#### Stateful Operators and User Functions
 
 When upgrading an application, user functions and operators can be freely 
modified with one restriction. It is not possible to change the data type of 
the state of an operator. This is important because, state from a savepoint can 
(currently) not be converted into a different data type before it is loaded 
into an operator. Hence, changing the data type of operator state when 
upgrading an application breaks application state consistency and prevents the 
upgraded application from being res [...]
 
@@ -97,7 +99,7 @@ Operator state can be either user-defined or internal.
 | CoGroupFunction[IT1, IT2, OT]                       | IT1, IT2 (Type of 1. 
and 2. input), KEY |
 | Built-in Aggregations (sum, min, max, minBy, maxBy) | Input Type [, KEY]     
              |
 
-### Application Topology
+#### Application Topology
 
 Besides changing the logic of one or more existing operators, applications can 
be upgraded by changing the topology of the application, i.e., by adding or 
removing operators, changing the parallelism of an operator, or modifying the 
operator chaining behavior.
 
@@ -109,6 +111,19 @@ When upgrading an application by changing its topology, a 
few things need to be
 * **Changing of input and output types of operators:** When adding a new 
operator before or behind an operator with internal state, you have to ensure 
that the input or output type of the stateful operator is not modified to 
preserve the data type of the internal operator state (see above for details).
 * **Changing operator chaining:** Operators can be chained together for 
improved performance. When restoring from a savepoint taken since 1.3.x it is 
possible to modify chains while preserving state consistency. It is possible to 
break the chain such that a stateful operator is moved out of the chain. It is 
also possible to append or inject a new or existing stateful operator into a 
chain, or to modify the operator order within a chain. However, when upgrading 
a savepoint to 1.3.x it is p [...]
 
+### Table API & SQL
+
+Due to the declarative nature of Table API & SQL programs, the underlying 
operator topology and state
+representation are mostly determined and optimized by the table planner.
+
+Be aware that any change to either the query or the Flink version could lead to state incompatibility.
+Every new major-minor Flink version (e.g. `1.12` to `1.13`) might introduce 
new optimizer rules or more
+specialized runtime operators that change the execution plan. However, the 
community tries to keep patch
+versions state-compatible (e.g. `1.13.1` to `1.13.2`).
+
+See the [table state management section]({{< ref 
"docs/dev/table/concepts/overview" >}}#state-management)
+for more information.
+
 ## Upgrading the Flink Framework Version
 
 This section describes the general way of upgrading Flink across versions and 
migrating your jobs between the versions.
