luoyuxia commented on code in PR #22837:
URL: https://github.com/apache/flink/pull/22837#discussion_r1241048219


##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}

Review Comment:
   I have no strong perference to put the hint info under `State Usage` or 
`Stateful Operators`? But I'm wondering whether it'll be better to put it under 
`State Usage` since it's not strong  related  to `Stateful Operator`.



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries contain stateful operations such as [joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}), 
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in a fault-tolerant storage for which 
Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a 
count
+for each `word` it observes to the sink. 
+The `word` value is evolving over time, and due to the continuous query never 
ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query is continuously growing as 
more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
+stateless pipelines.
+However, under some situations, the stateful operation is implicitly derived 
through the trait of input (*e.g.*, input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion)), 
+or through user configuration (see 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that querying 
an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
+```
+The table source only provides messages with *INSERT*, *UPDATE_AFTER* and 
*DELETE* type, while the downstream sink requires a complete changelog 
(including *UPDATE_BEFORE*). 
+As a result, although this query itself does not involve explicit stateful 
calculation, the planner still generates a stateful operator called 
"ChangelogNormalize" to help obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a`word` would be removed as soon 
as it has not

Review Comment:
   ```suggestion
   For the previous example query, the count of a `word` would be removed as 
soon as it has not
   ```
   ?



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries contain stateful operations such as [joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}), 
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in a fault-tolerant storage for which 
Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a 
count
+for each `word` it observes to the sink. 
+The `word` value is evolving over time, and due to the continuous query never 
ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query is continuously growing as 
more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
+stateless pipelines.
+However, under some situations, the stateful operation is implicitly derived 
through the trait of input (*e.g.*, input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion)), 
+or through user configuration (see 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that querying 
an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
+```
+The table source only provides messages with *INSERT*, *UPDATE_AFTER* and 
*DELETE* type, while the downstream sink requires a complete changelog 
(including *UPDATE_BEFORE*). 
+As a result, although this query itself does not involve explicit stateful 
calculation, the planner still generates a stateful operator called 
"ChangelogNormalize" to help obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a`word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
+
+#### Configure Operator-level State TTL
+--------------------------
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only 
suitable for the cases
+in which there are multiple states used in the pipeline,
+and you need to set different TTL (Time-to-Live) for each state. 
+If the pipeline does not involve stateful computations, you do not need to 
follow this procedure.
+If the pipeline only uses one state, you only need to set 
[`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl)
+at pipeline level.
+{{< /hint >}}
+
+From Flink v1.18, Table API & SQL supports configuring fine-grained state TTL 
at operator-level to improve the state usage. 
+To be more specific, the number of used states can be defined as the 
configuration granularity and is associated with each input state of the 
operator. 

