This is an automated email from the ASF dual-hosted git repository.
jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a1200ff6ad7 [FLINK-34059][docs][table] Add documentation on STATE_TTL hint
a1200ff6ad7 is described below
commit a1200ff6ad78146076a5d3b304abd383f1677c29
Author: Jane Chan <[email protected]>
AuthorDate: Tue Jan 30 09:31:56 2024 +0800
[FLINK-34059][docs][table] Add documentation on STATE_TTL hint
This closes #24198
---
.../content.zh/docs/dev/table/concepts/overview.md | 33 +++++++
.../content.zh/docs/dev/table/sql/queries/hints.md | 93 ++++++++++++++++++-
docs/content/docs/dev/table/concepts/overview.md | 38 +++++++-
docs/content/docs/dev/table/sql/queries/hints.md | 101 ++++++++++++++++++++-
4 files changed, 258 insertions(+), 7 deletions(-)
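As a rough illustration of what the documentation in this commit describes, the sketch below sets a job-level default through `table.exec.state.ttl` and then overrides it for one regular join with the `STATE_TTL` hint. The table and column names come from the examples in the diff. The TTL values are arbitrary, and the tables are assumed to have been created already.

```sql
-- Job-level default: stateful operators use this TTL unless something overrides it.
SET 'table.exec.state.ttl' = '1d';

-- Operator-level override for this regular join only (values chosen for illustration):
-- state kept for input `orders` expires after 3 days, state for `lineitem` after 12 hours.
SELECT /*+ STATE_TTL('orders' = '3d', 'lineitem' = '12h') */ *
FROM orders LEFT JOIN lineitem
ON orders.o_orderkey = lineitem.l_orderkey;
```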
diff --git a/docs/content.zh/docs/dev/table/concepts/overview.md b/docs/content.zh/docs/dev/table/concepts/overview.md
index 0da9496dca3..013ec33558a 100644
--- a/docs/content.zh/docs/dev/table/concepts/overview.md
+++ b/docs/content.zh/docs/dev/table/concepts/overview.md
@@ -124,6 +124,39 @@ SELECT * FROM upsert_kakfa;
通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是对应键的第一条记录。上述例子中意味着
`cnt` 会再次从 `0` 开始计数。
+#### 指定状态生命周期的不同方式
+<table class="table table-bordered">
+<thead>
+<tr>
+ <th class="text-left">配置方式</th>
+ <th class="text-left">TableAPI/SQL 支持</th>
+ <th class="text-left">生效范围</th>
+ <th class="text-left">优先级</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+ <td>SET 'table.exec.state.ttl' = '...' </td>
+ <td>{{<label TableAPI>}}{{<label SQL>}}</td>
+ <td>作业粒度,默认情况下所有状态算子都会使用该值控制状态生命周期</td>
+ <td>默认配置,可被覆盖</td>
+</tr>
+<tr>
+ <td>SELECT /*+ STATE_TTL(...) */ ... </td>
+ <td>{{<label SQL>}}</td>
+ <td>有限算子粒度,当前支持连接和分组聚合算子</td>
+ <td>该值将会优先作用于相应算子的状态生命周期。查阅<a href="{{< ref "docs/dev/table/sql/queries/hints" >}}#状态生命周期提示">状态生命周期提示</a>获取更多信息。</td>
+</tr>
+<tr>
+ <td>修改序列化为 JSON 的 CompiledPlan </td>
+ <td>{{<label TableAPI>}}{{<label SQL>}}</td>
+ <td>通用算子粒度, 可修改任一状态算子的生命周期</td>
+ <td>table.exec.state.ttl 和 STATE_TTL 的值将会序列化到 CompiledPlan,如果作业使用 CompiledPlan 提交,则最终生效的生命周期由最后一次修改的状态元数据决定。</td>
+</tr>
+</tbody>
+</table>
+
+
#### 配置算子粒度的状态 TTL
--------------------------
{{< hint warning >}}
diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md
index 7cfec52207b..8bfaa1050c1 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/hints.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md
@@ -511,13 +511,13 @@ ON o.customer_id = c.id AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') =
- 异步查找中,如果 'output-mode' 最终为 'ORDERED',那延迟重试造成反压的概率相对 'UNORDERED' 更高,这种情况下调大 'capacity' 不一定能有效减轻反压,可能需要考虑减小延迟等待的时长。
{{< /hint >}}
-### 联接提示使用中的冲突
+#### 联接提示使用中的冲突
当联接提示产生冲突时,Flink 会选择最匹配的执行方式。
- 同一种联接提示间产生冲突时,Flink 会为联接选择第一个最匹配的表。
- 不同联接提示间产生冲突时,Flink 会为联接选择第一个最匹配的联接提示。
-#### 示例
+##### 示例
```sql
CREATE TABLE t1 (id BIGINT, name STRING, age INT) WITH (...);
@@ -549,6 +549,95 @@ SELECT /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * FROM t1 FULL OUTER JOIN t2 ON t1.
-- 由于指定的两种联接提示都不支持不等值的联接条件。所以,只能使用支持非等值联接条件的 nested loop join。
SELECT /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * FROM t1 FULL OUTER JOIN t2 ON t1.id > t2.id;
```
+### 状态生命周期提示
+
+{{< label Streaming >}}
+
+对于有状态计算的[流连接]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins)和[分组聚合]({{< ref "docs/dev/table/sql/queries/group-agg" >}})操作,用户可以通过 `STATE_TTL` 来指定算子粒度的[空闲状态维持时间]({{< ref "docs/dev/table/concepts/overview" >}}#idle-state-retention-time),该方式能够使得在上述状态算子中使用与作业级别 [table.exec.state.ttl]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl) 不同的值。
+
+##### 流连接示例
+
+```sql
+CREATE TABLE orders (
+ o_orderkey INT,
+ o_custkey INT,
+ o_status BOOLEAN,
+ o_totalprice DOUBLE
+) WITH (...);
+
+CREATE TABLE lineitem (
+ l_linenumber int,
+ l_orderkey int,
+ l_partkey int,
+ l_extendedprice double
+) WITH (...);
+
+CREATE TABLE customers (
+ c_custkey int,
+ c_address string
+) WITH (...);
+
+-- 表名作为 hint 键
+SELECT /*+ STATE_TTL('orders'='3d', 'lineitem'='1d') */ * FROM
+orders LEFT JOIN lineitem
+ON orders.o_orderkey = lineitem.l_orderkey;
+
+
+-- 别名作为 hint 键
+SELECT /*+ STATE_TTL('o'='3d', 'l'='1d') */ * FROM
+orders o LEFT JOIN lineitem l
+ON o.o_orderkey = l.l_orderkey;
+
+-- 临时视图作为 hint 键
+CREATE TEMPORARY VIEW left_input AS SELECT ... FROM orders WHERE ...;
+CREATE TEMPORARY VIEW right_input AS SELECT ... FROM lineitem WHERE ...;
+SELECT /*+ STATE_TTL('left_input'= '360000s', 'right_input' = '15h') */ *
+FROM left_input JOIN right_input
+ON left_input.join_key = right_input.join_key;
+
+-- 级联 join
+SELECT /*+ STATE_TTL('o' = '3d', 'l' = '1d', 'c' = '10d') */ *
+FROM orders o LEFT OUTER JOIN lineitem l
+ON o.o_orderkey = l.l_orderkey
+LEFT OUTER JOIN customers c
+ON o.o_custkey = c.c_custkey;
+```
+
+##### 分组聚合示例
+
+```sql
+-- 表名作为 hint 键
+SELECT /*+ STATE_TTL('orders' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
+FROM orders
+GROUP BY o_orderkey;
+
+-- 别名作为 hint 键
+SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
+FROM orders AS o
+GROUP BY o_orderkey;
+
+-- 查询块作为 hint 键
+SELECT /*+ STATE_TTL('tmp' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
+FROM (SELECT o_orderkey, o_totalprice
+ FROM orders
+ WHERE o_shippriority = 0) tmp
+GROUP BY o_orderkey;
+```
+
+{{< hint info >}}
+注意:
+
+- 用户既可以选择表(或视图)名也可以选择别名作为提示键,但在指定别名时需要使用别名。
+- 对于多流连接场景,直接指定每张表的生命周期只会在第一个连接算子的左右流和第二个连接算子的右流上生效(因为流上关联操作是二元的)。如果想为每个连接算子的左右流都指定不同生命周期,需要将查询拆成多个查询块,如下所示。
+ ```sql
+ CREATE TEMPORARY VIEW V AS
+ SELECT /*+ STATE_TTL('A' = '1d', 'B' = '12h')*/ * FROM A JOIN B ON...;
+ SELECT /*+ STATE_TTL('V' = '1d', 'C' = '3d')*/ * FROM V JOIN C ON ...;
+ ```
+- `STATE_TTL` 提示仅作用在当前查询块上。
+- 当 `STATE_TTL` 提示键重复时取最后一个值。举例来说,在出现 `SELECT /*+ STATE_TTL('A' = '1d', 'A' = '2d')*/ * FROM ...` 时,输入 A 的 TTL 值将会取 2d。
+- 当出现多个 `STATE_TTL` 且提示键重复时取第一个值。举例来说,在出现 `SELECT /*+ STATE_TTL('A' = '1d', 'B' = '2d'), STATE_TTL('C' = '12h', 'A' = '6h')*/ * FROM ...` 时,输入 A 的 TTL 值将会取 1d。
+{{< /hint >}}
### 什么是查询块?
diff --git a/docs/content/docs/dev/table/concepts/overview.md b/docs/content/docs/dev/table/concepts/overview.md
index 4deee64d56b..a7b927aed75 100644
--- a/docs/content/docs/dev/table/concepts/overview.md
+++ b/docs/content/docs/dev/table/concepts/overview.md
@@ -134,6 +134,39 @@ before. If a record with a key, whose state has been removed before, is processe
be treated as if it was the first record with the respective key. For the example above this means
that the count of a `word` would start again at `0`.
+#### Different Ways to Configure State TTL
+<table class="table table-bordered">
+<thead>
+<tr>
+ <th class="text-left">Configuration</th>
+ <th class="text-left">TableAPI/SQL Support</th>
+ <th class="text-left">Granularity</th>
+ <th class="text-left">Priority</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+ <td>SET 'table.exec.state.ttl' = '...' </td>
+ <td>{{<label TableAPI>}}{{<label SQL>}}</td>
+ <td>Pipeline level, all stateful operators will use the value by default.</td>
+ <td>This is the default state TTL configuration and can be overridden by enabling the STATE_TTL hint or modifying the value of the serialized CompiledPlan.</td>
+</tr>
+<tr>
+ <td>SELECT /*+ STATE_TTL(...) */ ... </td>
+ <td>{{<label SQL>}}</td>
+ <td>Operator level with only regular join and group aggregation support.</td>
+ <td>The hint precedes the default table.exec.state.ttl. This value will be serialized to the CompiledPlan during the plan translation phase. See more at <a href="{{< ref "docs/dev/table/sql/queries/hints" >}}#state-ttl-hints">State TTL Hint</a>. </td>
+</tr>
+<tr>
+ <td>Modify serialized JSON content of CompiledPlan </td>
+ <td>{{<label TableAPI>}}{{<label SQL>}}</td>
+ <td>Operator level with a generalized support. The TTL for each stateful operator is explicitly serialized as an entry of the JSON. Modifying the JSON file can change the TTL for any stateful operator.</td>
+ <td>The TTL in CompiledPlan derives from either table.exec.state.ttl or STATE_TTL hint. If the job is submitted via CompiledPlan,
+the ultimate TTL value is decided by the last modified state metadata.</td>
+</tr>
+</tbody>
+</table>
+
#### Configure Operator-level State TTL
--------------------------
{{< hint warning >}}
@@ -145,15 +178,14 @@ If the pipeline only uses one state, you only need to set [`table.exec.state.ttl
at pipeline level.
{{< /hint >}}
-From Flink v1.18, Table API & SQL supports configuring fine-grained state TTL at operator-level to improve the state usage.
+Table API & SQL supports configuring fine-grained state TTL at operator-level to improve the state usage.
The configurable granularity is defined as the number of incoming input edges for each state operator.
Specifically, `OneInputStreamOperator` can configure the TTL for one state, while `TwoInputStreamOperator` (such as regular join), which has two inputs, can configure the TTL for the left and right states separately.
More generally, for `MultipleInputStreamOperator` which has K inputs, K state TTLs can be configured.
Typical use cases are as follows:
- Set different TTLs for [regular joins]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins).
-Regular join generates a `TwoInputStreamOperator` with left state to keep left input and right state to keep right input. From Flink v1.18,
-you can set the different state TTL for left state and right state.
+Regular join generates a `TwoInputStreamOperator` with left state to keep left input and right state to keep right input. You can set the different state TTL for left state and right state.
- Set different TTLs for different transformations within one pipeline.
For example, there is an ETL pipeline which uses `ROW_NUMBER` to perform [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}}),
and then use `GROUP BY` to perform [aggregation]({{< ref "docs/dev/table/sql/queries/group-agg" >}}).
diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md
index 33eebcfb75a..786b15b6dcf 100644
--- a/docs/content/docs/dev/table/sql/queries/hints.md
+++ b/docs/content/docs/dev/table/sql/queries/hints.md
@@ -556,13 +556,13 @@ retry maybe higher than 'UNORDERED' mode, in which case increasing async 'capaci
in reducing backpressure, and it may be necessary to consider reducing the delay duration.
{{< /hint >}}
-### Conflict Cases In Join Hints
+#### Conflict Cases In Join Hints
If the `Join Hints` conflicts occur, Flink will choose the most matching one.
- Conflict in one same Join Hint strategy, Flink will choose the first matching table for a join.
- Conflict in different Join Hints strategies, Flink will choose the first matching hint for a join.
-#### Examples
+##### Examples
```sql
CREATE TABLE t1 (id BIGINT, name STRING, age INT) WITH (...);
@@ -597,6 +597,103 @@ SELECT /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * FROM t1 FULL OUTER JOIN t2 ON t1.
SELECT /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * FROM t1 FULL OUTER JOIN t2 ON t1.id > t2.id;
```
+### State TTL Hints
+
+{{< label Streaming >}}
+
+For stateful computation [Regular Join]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins)
+and [Group Aggregation]({{< ref "docs/dev/table/sql/queries/group-agg" >}}), users can
+use `STATE_TTL` hint to
+specify operator-level [Idle State Retention Time]({{< ref "docs/dev/table/concepts/overview" >}}#idle-state-retention-time),
+which enables the aforementioned operators to have a different TTL against the pipeline level configuration [table.exec.state.ttl]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl).
+
+##### Regular Join Examples
+
+```sql
+CREATE TABLE orders (
+ o_orderkey INT,
+ o_custkey INT,
+ o_status BOOLEAN,
+ o_totalprice DOUBLE
+) WITH (...);
+
+CREATE TABLE lineitem (
+ l_linenumber int,
+ l_orderkey int,
+ l_partkey int,
+ l_extendedprice double
+) WITH (...);
+
+CREATE TABLE customers (
+ c_custkey int,
+ c_address string
+) WITH (...);
+
+-- table name as hint key
+SELECT /*+ STATE_TTL('orders'='3d', 'lineitem'='1d') */ * FROM
+orders LEFT JOIN lineitem
+ON orders.o_orderkey = lineitem.l_orderkey;
+
+
+-- table alias as hint key
+SELECT /*+ STATE_TTL('o'='3d', 'l'='1d') */ * FROM
+orders o LEFT JOIN lineitem l
+ON o.o_orderkey = l.l_orderkey;
+
+-- temporary view name as hint key
+CREATE TEMPORARY VIEW left_input AS SELECT ... FROM orders WHERE ...;
+CREATE TEMPORARY VIEW right_input AS SELECT ... FROM lineitem WHERE ...;
+SELECT /*+ STATE_TTL('left_input'= '360000s', 'right_input' = '15h') */ *
+FROM left_input JOIN right_input
+ON left_input.join_key = right_input.join_key;
+
+-- cascade joins
+SELECT /*+ STATE_TTL('o' = '3d', 'l' = '1d', 'c' = '10d') */ *
+FROM orders o LEFT OUTER JOIN lineitem l
+ON o.o_orderkey = l.l_orderkey
+LEFT OUTER JOIN customers c
+ON o.o_custkey = c.c_custkey;
+```
+
+##### Group Aggregation Examples
+
+```sql
+-- table name as hint key
+SELECT /*+ STATE_TTL('orders' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
+FROM orders
+GROUP BY o_orderkey;
+
+-- table alias as hint key
+SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
+FROM orders AS o
+GROUP BY o_orderkey;
+
+-- query block alias as hint key
+SELECT /*+ STATE_TTL('tmp' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
+FROM (SELECT o_orderkey, o_totalprice
+ FROM orders
+ WHERE o_shippriority = 0) tmp
+GROUP BY o_orderkey;
+```
+
+{{< hint info >}}
+Note:
+
+- Users can choose either table/view name or table alias as the hint key. However, once the alias is specified, the `STATE_TTL` must be hinted on the alias.
+- For cascade joins, the specified state TTLs will be interpreted as the left and right state TTL for the first join operator and
+ the right state TTL for the second join operator (from a bottom-up order).
+ The left state TTL for the second join operator will be retrieved from the configuration `table.exec.state.ttl`.
+ If users need to set a specific TTL value for the left state of the second join operator, the query needs to be split into query blocks like
+ ```sql
+ CREATE TEMPORARY VIEW V AS
+ SELECT /*+ STATE_TTL('A' = '1d', 'B' = '12h')*/ * FROM A JOIN B ON...;
+ SELECT /*+ STATE_TTL('V' = '1d', 'C' = '3d')*/ * FROM V JOIN C ON ...;
+ ```
+- STATE_TTL hint only applies on the underlying query block.
+- When the `STATE_TTL` hint key is duplicated, the value is applied from the last occurrence. For example, in cases like `SELECT /*+ STATE_TTL('A' = '1d', 'A' = '2d')*/ * FROM ...`, the TTL for input A will be taken as 2d.
+- When multiple `STATE_TTL` hints appear with a duplicated hint key, the value is applied from the first occurrence. For example, in cases like `SELECT /*+ STATE_TTL('A' = '1d', 'B' = '2d'), STATE_TTL('C' = '12h', 'A' = '6h')*/ * FROM ...`, the TTL for input A will be taken as 1d.
+{{< /hint >}}
+
### What are query blocks ?
A `query block` is a basic unit of SQL. For example, any inline view or
sub-query of a SQL statement are considered separate