LadyForest commented on code in PR #22837:
URL: https://github.com/apache/flink/pull/22837#discussion_r1241166276
##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}})
和 [SQL]({{< ref "
由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
管道会被现有优化规则集优化成尽可能少地使用状态。
+#### 状态算子
+
{{< hint info >}}
从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref
"docs/dev/table/concepts/dynamic_tables" >}}))。
它们的状态取决于用到的操作。
{{< /hint >}}
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink
抽象的容错存储内保存中间结果。
例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}})
以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}})
概念来让保持较小的状态规模。
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+ word STRING
+) WITH (
+ 'connector' = '...'
+);
+CREATE TABLE word_cnt (
+ word STRING PRIMARY KEY NOT ENFORCED,
+ cnt BIGINT
+) WITH (
+ 'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op"
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables"
>}}#table-to-stream-conversion))或配置(参考
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config"
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref
"docs/connectors/table/upsert-kafka" >}})。
```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kakfa (
+ id INT PRIMARY KEY NOT ENFORCED,
+ message STRING
+) WITH (
+ 'connector' = 'upsert-kafka',
+ ...
+);
+
+SELECT * FROM upsert_kakfa;
```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op"
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
Review Comment:
I agree that 60% looks much better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]