Review Comment:
   What `the number of used states can be defined as the configuration 
granularity` means? 



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries contain stateful operations such as [joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}), 
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in a fault-tolerant storage for which 
Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a 
count
+for each `word` it observes to the sink. 
+The `word` value is evolving over time, and due to the continuous query never 
ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query is continuously growing as 
more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
+stateless pipelines.
+However, under some situations, the stateful operation is implicitly derived 
through the trait of input (*e.g.*, input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion)), 
+or through user configuration (see 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that querying 
an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
+```
+The table source only provides messages with *INSERT*, *UPDATE_AFTER* and 
*DELETE* type, while the downstream sink requires a complete changelog 
(including *UPDATE_BEFORE*). 
+As a result, although this query itself does not involve explicit stateful 
calculation, the planner still generates a stateful operator called 
"ChangelogNormalize" to help obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a`word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
+
+#### Configure Operator-level State TTL
+--------------------------
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only 
suitable for the cases
+in which there are multiple states used in the pipeline,
+and you need to set different TTL (Time-to-Live) for each state. 
+If the pipeline does not involve stateful computations, you do not need to 
follow this procedure.
+If the pipeline only uses one state, you only need to set 
[`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl)
+at pipeline level.
+{{< /hint >}}
+
+From Flink v1.18, Table API & SQL supports configuring fine-grained state TTL 
at operator-level to improve the state usage. 
+To be more specific, the number of used states can be defined as the 
configuration granularity and is associated with each input state of the 
operator. 
+
+Typical use cases are as follows. 
+- Set different TTLs for [regular joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}#regular-joins). 
+Regular join generates a `TwoInputStreamOperator` with left states to keep 
left inputs and right states to keep right inputs. From Flink v1.18,
+you can set the different state TTL for left state and right state. 
+- Set different TTLs for different transformations within one pipeline.
+For example, there is an ETL pipeline which uses `ROW_NUMBER` to perform 
[deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}}),
+and then use `GROUP BY` to perform [aggregation]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}). 
+This table program will generate two `OneInputStreamOperator`s with their own 
state. 

Review Comment:
   nit
   ```suggestion
   This table program will generate two `OneInputStreamOperator`s with their 
own states. 
   ```
   ?



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
 
-`sessionId` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `sessionId` 次数。 
`sessionId` 属性随着时间逐步演变,
-且 `sessionId` 的值只活跃到会话结束(即在有限的时间周期内)。然而连续查询无法得知sessionId的这个性质,
-并且预期每个 `sessionId` 值会在任何时间点上出现。这维护了每个可见的 `sessionId` 值。因此总状态量会随着 `sessionId` 
的发现不断地增长。
+{{< hint info >}}
+请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
+{{< /hint >}}
 
 <a name="idle-state-retention-time"></a>
 
 #### 空闲状态维持时间
 
 *空间状态位置时间*参数 [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl) 
-定义了状态的键在被更新后要保持多长时间才被移除。在之前的查询例子中,`sessionId` 的数目会在配置的时间内未更新时立刻被移除。
+定义了状态的键在被更新后要保持多长时间才被移除。
+在之前的查询例子中,`word` 的数目会在配置的时间内未更新时立刻被移除。
 
 通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是
-对应键的第一条记录。上述例子中意味着 `sessionId` 会再次从 `0` 开始计数。
+对应键的第一条记录。上述例子中意味着 `cnt` 会再次从 `0` 开始计数。

Review Comment:
   Don't break the line in here, otherwise it'll be "被认为是 对应键的", there will be 
an extra blank.



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
 
-`sessionId` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `sessionId` 次数。 
`sessionId` 属性随着时间逐步演变,
-且 `sessionId` 的值只活跃到会话结束(即在有限的时间周期内)。然而连续查询无法得知sessionId的这个性质,
-并且预期每个 `sessionId` 值会在任何时间点上出现。这维护了每个可见的 `sessionId` 值。因此总状态量会随着 `sessionId` 
的发现不断地增长。
+{{< hint info >}}
+请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
+{{< /hint >}}
 
 <a name="idle-state-retention-time"></a>
 
 #### 空闲状态维持时间
 
 *空间状态位置时间*参数 [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl) 
-定义了状态的键在被更新后要保持多长时间才被移除。在之前的查询例子中,`sessionId` 的数目会在配置的时间内未更新时立刻被移除。
+定义了状态的键在被更新后要保持多长时间才被移除。
+在之前的查询例子中,`word` 的数目会在配置的时间内未更新时立刻被移除。
 
 通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是
-对应键的第一条记录。上述例子中意味着 `sessionId` 会再次从 `0` 开始计数。
+对应键的第一条记录。上述例子中意味着 `cnt` 会再次从 `0` 开始计数。
+
+#### 配置算子粒度的状态 TTL
+--------------------------
+{{< hint warning >}}
+这是一个需要小心使用的高级特性。该特性仅适用于作业中使用了多个状态,并且每个状态需要使用不同的 TTL。
+无状态的作业不需要参考下面的操作步骤。
+如果作业中仅使用到一个状态,仅需设置作业级别的 TTL 参数 [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)。
+
+{{< /hint >}}
+
+从 Flink v1.18 开始 Table API & SQL 支持配置细粒度的状态 TTL 来优化状态使用。具体来说, 
支持为每个状态算子用到的状态配置单独的 TTL。
+
+一些典型的使用场景如下
+- 为 [双流 Join]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins) 
的左右流配置不同 TTL。 
+双流 Join 会生成拥有两条输入边的 `TwoInputStreamOperator` 的状态算子,它用到了两个状态,分别来保存来自左流和右流的更新。
+- 在同一个作业中为不同的状态计算设置不同 TTL。
+举例来说,假设一个 ETL 作业使用 `ROW_NUMBER` 进行[去重]({{< ref 
"docs/dev/table/sql/queries/deduplication" >}})操作后,
+紧接着使用 `GROUP BY` 语句进行[聚合]({{< ref "docs/dev/table/sql/queries/group-agg" 
>}})操作。
+该作业会分别生成两个拥有单条输入边的 `OneInputStreamOperator` 状态算子。您可以为去重算子和聚合算子的状态分别设置不同的 TTL。
+
+**生成 Compiled Plan**
+
+配置过程首先会使用 `COMPILE PLAN` 语句生成一个 JSON 文件,它表示了序列化后的执行计划。
+{{< hint info >}}
+`COMPILE PLAN` 不支持查询语句 `SELECT... FROM...` 。 
+{{< /hint >}}
+
+- 执行 `COMPILE PLAN` 语句
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL 语法
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file> FOR 
<insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    该语句会在指定位置 `/path/to/plan.json` 生成一个 JSON 文件。
+
+{{< hint info >}}
+`COMPILE PLAN` 语句支持写入 `hdfs://` 或 `s3://` 等 Flink 支持的[文件系统]({{< ref 
"docs/deployment/filesystems/overview" >}})。
+请确保已为目标写入路径设置了写入权限。
+{{< /hint >}}
+
+**修改 Compiled Plan**
+
+每个状态算子会显式地生成一个名为 "state" 的 JSON 数组,具有如下结构。
+理论上一个拥有 k 路输入的状态算子拥有 k 个状态。
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+找到您需要修改的状态算子,将 TTL 设置为一个正整数(注意时间单位是毫秒),保存好文件,然后使用 `EXECUTE PLAN` 语句来提交作业。
+
+
+{{< hint info >}}
+理论上,下游状态算子的 TTL 不应小于上游状态算子的 TTL。
+{{< /hint >}}
+
+**运行 Compiled Plan**
+
+`EXECUTE PLAN` 语句将会反序列化上述 JSON 文件,进一步生成 JobGraph 并提交作业。
+通过 `EXECUTE PLAN` 语句提交的作业,其状态算子的 TTL 的值将会从文件中读取,配置项 `table.exec.state.ttl` 
的值将会被忽略。
+
+- 运行 `EXECUTE PLAN` 语句
+{{< tabs "execute-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// PlanReference#fromFile only supports a local file path, if you need to read 
from remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 
'hdfs://path/to/plan.json'").await();
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await();
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+// PlanReference#fromFile only supports a local file path, if you need to read 
from remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 
'hdfs://path/to/plan.json'").await()
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await()
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> EXECUTE PLAN 'file:///path/to/plan.json';
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 79fbe3fa497e4689165dd81b1d225ea8
+```
 
+{{< /tab >}}
+{{< /tabs >}}
+
+- SQL 语法
+
+    ```sql
+    EXECUTE PLAN [IF EXISTS] <plan_file>;
+    ```
+    该语句反序列化指定的 JSON 文件,并提交作业。
+
+**完整示例**
+
+下面的例子展示了通过双流 Join 计算订单明细的作业如何为左右流设置不同的 TTL。

Review Comment:
   ```suggestion
   下面的例子展示了一个通过双流 Join 计算订单明细的作业,并且如何为左右流设置不同的 TTL。
   ```



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries contain stateful operations such as [joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}), 
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in a fault-tolerant storage for which 
Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a 
count
+for each `word` it observes to the sink. 
+The `word` value is evolving over time, and due to the continuous query never 
ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query is continuously growing as 
more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
+stateless pipelines.
+However, under some situations, the stateful operation is implicitly derived 
through the trait of input (*e.g.*, input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion)), 
+or through user configuration (see 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that querying 
an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
+```
+The table source only provides messages with *INSERT*, *UPDATE_AFTER* and 
*DELETE* type, while the downstream sink requires a complete changelog 
(including *UPDATE_BEFORE*). 
+As a result, although this query itself does not involve explicit stateful 
calculation, the planner still generates a stateful operator called 
"ChangelogNormalize" to help obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a`word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
+
+#### Configure Operator-level State TTL
+--------------------------
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only 
suitable for the cases
+in which there are multiple states used in the pipeline,
+and you need to set different TTL (Time-to-Live) for each state. 
+If the pipeline does not involve stateful computations, you do not need to 
follow this procedure.
+If the pipeline only uses one state, you only need to set 
[`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl)
+at pipeline level.
+{{< /hint >}}
+
+From Flink v1.18, Table API & SQL supports configuring fine-grained state TTL 
at operator-level to improve the state usage. 
+To be more specific, the number of used states can be defined as the 
configuration granularity and is associated with each input state of the 
operator. 
+
+Typical use cases are as follows. 
+- Set different TTLs for [regular joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}#regular-joins). 
+Regular join generates a `TwoInputStreamOperator` with left states to keep 
left inputs and right states to keep right inputs. From Flink v1.18,

Review Comment:
   ```suggestion
   Regular join generates a `TwoInputStreamOperator` with left state to keep 
left input and right state to keep right input. From Flink v1.18,
   ```
   ?
   Since we use `left state` and `right state` in the following sentence. Maybe 
we need to keep it consistent.



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
 
-`sessionId` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `sessionId` 次数。 
`sessionId` 属性随着时间逐步演变,
-且 `sessionId` 的值只活跃到会话结束(即在有限的时间周期内)。然而连续查询无法得知sessionId的这个性质,
-并且预期每个 `sessionId` 值会在任何时间点上出现。这维护了每个可见的 `sessionId` 值。因此总状态量会随着 `sessionId` 
的发现不断地增长。
+{{< hint info >}}
+请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
+{{< /hint >}}
 
 <a name="idle-state-retention-time"></a>
 
 #### 空闲状态维持时间
 
 *空间状态位置时间*参数 [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl) 
-定义了状态的键在被更新后要保持多长时间才被移除。在之前的查询例子中,`sessionId` 的数目会在配置的时间内未更新时立刻被移除。
+定义了状态的键在被更新后要保持多长时间才被移除。
+在之前的查询例子中,`word` 的数目会在配置的时间内未更新时立刻被移除。
 
 通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是
-对应键的第一条记录。上述例子中意味着 `sessionId` 会再次从 `0` 开始计数。
+对应键的第一条记录。上述例子中意味着 `cnt` 会再次从 `0` 开始计数。
+
+#### 配置算子粒度的状态 TTL
+--------------------------
+{{< hint warning >}}
+这是一个需要小心使用的高级特性。该特性仅适用于作业中使用了多个状态,并且每个状态需要使用不同的 TTL。
+无状态的作业不需要参考下面的操作步骤。
+如果作业中仅使用到一个状态,仅需设置作业级别的 TTL 参数 [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)。
+
+{{< /hint >}}
+
+从 Flink v1.18 开始 Table API & SQL 支持配置细粒度的状态 TTL 来优化状态使用。具体来说, 
支持为每个状态算子用到的状态配置单独的 TTL。
+
+一些典型的使用场景如下
+- 为 [双流 Join]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins) 
的左右流配置不同 TTL。 
+双流 Join 会生成拥有两条输入边的 `TwoInputStreamOperator` 的状态算子,它用到了两个状态,分别来保存来自左流和右流的更新。
+- 在同一个作业中为不同的状态计算设置不同 TTL。
+举例来说,假设一个 ETL 作业使用 `ROW_NUMBER` 进行[去重]({{< ref 
"docs/dev/table/sql/queries/deduplication" >}})操作后,
+紧接着使用 `GROUP BY` 语句进行[聚合]({{< ref "docs/dev/table/sql/queries/group-agg" 
>}})操作。
+该作业会分别生成两个拥有单条输入边的 `OneInputStreamOperator` 状态算子。您可以为去重算子和聚合算子的状态分别设置不同的 TTL。
+
+**生成 Compiled Plan**
+
+配置过程首先会使用 `COMPILE PLAN` 语句生成一个 JSON 文件,它表示了序列化后的执行计划。
+{{< hint info >}}
+`COMPILE PLAN` 不支持查询语句 `SELECT... FROM...` 。 
+{{< /hint >}}
+
+- 执行 `COMPILE PLAN` 语句
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL 语法
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file> FOR 
<insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    该语句会在指定位置 `/path/to/plan.json` 生成一个 JSON 文件。
+
+{{< hint info >}}
+`COMPILE PLAN` 语句支持写入 `hdfs://` 或 `s3://` 等 Flink 支持的[文件系统]({{< ref 
"docs/deployment/filesystems/overview" >}})。
+请确保已为目标写入路径设置了写入权限。
+{{< /hint >}}
+
+**修改 Compiled Plan**
+
+每个状态算子会显式地生成一个名为 "state" 的 JSON 数组,具有如下结构。
+理论上一个拥有 k 路输入的状态算子拥有 k 个状态。
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+找到您需要修改的状态算子,将 TTL 设置为一个正整数(注意时间单位是毫秒),保存好文件,然后使用 `EXECUTE PLAN` 语句来提交作业。
+
+
+{{< hint info >}}
+理论上,下游状态算子的 TTL 不应小于上游状态算子的 TTL。
+{{< /hint >}}
+
+**运行 Compiled Plan**
+
+`EXECUTE PLAN` 语句将会反序列化上述 JSON 文件,进一步生成 JobGraph 并提交作业。
+通过 `EXECUTE PLAN` 语句提交的作业,其状态算子的 TTL 的值将会从文件中读取,配置项 `table.exec.state.ttl` 
的值将会被忽略。
+
+- 运行 `EXECUTE PLAN` 语句
+{{< tabs "execute-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// PlanReference#fromFile only supports a local file path, if you need to read 
from remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 
'hdfs://path/to/plan.json'").await();
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await();
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+// PlanReference#fromFile only supports a local file path, if you need to read 
from remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 
'hdfs://path/to/plan.json'").await()
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await()
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> EXECUTE PLAN 'file:///path/to/plan.json';
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 79fbe3fa497e4689165dd81b1d225ea8
+```
 
+{{< /tab >}}
+{{< /tabs >}}
+
+- SQL 语法
+
+    ```sql
+    EXECUTE PLAN [IF EXISTS] <plan_file>;

Review Comment:
   nit
   ```suggestion
       EXECUTE PLAN [IF EXISTS] <plan_file_path>;
   ```



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries contain stateful operations such as [joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}), 
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in a fault-tolerant storage for which 
Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a 
count
+for each `word` it observes to the sink. 
+The `word` value is evolving over time, and due to the continuous query never 
ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query is continuously growing as 
more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
+stateless pipelines.
+However, under some situations, the stateful operation is implicitly derived 
through the trait of input (*e.g.*, input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion)), 
+or through user configuration (see 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that querying 
an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
+```
+The table source only provides messages with *INSERT*, *UPDATE_AFTER* and 
*DELETE* type, while the downstream sink requires a complete changelog 
(including *UPDATE_BEFORE*). 
+As a result, although this query itself does not involve explicit stateful 
calculation, the planner still generates a stateful operator called 
"ChangelogNormalize" to help obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a`word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
+
+#### Configure Operator-level State TTL
+--------------------------
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only 
suitable for the cases
+in which there are multiple states used in the pipeline,
+and you need to set different TTL (Time-to-Live) for each state. 
+If the pipeline does not involve stateful computations, you do not need to 
follow this procedure.
+If the pipeline only uses one state, you only need to set 
[`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl)
+at pipeline level.
+{{< /hint >}}
+
+From Flink v1.18, Table API & SQL supports configuring fine-grained state TTL 
at operator-level to improve the state usage. 
+To be more specific, the number of used states can be defined as the 
configuration granularity and is associated with each input state of the 
operator. 
+
+Typical use cases are as follows. 
+- Set different TTLs for [regular joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}#regular-joins). 
+Regular join generates a `TwoInputStreamOperator` with left states to keep 
left inputs and right states to keep right inputs. From Flink v1.18,
+you can set the different state TTL for left state and right state. 
+- Set different TTLs for different transformations within one pipeline.
+For example, there is an ETL pipeline which uses `ROW_NUMBER` to perform 
[deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}}),
+and then use `GROUP BY` to perform [aggregation]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}). 
+This table program will generate two `OneInputStreamOperator`s with their own 
state. 
+Now you can set different state TTL for deduplicate state and aggregate state.
+
+**Generate a Compiled Plan**
+
+The setup process begins by generating a JSON file using the `COMPILE PLAN` 
statement, 
+which represents the serialized execution plan of the current table program. 
+{{< hint info >}}
+Currently, `COMPILE PLAN` statement does not support `SELECT... FROM...` 
queries. 
+{{< /hint >}}
+
+- Run a `COMPILE PLAN` statement
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL Syntax
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file> FOR 
<insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    This will generate a JSON file at `/path/to/plan.json`.
+
+{{< hint info >}}
+`COMPILE PLAN` statement supports writing the plan to a remote 
[filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) scheme like 
`hdfs://` or `s3://`. 
+Please be sure that the target path has set up the write access.
+{{< /hint >}}
+
+**Modify the Compiled Plan**
+
+Every operator that uses state will explicitly generate a JSON array named 
"state" with the following structure. 
+Theoretically, A k-th input stream operator will have k-th state.
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+Locate the operator for which you need to set TTL, modify the TTL to a 
positive integer (note that the time unit is milliseconds), 
+save the file, and then use the `EXECUTE PLAN` statement to submit your job.
+
+
+{{< hint info >}}
+Conceptually, the TTL of downstream stateful operator should be greater than 
or equal to the TTL of upstream stateful operator.
+{{< /hint >}}
+
+**Execute the Compiled Plan**
+
+`EXECUTE PLAN` statement will deserialize the specified file back to execution 
plan of the current table program and then submit the job.
+The job submitted via `EXECUTE PLAN` statement will apply state TTL read from 
the file, instead of the configuration `table.exec.state.ttl`.
+
+- Run an `EXECUTE PLAN` statement
+{{< tabs "execute-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// PlanReference#fromFile only supports a local file path, if you need to read 
from remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 
'hdfs://path/to/plan.json'").await();
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await();
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+// PlanReference#fromFile only supports a local file path, if you need to read 
from remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 
'hdfs://path/to/plan.json'").await()
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await()
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> EXECUTE PLAN 'file:///path/to/plan.json';
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 79fbe3fa497e4689165dd81b1d225ea8
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+- SQL Syntax
+
+    ```sql
+    EXECUTE PLAN [IF EXISTS] <plan_file>;

Review Comment:
   nit:
   ```suggestion
       EXECUTE PLAN [IF EXISTS] <plan_file_path>;
   ```
   ?



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries contain stateful operations such as [joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}), 
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in a fault-tolerant storage for which 
Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a 
count
+for each `word` it observes to the sink. 
+The `word` value is evolving over time, and due to the continuous query never 
ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query is continuously growing as 
more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
+stateless pipelines.
+However, under some situations, the stateful operation is implicitly derived 
through the trait of input (*e.g.*, input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion)), 
+or through user configuration (see 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that querying 
an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
+```
+The table source only provides messages with *INSERT*, *UPDATE_AFTER* and 
*DELETE* type, while the downstream sink requires a complete changelog 
(including *UPDATE_BEFORE*). 
+As a result, although this query itself does not involve explicit stateful 
calculation, the planner still generates a stateful operator called 
"ChangelogNormalize" to help obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a`word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
+
+#### Configure Operator-level State TTL
+--------------------------
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only 
suitable for the cases
+in which there are multiple states used in the pipeline,
+and you need to set different TTL (Time-to-Live) for each state. 
+If the pipeline does not involve stateful computations, you do not need to 
follow this procedure.
+If the pipeline only uses one state, you only need to set 
[`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl)
+at pipeline level.
+{{< /hint >}}
+
+From Flink v1.18, Table API & SQL supports configuring fine-grained state TTL 
at operator-level to improve the state usage. 
+To be more specific, the number of used states can be defined as the 
configuration granularity and is associated with each input state of the 
operator. 
+
+Typical use cases are as follows. 
+- Set different TTLs for [regular joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}#regular-joins). 
+Regular join generates a `TwoInputStreamOperator` with left states to keep 
left inputs and right states to keep right inputs. From Flink v1.18,
+you can set the different state TTL for left state and right state. 
+- Set different TTLs for different transformations within one pipeline.
+For example, there is an ETL pipeline which uses `ROW_NUMBER` to perform 
[deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}}),
+and then use `GROUP BY` to perform [aggregation]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}). 
+This table program will generate two `OneInputStreamOperator`s with their own 
state. 
+Now you can set different state TTL for deduplicate state and aggregate state.
+
+**Generate a Compiled Plan**
+
+The setup process begins by generating a JSON file using the `COMPILE PLAN` 
statement, 
+which represents the serialized execution plan of the current table program. 
+{{< hint info >}}
+Currently, `COMPILE PLAN` statement does not support `SELECT... FROM...` 
queries. 
+{{< /hint >}}
+
+- Run a `COMPILE PLAN` statement
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL Syntax
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file> FOR 
<insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    This will generate a JSON file at `/path/to/plan.json`.
+
+{{< hint info >}}
+`COMPILE PLAN` statement supports writing the plan to a remote 
[filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) scheme like 
`hdfs://` or `s3://`. 
+Please be sure that the target path has set up the write access.
+{{< /hint >}}
+
+**Modify the Compiled Plan**
+
+Every operator that uses state will explicitly generate a JSON array named 
"state" with the following structure. 
+Theoretically, A k-th input stream operator will have k-th state.
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+Locate the operator for which you need to set TTL, modify the TTL to a 
positive integer (note that the time unit is milliseconds), 

