LadyForest commented on code in PR #22837:
URL: https://github.com/apache/flink/pull/22837#discussion_r1241175580


##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries contain stateful operations such as [joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}), 
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in a fault-tolerant storage for which 
Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a 
count
+for each `word` it observes to the sink. 
+The `word` value is evolving over time, and due to the continuous query never 
ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query is continuously growing as 
more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
+stateless pipelines.
+However, under some situations, the stateful operation is implicitly derived 
through the trait of input (*e.g.*, input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" 
>}}#table-to-stream-conversion)), 
+or through user configuration (see 
[`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" 
>}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that querying 
an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kakfa (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kakfa;
+```
+The table source only provides messages with *INSERT*, *UPDATE_AFTER* and 
*DELETE* type, while the downstream sink requires a complete changelog 
(including *UPDATE_BEFORE*). 
+As a result, although this query itself does not involve explicit stateful 
calculation, the planner still generates a stateful operator called 
"ChangelogNormalize" to help obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a`word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
+
+#### Configure Operator-level State TTL
+--------------------------
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only 
suitable for the cases
+in which there are multiple states used in the pipeline,
+and you need to set different TTL (Time-to-Live) for each state. 
+If the pipeline does not involve stateful computations, you do not need to 
follow this procedure.
+If the pipeline only uses one state, you only need to set 
[`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl)
+at pipeline level.
+{{< /hint >}}
+
+From Flink v1.18, Table API & SQL supports configuring fine-grained state TTL 
at operator-level to improve the state usage. 
+To be more specific, the number of used states can be defined as the 
configuration granularity and is associated with each input state of the 
operator. 
+
+Typical use cases are as follows. 
+- Set different TTLs for [regular joins]({{< ref 
"docs/dev/table/sql/queries/joins" >}}#regular-joins). 
+Regular join generates a `TwoInputStreamOperator` with left states to keep 
left inputs and right states to keep right inputs. From Flink v1.18,
+you can set the different state TTL for left state and right state. 
+- Set different TTLs for different transformations within one pipeline.
+For example, there is an ETL pipeline which uses `ROW_NUMBER` to perform 
[deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}}),
+and then use `GROUP BY` to perform [aggregation]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}}). 
+This table program will generate two `OneInputStreamOperator`s with their own 
state. 
+Now you can set different state TTL for deduplicate state and aggregate state.
+
+**Generate a Compiled Plan**
+
+The setup process begins by generating a JSON file using the `COMPILE PLAN` 
statement, 
+which represents the serialized execution plan of the current table program. 
+{{< hint info >}}
+Currently, `COMPILE PLAN` statement does not support `SELECT... FROM...` 
queries. 
+{{< /hint >}}
+
+- Run a `COMPILE PLAN` statement
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path, if you need to 
write to remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR 
...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL Syntax
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file> FOR 
<insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    This will generate a JSON file at `/path/to/plan.json`.
+
+{{< hint info >}}
+`COMPILE PLAN` statement supports writing the plan to a remote 
[filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) scheme like 
`hdfs://` or `s3://`. 
+Please be sure that the target path has set up the write access.
+{{< /hint >}}
+
+**Modify the Compiled Plan**
+
+Every operator that uses state will explicitly generate a JSON array named 
"state" with the following structure. 
+Theoretically, A k-th input stream operator will have k-th state.
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+Locate the operator for which you need to set TTL, modify the TTL to a 
positive integer (note that the time unit is milliseconds), 

Review Comment:
   Exactly. `0 ms` means disabling state retention (state never expires).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to