Review Comment:
   Just double check, to a positive integer or something like `xx ms`?



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。

Review Comment:
   ```suggestion
   [表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate)),状态算子可能会被隐式地推导出来。
   ```



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。

Review Comment:
   ```suggestion
   形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询语句通常是无状态的管道。
   ```



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}

Review Comment:
   I like these two figures.
   But just a very personal taste, would it be better to make it smaller for it 
almost fills up my whole page.



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries contain stateful operations such as [joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}), 
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in a fault-tolerant storage for which 
Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a 
count
+for each `word` it observes to the sink. 
+The `word` value is evolving over time, and due to the continuous query never 
ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query is continuously growing as 
more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
+stateless pipelines.
+However, under some situations, the stateful operation is implicitly derived 
through the trait of input (*e.g.*, input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion)), 
+or through user configuration (see 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that querying 
an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
+```
+The table source only provides messages with *INSERT*, *UPDATE_AFTER* and 
*DELETE* type, while the downstream sink requires a complete changelog 
(including *UPDATE_BEFORE*). 
+As a result, although this query itself does not involve explicit stateful 
calculation, the planner still generates a stateful operator called 
"ChangelogNormalize" to help obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a`word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
+
+#### Configure Operator-level State TTL
+--------------------------
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only 
suitable for the cases
+in which there are multiple states used in the pipeline,
+and you need to set different TTL (Time-to-Live) for each state. 
+If the pipeline does not involve stateful computations, you do not need to 
follow this procedure.
+If the pipeline only uses one state, you only need to set 
[`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl)
+at pipeline level.
+{{< /hint >}}
+
+From Flink v1.18, Table API & SQL supports configuring fine-grained state TTL 
at operator-level to improve the state usage. 
+To be more specific, the number of used states can be defined as the 
configuration granularity and is associated with each input state of the 
operator. 
+
+Typical use cases are as follows. 
+- Set different TTLs for [regular joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}#regular-joins). 
+Regular join generates a `TwoInputStreamOperator` with left states to keep 
left inputs and right states to keep right inputs. From Flink v1.18,
+you can set the different state TTL for left state and right state. 
+- Set different TTLs for different transformations within one pipeline.
+For example, there is an ETL pipeline which uses `ROW_NUMBER` to perform 
[deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}}),
+and then use `GROUP BY` to perform [aggregation]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}). 
+This table program will generate two `OneInputStreamOperator`s with their own 
state. 
+Now you can set different state TTL for deduplicate state and aggregate state.
+
+**Generate a Compiled Plan**
+
+The setup process begins by generating a JSON file using the `COMPILE PLAN` 
statement, 
+which represents the serialized execution plan of the current table program. 
+{{< hint info >}}
+Currently, `COMPILE PLAN` statement does not support `SELECT... FROM...` 
queries. 
+{{< /hint >}}
+
+- Run a `COMPILE PLAN` statement
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL Syntax
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file> FOR 
<insert_statement>|<statement_set>;

Review Comment:
   nit:
   ```suggestion
       COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR 
<insert_statement>|<statement_set>;
   ```
   ?



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
 
-`sessionId` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `sessionId` 次数。 
`sessionId` 属性随着时间逐步演变,
-且 `sessionId` 的值只活跃到会话结束(即在有限的时间周期内)。然而连续查询无法得知sessionId的这个性质,
-并且预期每个 `sessionId` 值会在任何时间点上出现。这维护了每个可见的 `sessionId` 值。因此总状态量会随着 `sessionId` 
的发现不断地增长。
+{{< hint info >}}
+请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
+{{< /hint >}}
 
 <a name="idle-state-retention-time"></a>
 
 #### 空闲状态维持时间
 
 *空间状态位置时间*参数 [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl) 
-定义了状态的键在被更新后要保持多长时间才被移除。在之前的查询例子中,`sessionId` 的数目会在配置的时间内未更新时立刻被移除。
+定义了状态的键在被更新后要保持多长时间才被移除。
+在之前的查询例子中,`word` 的数目会在配置的时间内未更新时立刻被移除。
 
 通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是
-对应键的第一条记录。上述例子中意味着 `sessionId` 会再次从 `0` 开始计数。
+对应键的第一条记录。上述例子中意味着 `cnt` 会再次从 `0` 开始计数。
+
+#### 配置算子粒度的状态 TTL
+--------------------------
+{{< hint warning >}}
+这是一个需要小心使用的高级特性。该特性仅适用于作业中使用了多个状态,并且每个状态需要使用不同的 TTL。
+无状态的作业不需要参考下面的操作步骤。
+如果作业中仅使用到一个状态,仅需设置作业级别的 TTL 参数 [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)。
+
+{{< /hint >}}
+
+从 Flink v1.18 开始 Table API & SQL 支持配置细粒度的状态 TTL 来优化状态使用。具体来说, 
支持为每个状态算子用到的状态配置单独的 TTL。

Review Comment:
   nit
   ```suggestion
   从 Flink v1.18 开始, Table API & SQL 支持配置细粒度的状态 TTL 来优化状态使用。具体来说, 
支持为每个状态算子用到的状态配置单独的 TTL。
   ```
   ?



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
 
-`sessionId` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `sessionId` 次数。 
`sessionId` 属性随着时间逐步演变,
-且 `sessionId` 的值只活跃到会话结束(即在有限的时间周期内)。然而连续查询无法得知sessionId的这个性质,
-并且预期每个 `sessionId` 值会在任何时间点上出现。这维护了每个可见的 `sessionId` 值。因此总状态量会随着 `sessionId` 
的发现不断地增长。
+{{< hint info >}}
+请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
+{{< /hint >}}
 
 <a name="idle-state-retention-time"></a>
 
 #### 空闲状态维持时间
 
 *空间状态位置时间*参数 [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl) 
-定义了状态的键在被更新后要保持多长时间才被移除。在之前的查询例子中,`sessionId` 的数目会在配置的时间内未更新时立刻被移除。
+定义了状态的键在被更新后要保持多长时间才被移除。
+在之前的查询例子中,`word` 的数目会在配置的时间内未更新时立刻被移除。
 
 通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是
-对应键的第一条记录。上述例子中意味着 `sessionId` 会再次从 `0` 开始计数。
+对应键的第一条记录。上述例子中意味着 `cnt` 会再次从 `0` 开始计数。
+
+#### 配置算子粒度的状态 TTL
+--------------------------
+{{< hint warning >}}
+这是一个需要小心使用的高级特性。该特性仅适用于作业中使用了多个状态,并且每个状态需要使用不同的 TTL。
+无状态的作业不需要参考下面的操作步骤。
+如果作业中仅使用到一个状态,仅需设置作业级别的 TTL 参数 [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)。
+
+{{< /hint >}}
+
+从 Flink v1.18 开始 Table API & SQL 支持配置细粒度的状态 TTL 来优化状态使用。具体来说, 
支持为每个状态算子用到的状态配置单独的 TTL。
+
+一些典型的使用场景如下
+- 为 [双流 Join]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins) 
的左右流配置不同 TTL。 
+双流 Join 会生成拥有两条输入边的 `TwoInputStreamOperator` 的状态算子,它用到了两个状态,分别来保存来自左流和右流的更新。
+- 在同一个作业中为不同的状态计算设置不同 TTL。
+举例来说,假设一个 ETL 作业使用 `ROW_NUMBER` 进行[去重]({{< ref 
"docs/dev/table/sql/queries/deduplication" >}})操作后,
+紧接着使用 `GROUP BY` 语句进行[聚合]({{< ref "docs/dev/table/sql/queries/group-agg" 
>}})操作。
+该作业会分别生成两个拥有单条输入边的 `OneInputStreamOperator` 状态算子。您可以为去重算子和聚合算子的状态分别设置不同的 TTL。
+
+**生成 Compiled Plan**
+
+配置过程首先会使用 `COMPILE PLAN` 语句生成一个 JSON 文件,它表示了序列化后的执行计划。
+{{< hint info >}}
+`COMPILE PLAN` 不支持查询语句 `SELECT... FROM...` 。 
+{{< /hint >}}
+
+- 执行 `COMPILE PLAN` 语句
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL 语法
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file> FOR 
<insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    该语句会在指定位置 `/path/to/plan.json` 生成一个 JSON 文件。
+
+{{< hint info >}}
+`COMPILE PLAN` 语句支持写入 `hdfs://` 或 `s3://` 等 Flink 支持的[文件系统]({{< ref 
"docs/deployment/filesystems/overview" >}})。
+请确保已为目标写入路径设置了写入权限。
+{{< /hint >}}
+
+**修改 Compiled Plan**
+
+每个状态算子会显式地生成一个名为 "state" 的 JSON 数组,具有如下结构。
+理论上一个拥有 k 路输入的状态算子拥有 k 个状态。
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+找到您需要修改的状态算子,将 TTL 设置为一个正整数(注意时间单位是毫秒),保存好文件,然后使用 `EXECUTE PLAN` 语句来提交作业。
+
+
+{{< hint info >}}
+理论上,下游状态算子的 TTL 不应小于上游状态算子的 TTL。
+{{< /hint >}}
+
+**运行 Compiled Plan**

Review Comment:
   ```suggestion
   **执行 Compiled Plan**
   ```
   ?



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}

Review Comment:
   Maybe  `width="60%"` or others.



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -51,42 +51,453 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果,
 管道会被现有优化规则集优化成尽可能少地使用状态。
 
+#### 状态算子
+
 {{< hint info >}}
 从概念上讲, 源表从来不会在状态中被完全保存。 实现者处理的是逻辑表(即[动态表]({{< ref 
"docs/dev/table/concepts/dynamic_tables" >}}))。
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
-
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})
+或[去重]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 
抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化并且由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,
+因此总状态量会随着 `word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion))或配置(参考 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate))状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
 
-`sessionId` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `sessionId` 次数。 
`sessionId` 属性随着时间逐步演变,
-且 `sessionId` 的值只活跃到会话结束(即在有限的时间周期内)。然而连续查询无法得知sessionId的这个性质,
-并且预期每个 `sessionId` 值会在任何时间点上出现。这维护了每个可见的 `sessionId` 值。因此总状态量会随着 `sessionId` 
的发现不断地增长。
+{{< hint info >}}
+请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
+{{< /hint >}}
 
 <a name="idle-state-retention-time"></a>
 
 #### 空闲状态维持时间
 
 *空间状态位置时间*参数 [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl) 
-定义了状态的键在被更新后要保持多长时间才被移除。在之前的查询例子中,`sessionId` 的数目会在配置的时间内未更新时立刻被移除。
+定义了状态的键在被更新后要保持多长时间才被移除。
+在之前的查询例子中,`word` 的数目会在配置的时间内未更新时立刻被移除。
 
 通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是
-对应键的第一条记录。上述例子中意味着 `sessionId` 会再次从 `0` 开始计数。
+对应键的第一条记录。上述例子中意味着 `cnt` 会再次从 `0` 开始计数。
+
+#### 配置算子粒度的状态 TTL
+--------------------------
+{{< hint warning >}}
+这是一个需要小心使用的高级特性。该特性仅适用于作业中使用了多个状态,并且每个状态需要使用不同的 TTL。
+无状态的作业不需要参考下面的操作步骤。
+如果作业中仅使用到一个状态,仅需设置作业级别的 TTL 参数 [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)。
+
+{{< /hint >}}
+
+从 Flink v1.18 开始 Table API & SQL 支持配置细粒度的状态 TTL 来优化状态使用。具体来说, 
支持为每个状态算子用到的状态配置单独的 TTL。
+
+一些典型的使用场景如下
+- 为 [双流 Join]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins) 
的左右流配置不同 TTL。 
+双流 Join 会生成拥有两条输入边的 `TwoInputStreamOperator` 的状态算子,它用到了两个状态,分别来保存来自左流和右流的更新。
+- 在同一个作业中为不同的状态计算设置不同 TTL。
+举例来说,假设一个 ETL 作业使用 `ROW_NUMBER` 进行[去重]({{< ref 
"docs/dev/table/sql/queries/deduplication" >}})操作后,
+紧接着使用 `GROUP BY` 语句进行[聚合]({{< ref "docs/dev/table/sql/queries/group-agg" 
>}})操作。
+该作业会分别生成两个拥有单条输入边的 `OneInputStreamOperator` 状态算子。您可以为去重算子和聚合算子的状态分别设置不同的 TTL。
+
+**生成 Compiled Plan**
+
+配置过程首先会使用 `COMPILE PLAN` 语句生成一个 JSON 文件,它表示了序列化后的执行计划。
+{{< hint info >}}
+`COMPILE PLAN` 不支持查询语句 `SELECT... FROM...` 。 
+{{< /hint >}}
+
+- 执行 `COMPILE PLAN` 语句
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL 语法
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file> FOR 
<insert_statement>|<statement_set>;

Review Comment:
   nit:
   ```suggestion
       COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR 
<insert_statement>|<statement_set>;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to